diff --git a/src/stasis/bufferManager/legacy/legacyBufferManager.c b/src/stasis/bufferManager/legacy/legacyBufferManager.c index 8e29e67..20557ae 100644 --- a/src/stasis/bufferManager/legacy/legacyBufferManager.c +++ b/src/stasis/bufferManager/legacy/legacyBufferManager.c @@ -115,7 +115,7 @@ static void bufManReleasePage (Page * p) { } -static Page * bufManGetPage(pageid_t pageid, int locktype, int uninitialized) { +static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) { Page * ret; int spin = 0; diff --git a/src/stasis/bufferManager/legacy/pageFile.c b/src/stasis/bufferManager/legacy/pageFile.c index 320117d..b5a3ff8 100644 --- a/src/stasis/bufferManager/legacy/pageFile.c +++ b/src/stasis/bufferManager/legacy/pageFile.c @@ -94,7 +94,7 @@ static void pfPageWrite(Page * ret) { // If necessary, force the log to disk so that ret's LSN will be stable. assert(ret->LSN == stasis_page_lsn_read(ret)); - LogForce(ret->LSN); + LogForce(stasis_log_file, ret->LSN); pthread_mutex_lock(&stable_mutex); diff --git a/src/stasis/logger/inMemoryLog.c b/src/stasis/logger/inMemoryLog.c index 584b559..09d4a60 100644 --- a/src/stasis/logger/inMemoryLog.c +++ b/src/stasis/logger/inMemoryLog.c @@ -8,17 +8,8 @@ static lsn_t globalOffset; static rwl * globalOffset_lock; static LogEntry ** buffer; static lsn_t bufferLen; -int open_InMemoryLog() { - flushedLSN_lock = initlock(); - globalOffset_lock = initlock(); - globalOffset = 0; - nextAvailableLSN = 0; - buffer = malloc(4096 * 1024 * sizeof (LogEntry *)); - bufferLen =4096 * 1024; - return 0; -} -int writeLogEntry_InMemoryLog(LogEntry *e) { +static int writeLogEntry_InMemoryLog(stasis_log_t * log, LogEntry *e) { writelock(flushedLSN_lock, 0); lsn_t bufferOffset; @@ -33,6 +24,7 @@ int writeLogEntry_InMemoryLog(LogEntry *e) { done = 1; } } while (!done); + return 0; e->LSN = nextAvailableLSN; @@ -48,22 +40,21 @@ int writeLogEntry_InMemoryLog(LogEntry *e) { unlock(globalOffset_lock); unlock(flushedLSN_lock); - return 0; } -lsn_t flushedLSN_InMemoryLog() { +static lsn_t flushedLSN_InMemoryLog(stasis_log_t* log) { return nextAvailableLSN; } -void syncLog_InMemoryLog() { +static void syncLog_InMemoryLog(stasis_log_t* log) { // no-op } -lsn_t nextEntry_InMemoryLog(const LogEntry * e) { +static lsn_t nextEntry_InMemoryLog(stasis_log_t * log, const LogEntry * e) { return e->LSN + 1; } -int truncateLog_InMemoryLog(lsn_t lsn) { +static int truncateLog_InMemoryLog(stasis_log_t * log, lsn_t lsn) { writelock(flushedLSN_lock,1); writelock(globalOffset_lock,1); @@ -85,11 +76,11 @@ int truncateLog_InMemoryLog(lsn_t lsn) { return 0; } -lsn_t firstLogEntry_InMemoryLog() { +static lsn_t firstLogEntry_InMemoryLog() { return globalOffset; } -void close_InMemoryLog() { +static int close_InMemoryLog(stasis_log_t * log) { if(buffer) { lsn_t firstEmptyOffset = nextAvailableLSN-globalOffset; for(lsn_t i = 0; i < firstEmptyOffset; i++) { @@ -103,10 +94,12 @@ void close_InMemoryLog() { buffer = 0; } - + free (log); + return 0; } -LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn) { +static const LogEntry * readLSNEntry_InMemoryLog(stasis_log_t* log, + lsn_t lsn) { // printf("lsn: %ld\n", lsn); if(lsn >= nextAvailableLSN) { return 0; } assert(lsn-globalOffset >= 0 && lsn-globalOffset< bufferLen); @@ -123,6 +116,32 @@ LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn) { //printf("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN); return ret; } -long sizeofInternalLogEntry_InMemoryLog(const LogEntry * e) { +static lsn_t sizeofInternalLogEntry_InMemoryLog(stasis_log_t* log, + const LogEntry * e) { abort(); } +static int isDurable_InMemoryLog(stasis_log_t*log) { return 0; } + +stasis_log_t* open_InMemoryLog() { + flushedLSN_lock = initlock(); + globalOffset_lock = initlock(); + globalOffset = 0; + nextAvailableLSN = 0; + buffer = malloc(4096 * 1024 * sizeof (LogEntry *)); + bufferLen =4096 * 1024; + static stasis_log_t proto = { + sizeofInternalLogEntry_InMemoryLog, // sizeof_internal_entry + writeLogEntry_InMemoryLog,// write_entry + readLSNEntry_InMemoryLog, // read_entry + nextEntry_InMemoryLog,// next_entry + flushedLSN_InMemoryLog, // first_unstable_lsn + syncLog_InMemoryLog, // force_tail + truncateLog_InMemoryLog, // truncate + firstLogEntry_InMemoryLog,// truncation_point + close_InMemoryLog, // deinit + isDurable_InMemoryLog// is_durable + }; + stasis_log_t* log = malloc(sizeof(*log)); + memcpy(log,&proto, sizeof(proto)); + return log; +} diff --git a/src/stasis/logger/logEntry.c b/src/stasis/logger/logEntry.c index 53cdc7b..f1bfaf6 100644 --- a/src/stasis/logger/logEntry.c +++ b/src/stasis/logger/logEntry.c @@ -123,11 +123,13 @@ LogEntry * allocCLRLogEntry(const LogEntry * old_e) { return (LogEntry*)ret; } +void freeLogEntry(const LogEntry* e) { + free((void*)e); +} - -long sizeofLogEntry(const LogEntry * log) { - switch (log->type) { +lsn_t sizeofLogEntry(const LogEntry * e) { + switch (e->type) { case CLRLOG: { return sizeof(CLRLogEntry); @@ -135,10 +137,10 @@ long sizeofLogEntry(const LogEntry * log) { case UPDATELOG: { return sizeof(struct __raw_log_entry) + - sizeof(UpdateLogEntry) + log->update.arg_size; + sizeof(UpdateLogEntry) + e->update.arg_size; } case INTERNALLOG: - return LoggerSizeOfInternalLogEntry(log); + return stasis_log_file->sizeof_internal_entry(stasis_log_file, e); case XPREPARE: return sizeof(struct __raw_log_entry)+sizeof(lsn_t); default: diff --git a/src/stasis/logger/logHandle.c b/src/stasis/logger/logHandle.c index 4f319aa..a5f5350 100644 --- a/src/stasis/logger/logHandle.c +++ b/src/stasis/logger/logHandle.c @@ -46,6 +46,16 @@ terms specified in this license. #include +struct LogHandle { + /** The LSN of the log entry that we would return if next is called. */ + lsn_t next_offset; + /** The LSN of the log entry that we would return if previous is called. */ + lsn_t prev_offset; + guard_fcn_t* guard; + void* guard_state; + stasis_log_t* log; +}; + /** Sets the next and prev field of h, but does not set h.file_offset. That should probably be set before calling this function. @@ -55,35 +65,39 @@ static void set_offsets(LogHandle * h, const LogEntry * e); /*-------------------------------------------------------*/ -LogHandle getLogHandle() { +LogHandle* getLogHandle(stasis_log_t* log) { - lsn_t lsn = LogTruncationPoint(); + lsn_t lsn = log->truncation_point(log); - return getGuardedHandle(lsn, NULL, NULL); + return getGuardedHandle(log, lsn, NULL, NULL); } -LogHandle getLSNHandle(lsn_t lsn) { - return getGuardedHandle(lsn, NULL, NULL); +LogHandle* getLSNHandle(stasis_log_t * log, lsn_t lsn) { + return getGuardedHandle(log, lsn, NULL, NULL); } -LogHandle getGuardedHandle(lsn_t lsn, guard_fcn_t * guard, void * guard_state) { - LogHandle ret; - ret.next_offset = lsn; - ret.prev_offset = lsn; - ret.guard = guard; - ret.guard_state = guard_state; +LogHandle* getGuardedHandle(stasis_log_t* log, lsn_t lsn, + guard_fcn_t * guard, void * guard_state) { + LogHandle* ret = malloc(sizeof(*ret)); + ret->next_offset = lsn; + ret->prev_offset = lsn; + ret->guard = guard; + ret->guard_state = guard_state; + ret->log = log; return ret; } - +void freeLogHandle(LogHandle* lh) { + free(lh); +} const LogEntry * nextInLog(LogHandle * h) { - const LogEntry * ret = LogReadLSN(h->next_offset); + const LogEntry * ret = h->log->read_entry(h->log,h->next_offset); if(ret != NULL) { set_offsets(h, ret); } if(h->guard) { if(!(h->guard(ret, h->guard_state))) { - FreeLogEntry(ret); + freeLogEntry(ret); ret = NULL; } } @@ -95,12 +109,12 @@ const LogEntry * nextInLog(LogHandle * h) { const LogEntry * previousInTransaction(LogHandle * h) { const LogEntry * ret = NULL; if(h->prev_offset > 0) { - ret = LogReadLSN(h->prev_offset); + ret = h->log->read_entry(h->log, h->prev_offset); set_offsets(h, ret); if(h->guard) { if(!h->guard(ret, h->guard_state)) { - FreeLogEntry(ret); + freeLogEntry(ret); ret = NULL; } } @@ -111,6 +125,6 @@ const LogEntry * previousInTransaction(LogHandle * h) { } static void set_offsets(LogHandle * h, const LogEntry * e) { - h->next_offset = LogNextEntry(e); + h->next_offset = h->log->next_entry(h->log, e); h->prev_offset = e->prevLSN; } diff --git a/src/stasis/logger/logWriter.c b/src/stasis/logger/logWriter.c index a580a4c..394f643 100644 --- a/src/stasis/logger/logWriter.c +++ b/src/stasis/logger/logWriter.c @@ -66,13 +66,14 @@ terms specified in this license. #include -/** - @todo Should the log file be global? +/** + @todo remove all static vairables from logWriter.c */ -static FILE * log = 0; +static FILE * logFILE = 0; static int roLogFD = 0; -int logWriter_isDurable = 1; +static int logWriter_isDurable = 1; +static lsn_t debug_lsn = -1; /** @see flushedLSN_LogWriter() @@ -106,27 +107,26 @@ static lsn_t global_offset; It is also used by truncateLog to block read requests while rename() is called. */ -pthread_mutex_t log_read_mutex; +static pthread_mutex_t log_read_mutex; /** Invariant: Any thread writing to the file must hold this lock. The log truncation thread hold this lock from the point where it copies the tail of the old log to the new log, until after the rename call - returns. -*/ -pthread_mutex_t log_write_mutex; - + returns. +*/ +static pthread_mutex_t log_write_mutex; /** This mutex protects nextAvailableLSN, which has its own mutex because the routines for reading and writing log entries both need to acquire it, but only for a relatively short time. */ -pthread_mutex_t nextAvailableLSN_mutex; +static pthread_mutex_t nextAvailableLSN_mutex; /** Invariant: We only want one thread in truncateLog at a time. */ -pthread_mutex_t truncateLog_mutex; +static pthread_mutex_t truncateLog_mutex; static char * buffer; @@ -145,14 +145,56 @@ static unsigned int log_crc; static inline void update_log_crc(const LogEntry * le, unsigned int * crc) { *crc = stasis_crc32(le, sizeofLogEntry(le), *crc); } - -static LogEntry * readLogEntry(); +// internal methods static void syncLogInternal(); -int openLogWriter() { +static LogEntry* readLogEntry(); +static inline lsn_t nextEntry(const LogEntry* e) { + return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t); +} + +// implementations of log api methods +static lsn_t sizeofInternalLogEntry_LogWriter(stasis_log_t * log, + const LogEntry * e); +static void syncLog_LogWriter(); +static lsn_t flushedLSN_LogWriter(); +static const LogEntry* readLSNEntry_LogWriter(stasis_log_t * log, lsn_t lsn); +static int truncateLog_LogWriter(stasis_log_t* log, lsn_t lsn); +static int close_LogWriter(stasis_log_t * log); +static int writeLogEntry_LogWriter(stasis_log_t* log, LogEntry * e); + +static inline int isDurable_LogWriter(stasis_log_t* log) { + return logWriter_isDurable; +} +static inline lsn_t firstLogEntry_LogWriter(stasis_log_t* log); + +static inline lsn_t nextEntry_LogWriter(stasis_log_t* log, + const LogEntry* e) { + return nextEntry(e); +} + +stasis_log_t* openLogWriter() { + + stasis_log_t proto = { + sizeofInternalLogEntry_LogWriter, // sizeof_internal_entry + writeLogEntry_LogWriter,// write_entry + readLSNEntry_LogWriter, // read_entry + nextEntry_LogWriter,// next_entry + flushedLSN_LogWriter, // first_unstable_lsn + syncLog_LogWriter, // force_tail + truncateLog_LogWriter, // truncate + firstLogEntry_LogWriter,// truncation_point + close_LogWriter, // deinit + isDurable_LogWriter // is_durable + }; + stasis_log_t* log = malloc(sizeof(*log)); + memcpy(log,&proto, sizeof(proto)); + + // XXX hack; we call things that call into this object during init! + stasis_log_file = log; buffer = malloc(BUFSIZE); - if(!buffer) { return LLADD_NO_MEM; } + if(!buffer) { return 0; /*LLADD_NO_MEM;*/ } /* The file is opened twice for a reason. fseek() seems to call fflush() under Linux, which normally would be a minor problem. @@ -173,16 +215,16 @@ int openLogWriter() { perror("Couldn't open log file for append.\n"); abort(); } - log = fdopen(logFD, "w+"); + logFILE = fdopen(logFD, "w+"); - if (log==NULL) { + if (logFILE==NULL) { perror("Couldn't open log file"); abort(); - return LLADD_IO_ERROR; + return 0; //LLADD_IO_ERROR; } /* Increase the length of log's buffer, since it's in O_SYNC mode. */ - setbuffer(log, buffer, BUFSIZE); + setbuffer(logFILE, buffer, BUFSIZE); /* fread() doesn't notice when another handle writes to its file, even if fflush() is used to push the changes out to disk. @@ -210,31 +252,31 @@ int openLogWriter() { length of the file. */ - if (myFseek(log, 0, SEEK_END)==0) { + if (myFseek(logFILE, 0, SEEK_END)==0) { /*if file is empty, write an LSN at the 0th position. LSN 0 is invalid, and this prevents us from using it. Also, the LSN at this position is used after log truncation to store the global offset for the truncated log. */ global_offset = 0; - size_t nmemb = fwrite(&global_offset, sizeof(lsn_t), 1, log); + size_t nmemb = fwrite(&global_offset, sizeof(lsn_t), 1, logFILE); if(nmemb != 1) { perror("Couldn't start new log file!"); - return LLADD_IO_ERROR; + return 0; //LLADD_IO_ERROR; } } else { off_t newPosition = lseek(roLogFD, 0, SEEK_SET); if(newPosition == -1) { perror("Could not seek to head of log"); - return LLADD_IO_ERROR; + return 0; //LLADD_IO_ERROR; } ssize_t bytesRead = read(roLogFD, &global_offset, sizeof(lsn_t)); if(bytesRead != sizeof(lsn_t)) { printf("Could not read log header."); - return LLADD_IO_ERROR; + return 0;//LLADD_IO_ERROR; } } @@ -256,28 +298,28 @@ int openLogWriter() { while((le = readLogEntry())) { if(le->type == INTERNALLOG) { if (!(le->prevLSN) || (crc == (unsigned int) le->prevLSN)) { - nextAvailableLSN = nextEntry_LogWriter(le); //le->LSN + sizeofLogEntry(le) + sizeof(lsn_t); + nextAvailableLSN = nextEntry(le); crc = 0; } else { printf("Log corruption: %x != %x (lsn = %lld)\n", (unsigned int) le->prevLSN, crc, le->LSN); // The log wasn't successfully forced to this point; discard // everything after the last CRC. - FreeLogEntry(le); + freeLogEntry(le); break; } } else { update_log_crc(le, &crc); } - FreeLogEntry(le); + freeLogEntry(le); } - if(ftruncate(fileno(log), nextAvailableLSN-global_offset) == -1) { + if(ftruncate(fileno(logFILE), nextAvailableLSN-global_offset) == -1) { perror("Couldn't discard junk at end of log"); } // If there was trailing garbage at the end of the log, overwrite // it. - if(myFseek(log, nextAvailableLSN-global_offset, SEEK_SET) != nextAvailableLSN-global_offset) { + if(myFseek(logFILE, nextAvailableLSN-global_offset, SEEK_SET) != nextAvailableLSN-global_offset) { perror("Error repositioning log"); abort(); } @@ -289,7 +331,7 @@ int openLogWriter() { flushedLSN_internal = nextAvailableLSN; log_crc = 0; - return 0; + return log; } @@ -312,7 +354,7 @@ int openLogWriter() { */ -static int writeLogEntryUnlocked(LogEntry * e) { +static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e) { const lsn_t size = sizeofLogEntry(e); @@ -329,12 +371,13 @@ static int writeLogEntryUnlocked(LogEntry * e) { // assert(e->LSN == (current_offset + global_offset)); // off_t oldOffset = ftell(log); - size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, log); + size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, logFILE); if(nmemb != 1) { - if(feof(log)) { abort(); /* feof makes no sense here */ } - if(ferror(log)) { - fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log)); + if(feof(logFILE)) { abort(); /* feof makes no sense here */ } + if(ferror(logFILE)) { + fprintf(stderr, "writeLog couldn't write next log entry: %d\n", + ferror(logFILE)); abort(); } abort(); @@ -348,15 +391,15 @@ static int writeLogEntryUnlocked(LogEntry * e) { // current_offset = ftell(log); // assert(e->LSN == (current_offset + global_offset - sizeof(lsn_t))); - nmemb = fwrite(e, size, 1, log); + nmemb = fwrite(e, size, 1, logFILE); // current_offset = ftell(log); // assert(e->LSN == current_offset + global_offset - sizeof(lsn_t) - size); if(nmemb != 1) { - if(feof(log)) { abort(); /* feof makes no sense here */ } - if(ferror(log)) { - fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log)); + if(feof(logFILE)) { abort(); /* feof makes no sense here */ } + if(ferror(logFILE)) { + fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(logFILE)); abort(); } abort(); @@ -366,20 +409,21 @@ static int writeLogEntryUnlocked(LogEntry * e) { pthread_mutex_lock(&nextAvailableLSN_mutex); assert(nextAvailableLSN == e->LSN); - nextAvailableLSN = nextEntry_LogWriter(e); + nextAvailableLSN = nextEntry(e); pthread_mutex_unlock(&nextAvailableLSN_mutex); return 0; } -int writeLogEntry(LogEntry * e) { +static int writeLogEntry_LogWriter(stasis_log_t* log, LogEntry * e) { pthread_mutex_lock(&log_write_mutex); - int ret = writeLogEntryUnlocked(e); + int ret = writeLogEntryUnlocked(log, e); pthread_mutex_unlock(&log_write_mutex); return ret; } -long sizeofInternalLogEntry_LogWriter(const LogEntry * e) { +static lsn_t sizeofInternalLogEntry_LogWriter(stasis_log_t * log, + const LogEntry * e) { return sizeof(struct __raw_log_entry); } @@ -390,7 +434,7 @@ static void syncLogInternal() { newFlushedLSN = nextAvailableLSN; if(newFlushedLSN > flushedLSN_internal) { pthread_mutex_unlock(&nextAvailableLSN_mutex); - fflush(log); + fflush(logFILE); writelock(flushedLSN_lock, 0); } if(newFlushedLSN > flushedLSN_internal) { @@ -400,7 +444,7 @@ static void syncLogInternal() { } -void syncLog_LogWriter() { +static void syncLog_LogWriter(stasis_log_t * log) { lsn_t newFlushedLSN; pthread_mutex_lock(&log_write_mutex); @@ -410,14 +454,14 @@ void syncLog_LogWriter() { pthread_mutex_unlock(&nextAvailableLSN_mutex); LogEntry * crc_entry = allocCommonLogEntry(log_crc, -1, INTERNALLOG); - writeLogEntryUnlocked(crc_entry); + writeLogEntryUnlocked(log, crc_entry); free(crc_entry); // Reset log_crc to zero each time a crc entry is written. log_crc = 0; pthread_mutex_unlock(&log_write_mutex); // Since we opened the logfile with O_SYNC, fflush() is sufficient. - fflush(log); + fflush(logFILE); // update flushedLSN after fflush returns. writelock(flushedLSN_lock, 0); @@ -431,7 +475,7 @@ void syncLog_LogWriter() { writeunlock(flushedLSN_lock); } -lsn_t flushedLSN_LogWriter() { +static lsn_t flushedLSN_LogWriter(stasis_log_t* log) { readlock(flushedLSN_lock, 0); lsn_t ret = flushedLSN_stable; readunlock(flushedLSN_lock); @@ -444,20 +488,20 @@ static lsn_t flushedLSNInternal() { return ret; } -void closeLogWriter() { +static int close_LogWriter(stasis_log_t* log) { /* Get the whole thing to the disk before closing it. */ - syncLog_LogWriter(); + syncLog_LogWriter(log); - fclose(log); + fclose(logFILE); close(roLogFD); - log = NULL; + logFILE = NULL; roLogFD = 0; flushedLSN_stable = 0; flushedLSN_internal = 0; nextAvailableLSN = 0; global_offset = 0; - + /* Free locks. */ deletelock(flushedLSN_lock); @@ -465,15 +509,17 @@ void closeLogWriter() { pthread_mutex_destroy(&log_write_mutex); pthread_mutex_destroy(&nextAvailableLSN_mutex); pthread_mutex_destroy(&truncateLog_mutex); - free(buffer); + free(buffer); buffer = 0; log_crc = 0; + free(log); + return 0; } void deleteLogWriter() { remove(LOG_FILE); } -lsn_t debug_lsn = -1; + static LogEntry * readLogEntry() { LogEntry * ret = 0; lsn_t size; @@ -549,7 +595,7 @@ static LogEntry * readLogEntry() { } //static lsn_t lastPosition_readLSNEntry = -1; -LogEntry * readLSNEntry_LogWriter(const lsn_t LSN) { +const LogEntry * readLSNEntry_LogWriter(stasis_log_t * log, const lsn_t LSN) { LogEntry * ret; pthread_mutex_lock(&nextAvailableLSN_mutex); @@ -591,12 +637,36 @@ LogEntry * readLSNEntry_LogWriter(const lsn_t LSN) { return ret; } +/** + Truncates the log file. In the single-threaded case, this works as + follows: -int truncateLog_LogWriter(lsn_t LSN) { + First, the LSN passed to this function, minus sizeof(lsn_t) is + written to a new file, called logfile.txt~. (If logfile.txt~ + already exists, then it is truncated.) + + Next, the contents of the log, starting with the LSN passed into + this function are copied to logfile.txt~ + + Finally, logfile.txt~ is moved on top of logfile.txt + + As long as the move system call is atomic, this function should + maintain the system's durability. + + The multithreaded case is a bit more complicated, as we need + to deal with latching: + + With no lock, copy the log. Upon completion, if the log has grown, + then copy the part that remains. Next, obtain a read/write latch + on the logfile, and copy any remaining portions of the log. + Perform the move, and release the latch. + +*/ +int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) { FILE *tmpLog; const LogEntry * le; - LogHandle lh; + LogHandle* lh; lsn_t size; @@ -634,23 +704,23 @@ int truncateLog_LogWriter(lsn_t LSN) { */ pthread_mutex_lock(&log_write_mutex); - fflush(log); + fflush(logFILE); - lh = getLSNHandle(LSN); + lh = getLSNHandle(log, LSN); lsn_t lengthOfCopiedLog = 0; int firstInternalEntry = 1; lsn_t nextLSN = 0; - while((le = nextInLog(&lh))) { + while((le = nextInLog(lh))) { size = sizeofLogEntry(le); if(nextLSN) { assert(nextLSN == le->LSN); } - nextLSN = nextEntry_LogWriter(le); + nextLSN = nextEntry_LogWriter(log, le); if(firstInternalEntry && le->type == INTERNALLOG) { LogEntry * firstCRC = malloc(size); memcpy(firstCRC, le, size); - FreeLogEntry(le); + freeLogEntry(le); firstCRC->prevLSN = 0; le = firstCRC; } @@ -663,9 +733,10 @@ int truncateLog_LogWriter(lsn_t LSN) { free((void*)le); // remove const qualifier + free firstInternalEntry = 0; } else { - FreeLogEntry(le); + freeLogEntry(le); } - } + } + freeLogHandle(lh); LogEntry * crc_entry = allocCommonLogEntry(0, -1, INTERNALLOG); assert(crc_entry->prevLSN == 0); @@ -677,7 +748,7 @@ int truncateLog_LogWriter(lsn_t LSN) { size = sizeofLogEntry(crc_entry); - nextAvailableLSN = nextEntry_LogWriter(crc_entry); + nextAvailableLSN = nextEntry_LogWriter(log, crc_entry); log_crc = 0; @@ -705,15 +776,15 @@ int truncateLog_LogWriter(lsn_t LSN) { /* closeLogWriter calls sync, and does some extra stuff that we don't want, so we basicly re-implement closeLogWriter and openLogWriter here... */ - fclose(log); + fclose(logFILE); close(roLogFD); - fclose(tmpLog); - + fclose(tmpLog); + if(rename(LOG_FILE_SCRATCH, LOG_FILE)) { pthread_mutex_unlock(&log_read_mutex); pthread_mutex_unlock(&log_write_mutex); pthread_mutex_unlock(&truncateLog_mutex); - + perror("Error replacing old log file with new log file"); return LLADD_IO_ERROR; } else { @@ -726,9 +797,9 @@ int truncateLog_LogWriter(lsn_t LSN) { perror("Couldn't open log file for append.\n"); abort(); } - log = fdopen(logFD, "w+"); + logFILE = fdopen(logFD, "w+"); - if (log==NULL) { + if (logFILE==NULL) { pthread_mutex_unlock(&log_read_mutex); pthread_mutex_unlock(&log_write_mutex); pthread_mutex_unlock(&truncateLog_mutex); @@ -738,12 +809,12 @@ int truncateLog_LogWriter(lsn_t LSN) { return LLADD_IO_ERROR; } - setbuffer(log, buffer, BUFSIZE); + setbuffer(logFILE, buffer, BUFSIZE); global_offset = LSN - sizeof(lsn_t); lsn_t logPos; - if((logPos = myFseek(log, 0, SEEK_END)) != nextAvailableLSN - global_offset) { + if((logPos = myFseek(logFILE, 0, SEEK_END)) != nextAvailableLSN - global_offset) { if(logPos == -1) { perror("Truncation couldn't seek"); } else { @@ -761,8 +832,6 @@ int truncateLog_LogWriter(lsn_t LSN) { return LLADD_IO_ERROR; } - - pthread_mutex_unlock(&log_read_mutex); pthread_mutex_unlock(&log_write_mutex); pthread_mutex_unlock(&truncateLog_mutex); @@ -771,8 +840,8 @@ int truncateLog_LogWriter(lsn_t LSN) { } -lsn_t firstLogEntry() { - assert(log); +lsn_t firstLogEntry_LogWriter(stasis_log_t* log) { + assert(logFILE); pthread_mutex_lock(&log_read_mutex); // for global offset... lsn_t ret = global_offset + sizeof(lsn_t); pthread_mutex_unlock(&log_read_mutex); diff --git a/src/stasis/logger/logger2.c b/src/stasis/logger/logger2.c index e5d0c19..2ec716c 100644 --- a/src/stasis/logger/logger2.c +++ b/src/stasis/logger/logger2.c @@ -61,147 +61,37 @@ terms specified in this license. #include #include +/** + @todo loggerType should go away. + */ #ifdef USE_LOGGER int loggerType = USE_LOGGER; #else int loggerType = LOG_TO_FILE; #endif +/** + @todo stasis_log_file should be in transactional2.c, and not global + */ +stasis_log_t* stasis_log_file = 0; + static int pendingCommits; -static int syncLogCount; -long LoggerSizeOfInternalLogEntry(const LogEntry * e) { - if(loggerType == LOG_TO_FILE) { - return sizeofInternalLogEntry_LogWriter(e); - } else if (loggerType == LOG_TO_MEMORY) { - return sizeofInternalLogEntry_InMemoryLog(e); - } else { - // we dont have an appropriate implementation, or weren't initialized... - abort(); - } -} - -void LogWrite(LogEntry * e) { - if(loggerType == LOG_TO_FILE) { - writeLogEntry(e); - } else if (loggerType == LOG_TO_MEMORY) { - writeLogEntry_InMemoryLog(e); - } else { - abort(); - } - return; -} - -int LogInit(int logType) { - - loggerType = logType; - - pendingCommits = 0; - syncLogCount = 0; - if(LOG_TO_FILE == logType) { - openLogWriter(); - } else if(LOG_TO_MEMORY == logType) { - open_InMemoryLog(); - } else { - return -1; - } - return 0; -} - -int LogDeinit() { - if(LOG_TO_FILE == loggerType) { - closeLogWriter(); - } else if(LOG_TO_MEMORY == loggerType) { - close_InMemoryLog(); - } else { - abort(); - } - return 0; -} - -void LogTruncate(lsn_t lsn) { - if(LOG_TO_FILE == loggerType) { - truncateLog_LogWriter(lsn); - } else if(LOG_TO_MEMORY == loggerType) { - truncateLog_InMemoryLog(lsn); - } else { - abort(); - } -} - -lsn_t LogFlushedLSN() { - lsn_t ret; - if(LOG_TO_FILE == loggerType) { - ret = flushedLSN_LogWriter(); - } else if(LOG_TO_MEMORY == loggerType) { - ret = flushedLSN_InMemoryLog(); - } else { - abort(); - } - return ret; -} - -lsn_t LogTruncationPoint() { - lsn_t ret; - if(LOG_TO_FILE == loggerType) { - ret = firstLogEntry(); - } else if(LOG_TO_MEMORY == loggerType) { - ret = firstLogEntry_InMemoryLog(); - } else { - abort(); - } - return ret; -} -const LogEntry * LogReadLSN(lsn_t lsn) { - LogEntry * ret; - if(LOG_TO_FILE == loggerType) { - ret = readLSNEntry_LogWriter(lsn); - } else if(LOG_TO_MEMORY == loggerType) { - ret = readLSNEntry_InMemoryLog(lsn); - } else { - abort(); - } - return ret; -} - -lsn_t LogNextEntry(const LogEntry * e) { - lsn_t ret; - if(LOG_TO_FILE == loggerType) { - ret = nextEntry_LogWriter(e); - } else if(LOG_TO_MEMORY == loggerType) { - ret = nextEntry_InMemoryLog(e); - } else { - abort(); - } - return ret; -} - -void FreeLogEntry(const LogEntry * e) { - if(LOG_TO_FILE == loggerType) { - free((void*)e); - } else if(LOG_TO_MEMORY == loggerType) { - free((void*)e); - } else { - abort(); - } - -} - -TransactionLog LogTransBegin(int xid) { +TransactionLog LogTransBegin(stasis_log_t* log, int xid) { TransactionLog tl; tl.xid = xid; - + DEBUG("Log Begin %d\n", xid); tl.prevLSN = -1; tl.recLSN = -1; return tl; } -static lsn_t LogTransCommon(TransactionLog * l, int type) { +static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) { LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type); lsn_t ret; - LogWrite(e); + log->write_entry(log, e); if(l->prevLSN == -1) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; @@ -210,40 +100,101 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) { ret = e->LSN; - FreeLogEntry(e); + freeLogEntry(e); return ret; } -static lsn_t LogTransCommonPrepare(TransactionLog * l) { +static lsn_t LogTransCommonPrepare(stasis_log_t* log, TransactionLog * l) { LogEntry * e = allocPrepareLogEntry(l->prevLSN, l->xid, l->recLSN); lsn_t ret; - DEBUG("Log prepare xid = %d prevlsn = %lld reclsn = %lld, %lld\n",e->xid,e->prevLSN,l->recLSN, getPrepareRecLSN(e)); - LogWrite(e); + DEBUG("Log prepare xid = %d prevlsn = %lld reclsn = %lld, %lld\n", + e->xid, e->prevLSN, l->recLSN, getPrepareRecLSN(e)); + log->write_entry(log, e); if(l->prevLSN == -1) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; - DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, - (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); + DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n", + e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); ret = e->LSN; - FreeLogEntry(e); + freeLogEntry(e); return ret; } -static void groupForce(lsn_t l) { +LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l, + Page * p, unsigned int op, + const byte * arg, size_t arg_size) { + + LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op, + p ? p->id : INVALID_PAGE, + arg, arg_size); + + log->write_entry(log, e); + DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid, + (long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size); + + if(l->prevLSN == -1) { l->recLSN = e->LSN; } + l->prevLSN = e->LSN; + return e; +} + +lsn_t LogCLR(stasis_log_t* log, const LogEntry * old_e) { + LogEntry * e = allocCLRLogEntry(old_e); + log->write_entry(log, e); + + DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid, + e->LSN, LSN, prevLSN); + lsn_t ret = e->LSN; + freeLogEntry(e); + + return ret; +} + +lsn_t LogDummyCLR(stasis_log_t* log, int xid, lsn_t prevLSN, + lsn_t compensatedLSN) { + LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP, + INVALID_PAGE, NULL, 0); + e->LSN = compensatedLSN; + lsn_t ret = LogCLR(log, e); + freeLogEntry(e); + return ret; +} + +static void groupForce(stasis_log_t* log, lsn_t lsn); + +lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l) { + lsn_t lsn = LogTransCommon(log, l, XCOMMIT); + groupForce(log, lsn); + return lsn; +} + +lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l) { + return LogTransCommon(log, l, XABORT); +} +lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l) { + lsn_t lsn = LogTransCommonPrepare(log, l); + groupForce(log, lsn); + return lsn; +} + +void LogForce(stasis_log_t* log, lsn_t lsn) { + groupForce(log, lsn); +} + +static void groupForce(stasis_log_t* log, lsn_t lsn) { static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER; struct timeval now; struct timespec timeout; - + pthread_mutex_lock(&check_commit); - if(LogFlushedLSN() >= l) { + if(log->first_unstable_lsn(log) >= lsn) { pthread_mutex_unlock(&check_commit); return; } @@ -264,92 +215,25 @@ static void groupForce(lsn_t l) { int retcode; while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) { if(retcode != 0) { - printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by a signal in groupCommit(). Acting as though it timed out.\n", __FILE__, __LINE__); + printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by " + "a signal in groupCommit(). Acting as though it timed out.\n", + __FILE__, __LINE__); break; } - if(LogFlushedLSN() >= l) { + if(log->first_unstable_lsn(log) >= lsn) { pendingCommits--; pthread_mutex_unlock(&check_commit); return; } } - } - if(LogFlushedLSN() < l) { - if(LOG_TO_FILE == loggerType) { - syncLog_LogWriter(); - } else if (LOG_TO_MEMORY == loggerType) { - syncLog_InMemoryLog(); - } else { - abort(); - } - assert(LogFlushedLSN() >= lsn); - syncLogCount++; + } + if(log->first_unstable_lsn(log) < lsn) { + log->force_tail(log); + assert(log->first_unstable_lsn(log) >= lsn); pthread_cond_broadcast(&tooFewXacts); } - assert(LogFlushedLSN() >= l); + assert(log->first_unstable_lsn(log) >= lsn); pendingCommits--; pthread_mutex_unlock(&check_commit); return; } - -static lsn_t groupCommit(TransactionLog * l) { - lsn_t ret = LogTransCommon(l, XCOMMIT); - groupForce(ret); - return ret; -} -static lsn_t groupPrepare(TransactionLog * l) { - lsn_t ret = LogTransCommonPrepare(l); - groupForce(ret); - return ret; -} - -void LogForce(lsn_t lsn) { - groupForce(lsn); -} -lsn_t LogTransCommit(TransactionLog * l) { - return groupCommit(l); -} - -lsn_t LogTransAbort(TransactionLog * l) { - return LogTransCommon(l, XABORT); -} -lsn_t LogTransPrepare(TransactionLog * l) { - return groupPrepare(l); -} - -LogEntry * LogUpdate(TransactionLog * l, Page * p, unsigned int op, - const byte * arg, size_t arg_size) { - - LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op, - p ? p->id : INVALID_PAGE, - arg, arg_size); - - LogWrite(e); - DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid, - (long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size); - - if(l->prevLSN == -1) { l->recLSN = e->LSN; } - l->prevLSN = e->LSN; - return e; -} - -lsn_t LogCLR(const LogEntry * old_e) { - LogEntry * e = allocCLRLogEntry(old_e); - LogWrite(e); - - DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid, - e->LSN, LSN, prevLSN); - lsn_t ret = e->LSN; - FreeLogEntry(e); - - return ret; -} - -lsn_t LogDummyCLR(int xid, lsn_t prevLSN, lsn_t compensatedLSN) { - LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP, - INVALID_PAGE, NULL, 0); - e->LSN = compensatedLSN; - lsn_t ret = LogCLR(e); - FreeLogEntry(e); - return ret; -} diff --git a/src/stasis/operations/prepare.c b/src/stasis/operations/prepare.c index 30c1388..566f645 100644 --- a/src/stasis/operations/prepare.c +++ b/src/stasis/operations/prepare.c @@ -55,7 +55,7 @@ terms specified in this license. recordid prepare_bogus_rec = { 0, 0, 0}; static int op_prepare(const LogEntry * e, Page * p) { - LogForce(e->LSN); + LogForce(stasis_log_file, e->LSN); return 0; } diff --git a/src/stasis/operations/regions.c b/src/stasis/operations/regions.c index 6689d6c..766992e 100644 --- a/src/stasis/operations/regions.c +++ b/src/stasis/operations/regions.c @@ -160,7 +160,7 @@ void regionsInit() { writelock(p->rwlatch,0); op_alloc_boundary_tag(e,p); unlock(p->rwlatch); - FreeLogEntry(e); + freeLogEntry(e); } holding_mutex = 0; releasePage(p); diff --git a/src/stasis/page/raw.c b/src/stasis/page/raw.c index fb87d20..dd610b6 100644 --- a/src/stasis/page/raw.c +++ b/src/stasis/page/raw.c @@ -8,8 +8,8 @@ XXX rawPageInferMetadata is wrong; setting lsn to LogFlushedLSN() breaks recovery. */ -void rawPageInferMetadata(Page * p) { - p->LSN = LogFlushedLSN(); +void rawPageInferMetadata(Page * p) { + p->LSN = stasis_log_file->first_unstable_lsn(stasis_log_file); } byte* rawPageGetData(int xid, Page * p) { diff --git a/src/stasis/pageHandle.c b/src/stasis/pageHandle.c index 34d9a23..0b2b96a 100644 --- a/src/stasis/pageHandle.c +++ b/src/stasis/pageHandle.c @@ -30,7 +30,7 @@ static void phWrite(Page * ret) { // or we'll deadlock. writelock(ret->rwlatch,0); stasis_page_flushed(ret); - LogForce(ret->LSN); + LogForce(stasis_log_file, ret->LSN); int err = h->write(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); if(err) { printf("Couldn't write to page file: %s\n", strerror(err)); diff --git a/src/stasis/recovery2.c b/src/stasis/recovery2.c index a7aec87..9167731 100644 --- a/src/stasis/recovery2.c +++ b/src/stasis/recovery2.c @@ -49,20 +49,20 @@ static pthread_mutex_t rollback_mutex = PTHREAD_MUTEX_INITIALIZER; no longer reads the pages in, there's no longer any reason to build the list of dirty pages. */ -static void Analysis() { +static void Analysis(stasis_log_t* log) { DEBUG("Recovery: Analysis\n"); const LogEntry * e; - LogHandle lh = getLogHandle(); + LogHandle* lh = getLogHandle(log); /** After recovery, we need to know what the highest XID in the log was so that we don't accidentally reuse XID's. This keeps track of that value. */ int highestXid = 0; - while((e = nextInLog(&lh))) { + while((e = nextInLog(lh))) { lsn_t * xactLSN = (lsn_t*)pblHtLookup(transactionLSN, &(e->xid), sizeof(int)); @@ -148,8 +148,9 @@ static void Analysis() { default: abort(); } - FreeLogEntry(e); + freeLogEntry(e); } + freeLogHandle(lh); stasis_transaction_table_max_transaction_id_set(highestXid); } @@ -170,13 +171,13 @@ static void Analysis() { Y (NTA replaces physical undo) */ -static void Redo() { - LogHandle lh = getLogHandle(); +static void Redo(stasis_log_t* log) { + LogHandle* lh = getLogHandle(log); const LogEntry * e; DEBUG("Recovery: Redo\n"); - while((e = nextInLog(&lh))) { + while((e = nextInLog(lh))) { // Is this log entry part of a transaction that needs to be redone? if(pblHtLookup(transactionLSN, &(e->xid), sizeof(int)) != NULL) { if(e->type != INTERNALLOG) { @@ -194,7 +195,9 @@ static void Redo() { } break; case CLRLOG: { - const LogEntry *ce = LogReadLSN(((CLRLogEntry*)e)->clr.compensated_lsn); + const LogEntry *ce = + log->read_entry(log,((CLRLogEntry*)e)->clr.compensated_lsn); + if(ce->update.page == INVALID_PAGE) { // logical redo of end of NTA; no-op } else { @@ -207,7 +210,7 @@ static void Redo() { unlock(p->rwlatch); releasePage(p); } - FreeLogEntry(ce); + freeLogEntry(ce); } break; case XCOMMIT: { @@ -233,11 +236,13 @@ static void Redo() { abort(); } } - FreeLogEntry(e); + freeLogEntry(e); } + freeLogHandle(lh); + } -static void Undo(int recovery) { - LogHandle lh; +static void Undo(stasis_log_t* log, int recovery) { + LogHandle* lh; DEBUG("Recovery: Undo\n"); @@ -247,7 +252,7 @@ static void Undo(int recovery) { DEBUG("Undoing LSN %ld\n", (long int)rollback); - lh = getLSNHandle(rollback); + lh = getLSNHandle(log, rollback); int thisXid = -1; @@ -255,7 +260,7 @@ static void Undo(int recovery) { int reallyAborted = 0; // Have we reached a XPREPARE that we should pay attention to? int prepared = 0; - while((!prepared) && (e = previousInTransaction(&lh))) { + while((!prepared) && (e = previousInTransaction(lh))) { thisXid = e->xid; switch(e->type) { case UPDATELOG: @@ -278,7 +283,7 @@ static void Undo(int recovery) { } // Log a CLR for this entry - lsn_t clr_lsn = LogCLR(e); + lsn_t clr_lsn = LogCLR(log, e); DEBUG("logged clr\n"); stasis_transaction_table_roll_forward(e->xid, e->LSN, e->prevLSN); @@ -296,7 +301,8 @@ static void Undo(int recovery) { } case CLRLOG: { - const LogEntry * ce = LogReadLSN(((CLRLogEntry*)e)->clr.compensated_lsn); + const LogEntry * ce + = log->read_entry(log, ((CLRLogEntry*)e)->clr.compensated_lsn); if(ce->update.page == INVALID_PAGE) { DEBUG("logical clr\n"); undoUpdate(ce, 0, 0); // logical undo; effective LSN doesn't matter @@ -304,7 +310,7 @@ static void Undo(int recovery) { DEBUG("physical clr: op %d lsn %lld\n", ce->update.funcID, ce->LSN); // no-op. Already undone during redo. This would redo the original op. } - FreeLogEntry(ce); + freeLogEntry(ce); } break; case XABORT: @@ -335,7 +341,7 @@ static void Undo(int recovery) { e->type, e->xid, e->LSN); abort(); } - FreeLogEntry(e); + freeLogEntry(e); } if(!prepared) { if(recovery) { @@ -345,18 +351,19 @@ static void Undo(int recovery) { globalLockManager.abort(thisXid); } } + freeLogHandle(lh); } } -void InitiateRecovery() { +void stasis_recovery_initiate(stasis_log_t* log) { transactionLSN = pblHtCreate(); DEBUG("Analysis started\n"); - Analysis(); + Analysis(log); DEBUG("Redo started\n"); - Redo(); + Redo(log); DEBUG("Undo started\n"); TallocPostInit(); - Undo(1); + Undo(log,1); DEBUG("Recovery complete.\n"); for(void * it = pblHtFirst(transactionLSN); it; it = pblHtNext(transactionLSN)) { @@ -369,7 +376,7 @@ void InitiateRecovery() { } -void undoTrans(TransactionLog transaction) { +void undoTrans(stasis_log_t* log, TransactionLog transaction) { pthread_mutex_lock(&rollback_mutex); assert(!rollbackLSNs); @@ -381,7 +388,7 @@ void undoTrans(TransactionLog transaction) { /* Nothing to undo. (Happens for read-only xacts.) */ } - Undo(0); + Undo(log, 0); if(rollbackLSNs) { destroyList(&rollbackLSNs); } diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 56c7b3a..f4090cc 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -18,6 +18,9 @@ #include +#include +#include + #include #include #include // XXX remove this, move Tread() to set.c @@ -137,7 +140,13 @@ int Tinit() { setupOperationsTable(); dirtyPagesInit(); - LogInit(loggerType); + if(LOG_TO_FILE == loggerType) { + stasis_log_file = openLogWriter(); + } else if(LOG_TO_MEMORY == loggerType) { + stasis_log_file = open_InMemoryLog(); + } else { + assert(stasis_log_file != NULL); + } stasis_page_init(); #ifndef HAVE_O_DIRECT @@ -223,12 +232,13 @@ int Tinit() { consumer_init(); setupLockManagerCallbacksNil(); //setupLockManagerCallbacksPage(); - - InitiateRecovery(); - - truncationInit(); + + stasis_recovery_initiate(stasis_log_file); + + stasis_truncation_init(); if(stasis_truncation_automatic) { - autoTruncate(); // should this be before InitiateRecovery? + // should this be before InitiateRecovery? + stasis_truncation_thread_start(stasis_log_file); } return 0; } @@ -265,7 +275,7 @@ int Tbegin() { pthread_mutex_unlock(&transactional_2_mutex); - XactionTable[index] = LogTransBegin(xidCount_tmp); + XactionTable[index] = LogTransBegin(stasis_log_file, xidCount_tmp); if(globalLockManager.begin) { globalLockManager.begin(XactionTable[index].xid); } @@ -285,11 +295,12 @@ static compensated_function void TactionHelper(int xid, writelock(p->rwlatch,0); - e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, op, dat, datlen); + e = LogUpdate(stasis_log_file, &XactionTable[xid % MAX_TRANSACTIONS], + p, op, dat, datlen); assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); DEBUG("Tupdate() e->LSN: %ld\n", e->LSN); doUpdate(e, p); - FreeLogEntry(e); + freeLogEntry(e); unlock(p->rwlatch); } @@ -358,7 +369,7 @@ int Tcommit(int xid) { pthread_mutex_unlock(&transactional_2_mutex); #endif - lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]); + lsn = LogTransCommit(stasis_log_file, &XactionTable[xid % MAX_TRANSACTIONS]); if(globalLockManager.commit) { globalLockManager.commit(xid); } allocTransactionCommit(xid); @@ -378,7 +389,7 @@ int Tprepare(int xid) { assert(xid >= 0); off_t i = xid % MAX_TRANSACTIONS; assert(XactionTable[i].xid == xid); - LogTransPrepare(&XactionTable[i]); + LogTransPrepare(stasis_log_file, &XactionTable[i]); return 0; } @@ -389,10 +400,10 @@ int Tabort(int xid) { TransactionLog * t =&XactionTable[xid%MAX_TRANSACTIONS]; assert(t->xid == xid); - lsn = LogTransAbort(t); + lsn = LogTransAbort(stasis_log_file, t); /** @todo is the order of the next two calls important? */ - undoTrans(*t); + undoTrans(stasis_log_file, *t); if(globalLockManager.abort) { globalLockManager.abort(xid); } allocTransactionAbort(xid); @@ -419,7 +430,7 @@ int Tdeinit() { } } assert( numActiveXactions == 0 ); - truncationDeinit(); + stasis_truncation_deinit(); TnaiveHashDeinit(); TallocDeinit(); bufDeinit(); @@ -431,7 +442,7 @@ int Tdeinit() { slow_close = 0; } stasis_page_deinit(); - LogDeinit(); + stasis_log_file->deinit(stasis_log_file); dirtyPagesDeinit(); initted = 0; @@ -443,7 +454,7 @@ int TuncleanShutdown() { // We're simulating a crash; don't complain when writes get lost, // and active transactions get rolled back. stasis_suppress_unclean_shutdown_warnings = 1; - truncationDeinit(); + stasis_truncation_deinit(); TnaiveHashDeinit(); simulateBufferManagerCrash(); if(slow_pfile) { @@ -452,7 +463,7 @@ int TuncleanShutdown() { slow_close = 0; } stasis_page_deinit(); - LogDeinit(); + stasis_log_file->deinit(stasis_log_file); numActiveXactions = 0; dirtyPagesDeinit(); @@ -577,14 +588,16 @@ typedef struct { */ void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) { assert(xid >= 0); - LogEntry * e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], NULL, op, dat, datSize); + LogEntry * e = LogUpdate(stasis_log_file, + &XactionTable[xid % MAX_TRANSACTIONS], + NULL, op, dat, datSize); DEBUG("Begin Nested Top Action e->LSN: %ld\n", e->LSN); stasis_nta_handle * h = malloc(sizeof(stasis_nta_handle)); h->prev_lsn = e->prevLSN; h->compensated_lsn = e->LSN; - FreeLogEntry(e); + freeLogEntry(e); return h; } @@ -597,7 +610,8 @@ lsn_t TendNestedTopAction(int xid, void * handle) { assert(xid >= 0); // Write a CLR. - lsn_t clrLSN = LogDummyCLR(xid, h->prev_lsn, h->compensated_lsn); + lsn_t clrLSN = LogDummyCLR(stasis_log_file, xid, + h->prev_lsn, h->compensated_lsn); // Ensure that the next action in this transaction points to the CLR. XactionTable[xid % MAX_TRANSACTIONS].prevLSN = clrLSN; diff --git a/src/stasis/truncation.c b/src/stasis/truncation.c index e4e786b..45c7811 100644 --- a/src/stasis/truncation.c +++ b/src/stasis/truncation.c @@ -153,11 +153,11 @@ void dirtyPagesDeinit() { pblHtDelete(dirtyPages); dirtyPages = 0; } -void truncationInit() { +void stasis_truncation_init() { initialized = 1; } -void truncationDeinit() { +void stasis_truncation_deinit() { pthread_mutex_lock(&shutdown_mutex); initialized = 0; if(automaticallyTuncating) { @@ -171,76 +171,76 @@ void truncationDeinit() { automaticallyTuncating = 0; } -static void* periodicTruncation(void * ignored) { +static void* stasis_truncation_thread_worker(void* logp) { + stasis_log_t * log = logp; pthread_mutex_lock(&shutdown_mutex); - while(initialized) { - if(LogFlushedLSN() - LogTruncationPoint() > TARGET_LOG_SIZE) { - truncateNow(0); + while(initialized) { + if(log->first_unstable_lsn(log) - log->truncation_point(log) + > TARGET_LOG_SIZE) { + stasis_truncation_truncate(log, 0); } struct timeval now; struct timespec timeout; int timeret = gettimeofday(&now, 0); assert(0 == timeret); - + timeout.tv_sec = now.tv_sec; timeout.tv_nsec = now.tv_usec; timeout.tv_sec += TRUNCATE_INTERVAL; - + pthread_cond_timedwait(&shutdown_cond, &shutdown_mutex, &timeout); } pthread_mutex_unlock(&shutdown_mutex); return (void*)0; } -void autoTruncate() { +void stasis_truncation_thread_start(stasis_log_t* log) { assert(!automaticallyTuncating); automaticallyTuncating = 1; - pthread_create(&truncationThread, 0, &periodicTruncation, 0); + pthread_create(&truncationThread, 0, &stasis_truncation_thread_worker, log); } -int truncateNow(int force) { - +int stasis_truncation_truncate(stasis_log_t* log, int force) { // *_minRecLSN() used to return the same value as flushed if //there were no outstanding transactions, but flushed might //not point to the front of a log entry... now, both return //LSN_T_MAX if there are no outstanding transactions / no //dirty pages. - + lsn_t page_rec_lsn = dirtyPages_minRecLSN(); lsn_t xact_rec_lsn = transactions_minRecLSN(); - lsn_t flushed_lsn = LogFlushedLSN(); + lsn_t flushed_lsn = log->first_unstable_lsn(log); lsn_t rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn; rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn; - lsn_t log_trunc = LogTruncationPoint(); - if(force || (xact_rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) { + lsn_t log_trunc = log->truncation_point(log); + if(force || (xact_rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) { //fprintf(stderr, "xact = %ld \t log = %ld\n", xact_rec_lsn, log_trunc); - if((rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) { + if((rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) { // fprintf(stderr, "Truncating now. rec_lsn = %ld, log_trunc = %ld\n", rec_lsn, log_trunc); // fprintf(stderr, "Truncating to rec_lsn = %ld\n", rec_lsn); forcePages(); - LogTruncate(rec_lsn); + log->truncate(log, rec_lsn); return 1; - } else { - lsn_t flushed = LogFlushedLSN(); - if(force || flushed - log_trunc > 2 * TARGET_LOG_SIZE) { + } else { + lsn_t flushed = log->first_unstable_lsn(log); + if(force || flushed - log_trunc > 2 * TARGET_LOG_SIZE) { //fprintf(stderr, "Flushing dirty buffers: rec_lsn = %ld log_trunc = %ld flushed = %ld\n", rec_lsn, log_trunc, flushed); dirtyPages_flush(); - + page_rec_lsn = dirtyPages_minRecLSN(); rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn; rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn; - + //fprintf(stderr, "Flushed Dirty Buffers. Truncating to rec_lsn = %ld\n", rec_lsn); forcePages(); - LogTruncate(rec_lsn); + log->truncate(log, rec_lsn); return 1; - - } else { + } else { return 0; } } diff --git a/stasis/logger/inMemoryLog.h b/stasis/logger/inMemoryLog.h index 3534dc2..d8f3115 100644 --- a/stasis/logger/inMemoryLog.h +++ b/stasis/logger/inMemoryLog.h @@ -1,16 +1,8 @@ +#ifndef __INMEMORYLOG +#define __INMEMORYLOG + #include -#ifndef __INMEMORYLOG -#define __INMEMORYLOG 1 +stasis_log_t* open_InMemoryLog(); -int open_InMemoryLog(); -int writeLogEntry_InMemoryLog(LogEntry * e); -lsn_t flushedLSN_InMemoryLog(); -void syncLog_InMemoryLog(); -int truncateLog_InMemoryLog(lsn_t lsn); -lsn_t firstLogEntry_InMemoryLog(); -void close_InMemoryLog(); -long sizeofInternalLogEntry_InMemoryLog(const LogEntry * e); -LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn); -lsn_t nextEntry_InMemoryLog(const LogEntry * e); #endif diff --git a/stasis/logger/logEntry.h b/stasis/logger/logEntry.h index d09d4bd..17c75ba 100644 --- a/stasis/logger/logEntry.h +++ b/stasis/logger/logEntry.h @@ -123,10 +123,14 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, @return a LogEntry that should be freed with free(). */ LogEntry * allocCLRLogEntry(const LogEntry * e); -/** +/** + @param e a log entry returned from one of the alloc???LogEntry functions. + */ +void freeLogEntry(const LogEntry * e); +/** @return the length, in bytes, of e. */ -long sizeofLogEntry(const LogEntry * e); +lsn_t sizeofLogEntry(const LogEntry * e); /** @return the operation's arguments. */ diff --git a/stasis/logger/logHandle.h b/stasis/logger/logHandle.h index 672a992..873a301 100644 --- a/stasis/logger/logHandle.h +++ b/stasis/logger/logHandle.h @@ -47,6 +47,8 @@ terms specified in this license. BEGIN_C_DECLS +typedef struct LogHandle LogHandle; + /** @file A simple data structure that allows forward iterations over @@ -65,13 +67,16 @@ BEGIN_C_DECLS */ /** Returns a logHandle pointing at the first log entry in the log. */ -LogHandle getLogHandle(); +LogHandle* getLogHandle(stasis_log_t* log); /** Returns a logHandle pointing at lsn. */ -LogHandle getLSNHandle(lsn_t lsn); +LogHandle* getLSNHandle(stasis_log_t* log, lsn_t lsn); /** Returns a 'guarded log handle'. This handle executes a callback function on each entry it encounters. If the guard returns 0, then it's iterator terminates. Otherwise, it behaves normally. */ -LogHandle getGuardedHandle(lsn_t lsn, guard_fcn_t * f, void * guard_state); +LogHandle* getGuardedHandle(stasis_log_t* log, lsn_t lsn, + guard_fcn_t * f, void * guard_state); + +void freeLogHandle(LogHandle* lh); /** @return a pointer to the next log entry in the log, or NULL if at diff --git a/stasis/logger/logWriter.h b/stasis/logger/logWriter.h index 92d39fe..3fe5d1f 100644 --- a/stasis/logger/logWriter.h +++ b/stasis/logger/logWriter.h @@ -71,107 +71,29 @@ terms specified in this license. #define __LOGWRITER_H__ #include +#include #include BEGIN_C_DECLS /** - start a new log stream by opening the log file for reading - - returns 0 on success, or an error code define above + Start a new log stream by opening the log file for reading. This + is stasis's default log implementation, and uses safe writes to + perform truncation. + @return NULL on error */ -int openLogWriter(); - -/** - - @param e Pointer to a log entry. After the call, e->LSN will be set appropriately. If e's xid is set to -1, then this call has no effect (and e's LSN will be set to -1.) - - returns 0 on success, or an error code defined above -*/ -int writeLogEntry(LogEntry * e); - -/** - flush the entire log (tail) that is currently in memory to disk -*/ -void syncLog_LogWriter(); - -/** - Return the highest LSN that is known to be on disk. (Currently, we - only know if an LSN is on disk if we've written that LSN before a - call to syncLog(). - - Note: This function might not return an LSN corresponding to a real - log entry, but it will definitely return one that is greater than - or equal to the LSN of a log entry that has been forced to disk, - and is less than the LSN of all log entries that might not have - been forced to disk. -*/ -lsn_t flushedLSN_LogWriter(); - -/** - Truncates the log file. In the single-threaded case, this works as - follows: - - First, the LSN passed to this function, minus sizeof(lsn_t) is - written to a new file, called logfile.txt~. (If logfile.txt~ - already exists, then it is truncated.) - - Next, the contents of the log, starting with the LSN passed into - this function are copied to logfile.txt~ - - Finally, logfile.txt~ is moved on top of logfile.txt - - As long as the move system call is atomic, this function should - maintain the system's durability. - - The multithreaded case is a bit more complicated, as we need - to deal with latching: - - With no lock, copy the log. Upon completion, if the log has grown, - then copy the part that remains. Next, obtain a read/write latch - on the logfile, and copy any remaining portions of the log. - Perform the move, and release the latch. - -*/ - -int truncateLog_LogWriter(lsn_t); - - -/** - @return The LSN of the first entry in the log file. (If the file - is empty, this returns the LSN of the log entry that would be - created if writeLogEntry were called.) -*/ -lsn_t firstLogEntry(); -/** - Close the log stream -*/ -void closeLogWriter(); +stasis_log_t* openLogWriter(); /** Actually deletes the log file that may have been written to disk! Danger!! - Only use after calling closeLogStream AND you are sure there are no active (or - future active) transactions! + Only use after calling closeLogStream AND you are sure there are no active + (or future active) transactions! @todo This only exists because the tests use it...once the logfile name isn't hardcoded, remove this function. */ void deleteLogWriter(); -/** - Read a log entry at a particular LSN. - - @param LSN the LSN of the entry that will be read. -*/ -LogEntry * readLSNEntry_LogWriter(lsn_t LSN); -static inline lsn_t nextEntry_LogWriter(const LogEntry * e) { - return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t); -} - -extern int logWriter_isDurable; - -long sizeofInternalLogEntry_LogWriter(const LogEntry * e); - END_C_DECLS #endif /* __LLADD_LOGGER_LOGWRITER_H */ diff --git a/stasis/logger/logger2.h b/stasis/logger/logger2.h index 8e8d5dd..9b4e3c0 100644 --- a/stasis/logger/logger2.h +++ b/stasis/logger/logger2.h @@ -46,9 +46,9 @@ terms specified in this license. * Interface to Stasis' log file. * * @ingroup LOGGING_DISCIPLINE - * + * * $Id$ - * + * */ @@ -64,110 +64,144 @@ terms specified in this license. */ typedef int (guard_fcn_t)(const LogEntry *, void *); -typedef struct { - /** The LSN of the log entry that we would return if next is called. */ - lsn_t next_offset; - /** The LSN of the log entry that we would return if previous is called. */ - lsn_t prev_offset; - guard_fcn_t * guard; - void * guard_state; -} LogHandle; +typedef struct stasis_log_t stasis_log_t; /** Contains the state needed by the logging layer to perform operations on a transaction. */ typedef struct { - int xid; + int xid; lsn_t prevLSN; lsn_t recLSN; } TransactionLog; -/** - This is the log implementation that is being used. - - Before Stasis is intialized it will be set to a default value. +/** + This is the log implementation that is being used. + + Before Stasis is intialized it will be set to a default value. It may be changed before Tinit() is called by assigning to it. The default can be overridden at compile time by defining USE_LOGGER. (eg: gcc ... -DUSE_LOGGER=LOG_TO_FOO) - @see constants.h for a list of recognized log implementations. + @see constants.h for a list of recognized log implementations. (The constants are named LOG_TO_*) - */ +*/ extern int loggerType; -int LogInit(int logType); -int LogDeinit(); -void LogForce(lsn_t lsn); -/** - @param lsn The first lsn that will be available after truncation. -*/ -void LogTruncate(lsn_t lsn); +struct stasis_log_t { + /** + Needed by sizeofLogEntry + */ + lsn_t (*sizeof_internal_entry)(struct stasis_log_t* log, const LogEntry * e); + + /** + Append a log entry to the end of the log. + + @param e This call sets e->LSN to entry's offset. + @return 0 on success + */ + int (*write_entry)(struct stasis_log_t* log, LogEntry * e); + + /** + Read a log entry, given its LSN. + @param lsn The lsn of the log entry to be read. + */ + const LogEntry* (*read_entry)(struct stasis_log_t* log, lsn_t lsn); + + /** + Given a log entry, return the LSN of the next entry. + */ + lsn_t (*next_entry)(struct stasis_log_t* log, const LogEntry * e); + + /** + This function returns the LSN of the most recent + log entry that has not been flushed to disk. If the entire log + is flushed, this function returns the LSN of the entry that will + be allocated the next time the log is appended to. + */ + lsn_t (*first_unstable_lsn)(struct stasis_log_t* log); + + /** + Force any enqueued, unwritten entries to disk + */ + void (*force_tail)(struct stasis_log_t* log); + + /** + @param lsn The first lsn that will be available after truncation. + @return 0 on success + */ + int (*truncate)(struct stasis_log_t* log, lsn_t lsn); + + /** + Returns the LSN of the first entry of the log. If the log is + empty, return the LSN that will be assigned to the next log + entry that is appended to the log. + */ + lsn_t (*truncation_point)(struct stasis_log_t* log); + /** + @return 0 on success + */ + int (*deinit)(struct stasis_log_t* log); + + int (*is_durable)(); +}; -/** This function is guaranteed to return the LSN of the most recent - log entry that has not been flushed to disk. (If the entire log - is flushed, this function returns the LSN of the entry that will - be allocated the next time the log is appended to. */ -lsn_t LogFlushedLSN(); -/** Returns the LSN of the first entry of the log, or the LSN of the - next to be allocated if the log is empty) */ -lsn_t LogTruncationPoint(); -/** Read a log entry, given its LSN. - @param lsn The lsn of the log entry to be read. -*/ -const LogEntry * LogReadLSN(lsn_t lsn); /** - Given a log entry, return the LSN of the next entry. -*/ -lsn_t LogNextEntry(const LogEntry * e); + @todo get rid of this! + */ +extern stasis_log_t* stasis_log_file; + + +void LogForce(stasis_log_t* log, lsn_t lsn); /** Inform the logging layer that a new transaction has begun, and obtain a handle. */ -TransactionLog LogTransBegin(int xid); +TransactionLog LogTransBegin(stasis_log_t* log, int xid); /** - Write a transaction PREPARE to the log tail. Blocks until the - prepare record is stable. + Write a transaction PREPARE to the log tail. Blocks until the + prepare record is stable. - @return the lsn of the prepare log entry - */ -lsn_t LogTransPrepare(TransactionLog * l); -/** - Write a transaction COMMIT to the log tail. Blocks until the commit - record is stable. - - @return the lsn of the commit log entry. + @return the lsn of the prepare log entry */ -lsn_t LogTransCommit(TransactionLog * l); +lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l); +/** + Write a transaction COMMIT to the log tail. Blocks until the commit + record is stable. + + @return the lsn of the commit log entry. +*/ +lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l); /** - Write a transaction ABORT to the log tail. Does not force the log. + Write a transaction ABORT to the log tail. Does not force the log. - @return the lsn of the abort log entry. + @return the lsn of the abort log entry. */ -lsn_t LogTransAbort(TransactionLog * l); +lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l); /** - LogUpdate writes an UPDATELOG log record to the log tail. It also interprets - its operation argument to the extent necessary for allocating and laying out - the log entry. Finally, it updates the state of the parameter l. + Write a end transaction record @see XEND + + @todo Implement LogEnd */ -LogEntry * LogUpdate(TransactionLog * l, Page * p, unsigned int operation, - const byte * arg, size_t arg_size); +void LogEnd (stasis_log_t* log, TransactionLog * l); /** - Any LogEntry that is returned by a function in logger2.h or - logHandle.h should be freed using this function. - - @param e The log entry to be freed. (The "const" here is a hack - that allows LogReadLSN to return a const *. + LogUpdate writes an UPDATELOG log record to the log tail. It + also interprets its operation argument to the extent necessary for + allocating and laying out the log entry. Finally, it updates the + state of the parameter l. */ -void FreeLogEntry(const LogEntry * e); +LogEntry * LogUpdate(stasis_log_t* log, + TransactionLog * l, Page * p, unsigned int operation, + const byte * arg, size_t arg_size); /** Write a compensation log record. These records are used to allow @@ -175,29 +209,14 @@ void FreeLogEntry(const LogEntry * e); record the completion of undo operations, amongst other things. @return the lsn of the CLR entry that was written to the log. - (Needed so that the lsn slot of the page in question can be - updated.) + (Needed so that the lsn slot of the page in question can be + updated.) */ -lsn_t LogCLR(const LogEntry * e); +lsn_t LogCLR(stasis_log_t* log, const LogEntry * e); -lsn_t LogDummyCLR(int xid, lsn_t prev_lsn, lsn_t compensated_lsn); +lsn_t LogDummyCLR(stasis_log_t* log, int xid, + lsn_t prev_lsn, lsn_t compensated_lsn); -/** - Write a end transaction record @see XEND - @todo Implement LogEnd -*/ -void LogEnd (TransactionLog * l); - -/** - Needed by sizeofLogEntry -*/ -long LoggerSizeOfInternalLogEntry(const LogEntry * e); - -/** - For internal use only... This would be static, but it is called by - the test cases. -*/ -void LogWrite(LogEntry * e); #endif diff --git a/stasis/pageHandle.h b/stasis/pageHandle.h index ecb3227..5079d91 100644 --- a/stasis/pageHandle.h +++ b/stasis/pageHandle.h @@ -15,6 +15,7 @@ * * @param dat The page to be flushed to disk. No concurrent calls * may have the same value of dat. + * */ extern void (*pageWrite)(Page * dat); diff --git a/stasis/recovery.h b/stasis/recovery.h index fb38228..f5fe77c 100644 --- a/stasis/recovery.h +++ b/stasis/recovery.h @@ -1,9 +1,10 @@ - #ifndef __LLADD_RECOVERY2_H #define __LLADD_RECOVERY2_H -void InitiateRecovery(); +#include + +void stasis_recovery_initiate(stasis_log_t* log); /** This really doesn't belong in recovery.c, but there's so much code overlap, it doesn't make sense not to put it there. */ -void undoTrans(); +void undoTrans(stasis_log_t*log, TransactionLog transaction); #endif diff --git a/stasis/transactional.h b/stasis/transactional.h index 722537f..0d5aa8a 100644 --- a/stasis/transactional.h +++ b/stasis/transactional.h @@ -752,7 +752,6 @@ int stasis_transaction_table_forget(int xid); */ lsn_t transactions_minRecLSN(); - /** Report Stasis' current durability guarantees. diff --git a/stasis/truncation.h b/stasis/truncation.h index 926a016..5cf6188 100644 --- a/stasis/truncation.h +++ b/stasis/truncation.h @@ -60,6 +60,8 @@ BEGIN_C_DECLS #ifndef __LLADD_TRUNCATION_H #define __LLADD_TRUNCATION_H 1 +#include + void dirtyPagesInit(); void dirtyPagesDeinit(); @@ -73,17 +75,17 @@ int dirtyPages_isDirty(Page * p); */ void dirtyPages_flushRange(pageid_t start, pageid_t stop); -void truncationInit(); -void truncationDeinit(); +void stasis_truncation_init(); +void stasis_truncation_deinit(); /** Spawn a periodic, demand-based log truncation thread. */ -void autoTruncate(); +void stasis_truncation_thread_start(); /** Initiate a round of log truncation. */ -int truncateNow(int force); +int stasis_truncation_truncate(stasis_log_t* log, int force); END_C_DECLS #endif diff --git a/test/stasis/check_logWriter.c b/test/stasis/check_logWriter.c index 5d67130..b63fb4d 100644 --- a/test/stasis/check_logWriter.c +++ b/test/stasis/check_logWriter.c @@ -54,6 +54,7 @@ terms specified in this license. #include #include #include +#include #include #include @@ -86,7 +87,7 @@ static void setup_log() { rid.slot = 0; rid.size = sizeof(unsigned long); - LogWrite(e); + stasis_log_file->write_entry(stasis_log_file,e); prevLSN = e->LSN; if(first) { @@ -94,28 +95,28 @@ static void setup_log() { firstLSN = prevLSN; } - f = LogReadLSN(prevLSN); + f = stasis_log_file->read_entry(stasis_log_file, prevLSN); fail_unless(sizeofLogEntry(e) == sizeofLogEntry(f), "Log entry changed size!!"); fail_unless(0 == memcmp(e,f,sizeofLogEntry(e)), "Log entries did not agree!!"); - FreeLogEntry (e); - FreeLogEntry (f); + freeLogEntry(e); + freeLogEntry(f); e = allocUpdateLogEntry(prevLSN, xid, 1, rid.page, args, args_size); - LogWrite(e); + stasis_log_file->write_entry(stasis_log_file,e); prevLSN = e->prevLSN; // LogEntry * g = allocCLRLogEntry(100, 1, 200, rid, 0); //prevLSN); LogEntry * g = allocCLRLogEntry(e); // XXX will probably break g->prevLSN = firstLSN; - LogWrite(g); + stasis_log_file->write_entry(stasis_log_file,g); assert (g->type == CLRLOG); - prevLSN = g->LSN; - - FreeLogEntry (e); - FreeLogEntry (g); + prevLSN = g->LSN; + + freeLogEntry (e); + freeLogEntry (g); } } /** @@ -130,7 +131,7 @@ static void setup_log() { In particular, logWriter checks to make sure that each log entry's size matches the size that it recorded before the logEntry. Also, when checking the 1000 of 3000 entries, this test uses - LogReadLSN, which tests the logWriter's ability to succesfully + log->read_entry, which tests the logWriter's ability to succesfully manipulate LSN's. @todo Test logHandle more thoroughly. (Still need to test the guard mechanism.) @@ -139,18 +140,20 @@ static void setup_log() { START_TEST(loggerTest) { const LogEntry * e; - LogHandle h; + LogHandle* h; int i = 0; setup_log(); - h = getLogHandle(); + h = getLogHandle(stasis_log_file); - while((e = nextInLog(&h))) { - FreeLogEntry(e); + while((e = nextInLog(h))) { + freeLogEntry(e); i++; assert(i < 4000); } + freeLogHandle(h); + assert(i == 3000); deleteLogWriter(); @@ -167,21 +170,22 @@ END_TEST START_TEST(logHandleColdReverseIterator) { const LogEntry * e; setup_log(); - LogHandle lh = getLogHandle(); + LogHandle* lh = getLogHandle(stasis_log_file); int i = 0; - while(((e = nextInLog(&lh)) && (i < 100)) ) { - FreeLogEntry(e); + while(((e = nextInLog(lh)) && (i < 100)) ) { + freeLogEntry(e); i++; } i = 0; - lh = getLSNHandle(e->LSN); - while((e = previousInTransaction(&lh))) { + lh = getLSNHandle(stasis_log_file, e->LSN); + while((e = previousInTransaction(lh))) { i++; - FreeLogEntry(e); + freeLogEntry(e); } + freeLogHandle(lh); assert(i <= 4); /* We should almost immediately hit a clr that goes to the beginning of the log... */ Tdeinit(); } @@ -199,58 +203,58 @@ START_TEST(loggerTruncate) { const LogEntry * tmp; setup_log(); - LogHandle lh = getLogHandle(); + LogHandle* lh = getLogHandle(stasis_log_file); int i = 0; while(i < 234) { i++; - le = nextInLog(&lh); + le = nextInLog(lh); } - le2 = nextInLog(&lh); + le2 = nextInLog(lh); i = 0; while(i < 23) { i++; - le3 = nextInLog(&lh); + le3 = nextInLog(lh); } - LogTruncate(le->LSN); + stasis_log_file->truncate(stasis_log_file, le->LSN); - tmp = LogReadLSN(le->LSN); + tmp = stasis_log_file->read_entry(stasis_log_file, le->LSN); fail_unless(NULL != tmp, NULL); fail_unless(tmp->LSN == le->LSN, NULL); - FreeLogEntry(tmp); - tmp = LogReadLSN(le2->LSN); + freeLogEntry(tmp); + tmp = stasis_log_file->read_entry(stasis_log_file, le2->LSN); fail_unless(NULL != tmp, NULL); fail_unless(tmp->LSN == le2->LSN, NULL); - FreeLogEntry(tmp); - tmp = LogReadLSN(le3->LSN); + freeLogEntry(tmp); + tmp = stasis_log_file->read_entry(stasis_log_file, le3->LSN); fail_unless(NULL != tmp, NULL); fail_unless(tmp->LSN == le3->LSN, NULL); - FreeLogEntry(tmp); - - lh = getLogHandle(); + freeLogEntry(tmp); + freeLogHandle(lh); + lh = getLogHandle(stasis_log_file); i = 0; - FreeLogEntry(le); - FreeLogEntry(le2); - FreeLogEntry(le3); + freeLogEntry(le); + freeLogEntry(le2); + freeLogEntry(le3); - while((le = nextInLog(&lh))) { + while((le = nextInLog(lh))) { if(le->type != INTERNALLOG) { i++; } - FreeLogEntry(le); + freeLogEntry(le); } assert(i == (3000 - 234 + 1)); - + freeLogHandle(lh); Tdeinit(); } END_TEST @@ -303,11 +307,11 @@ static void* worker_thread(void * arg) { #ifdef NO_CONCURRENCY pthread_mutex_lock(&big); #endif - LogTruncate(myTruncVal); + stasis_log_file->truncate(stasis_log_file, myTruncVal); #ifdef NO_CONCURRENCY pthread_mutex_unlock(&big); #endif - assert(LogTruncationPoint() >= myTruncVal); + assert(stasis_log_file->truncation_point(stasis_log_file) >= myTruncVal); } if(threshold < 3) { @@ -316,7 +320,7 @@ static void* worker_thread(void * arg) { #ifdef NO_CONCURRENCY pthread_mutex_lock(&big); #endif - LogWrite(le); + stasis_log_file->write_entry(stasis_log_file,le); #ifdef NO_CONCURRENCY pthread_mutex_unlock(&big); #endif @@ -331,10 +335,10 @@ static void* worker_thread(void * arg) { lsn_t lsn = lsns[entry]; pthread_mutex_unlock(&random_mutex); - const LogEntry * e = LogReadLSN(lsn); + const LogEntry * e = stasis_log_file->read_entry(stasis_log_file, lsn); assert(e->xid == entry+key); - FreeLogEntry(e); + freeLogEntry(e); } else { pthread_mutex_unlock(&random_mutex); } @@ -344,7 +348,7 @@ static void* worker_thread(void * arg) { /* Try to interleave requests as much as possible */ sched_yield(); - FreeLogEntry(le); + freeLogEntry(le); } @@ -391,41 +395,56 @@ void reopenLogWorkload(int truncating) { stasis_transaction_table_active_transaction_count_set(0); - LogInit(loggerType); + if(LOG_TO_FILE == loggerType) { + stasis_log_file = openLogWriter(); + } else if(LOG_TO_MEMORY == loggerType) { + stasis_log_file = open_InMemoryLog(); + } else { + assert(stasis_log_file != NULL); + } + int xid = 1; - TransactionLog l = LogTransBegin(xid); + TransactionLog l = LogTransBegin(stasis_log_file, xid); lsn_t startLSN = 0; LogEntry * entries[ENTRY_COUNT]; for(int i = 0; i < ENTRY_COUNT; i++) { - entries[i] = LogUpdate(&l, NULL, OPERATION_NOOP, NULL, 0); + entries[i] = LogUpdate(stasis_log_file, + &l, NULL, OPERATION_NOOP, NULL, 0); if(i == SYNC_POINT) { if(truncating) { - LogTruncate(entries[i]->LSN); + stasis_log_file->truncate(stasis_log_file,entries[i]->LSN); startLSN = entries[i]->LSN; } } } - LogDeinit(); - LogInit(loggerType); + stasis_log_file->deinit(stasis_log_file); - LogHandle h; + if(LOG_TO_FILE == loggerType) { + stasis_log_file = openLogWriter(); + } else if(LOG_TO_MEMORY == loggerType) { + stasis_log_file = open_InMemoryLog(); + } else { + assert(stasis_log_file != NULL); + } + + LogHandle * h; int i; if(truncating) { - h = getLogHandle(); + h = getLogHandle(stasis_log_file); i = SYNC_POINT; } else { - h = getLogHandle(); + h = getLogHandle(stasis_log_file); i = 0; } const LogEntry * e; - while((e = nextInLog(&h))) { + while((e = nextInLog(h))) { if(e->type != INTERNALLOG) { assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i])); assert(!memcmp(e, entries[i], sizeofLogEntry(entries[i]))); @@ -438,22 +457,24 @@ void reopenLogWorkload(int truncating) { LogEntry * entries2[ENTRY_COUNT]; for(int i = 0; i < ENTRY_COUNT; i++) { - entries2[i] = LogUpdate(&l, NULL, OPERATION_NOOP, NULL, 0); + entries2[i] = LogUpdate(stasis_log_file, &l, NULL, OPERATION_NOOP, + NULL, 0); if(i == SYNC_POINT) { - syncLog_LogWriter(); + stasis_log_file->force_tail(stasis_log_file); } } + freeLogHandle(h); if(truncating) { - h = getLSNHandle(startLSN); + h = getLSNHandle(stasis_log_file, startLSN); i = SYNC_POINT; } else { - h = getLogHandle(); + h = getLogHandle(stasis_log_file); i = 0; } - while((e = nextInLog(&h))) { + while((e = nextInLog(h))) { if(e->type != INTERNALLOG) { if( i < ENTRY_COUNT) { assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i])); @@ -467,10 +488,11 @@ void reopenLogWorkload(int truncating) { } } + freeLogHandle(h); assert(i == (ENTRY_COUNT * 2)); stasis_truncation_automatic = 1; - LogDeinit(); + stasis_log_file->deinit(stasis_log_file); } START_TEST(loggerReopenTest) { diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index 4606f76..4264aab 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -184,7 +184,8 @@ START_TEST(operation_physical_do_undo) { // XXX This is a hack to put some stuff in the log. Otherwise, Tdeinit() fails. for(int i = 0; i < 10; i++) - LogWrite(allocCommonLogEntry(-1, -1, -1)); + stasis_log_file->write_entry(stasis_log_file, + allocCommonLogEntry(-1, -1, -1)); /** @todo need to re-think check_operations. The test is pretty broken. */ Tcommit(xid); diff --git a/utilities/logfile_dump.c b/utilities/logfile_dump.c index 6762220..e24095c 100644 --- a/utilities/logfile_dump.c +++ b/utilities/logfile_dump.c @@ -59,18 +59,18 @@ static char * logEntryToString(const LogEntry * le) { void setupOperationsTable(); int main() { - LogHandle lh; + LogHandle* lh; const LogEntry * le; setupOperationsTable(); - - if(openLogWriter()) { + stasis_log_t* log; + if(NULL == (log = openLogWriter())) { printf("Couldn't open log.\n"); } - lh = getLogHandle(); /*LSNHandle(0); */ + lh = getLogHandle(log); - while((le = nextInLog(&lh))) { + while((le = nextInLog(lh))) { char * s = logEntryToString(le); if(s) { @@ -78,8 +78,8 @@ int main() { free(s); } - FreeLogEntry(le); + freeLogEntry(le); } - + freeLogHandle(lh); } diff --git a/utilities/truncate_log.c b/utilities/truncate_log.c index 83be1e0..231e06b 100644 --- a/utilities/truncate_log.c +++ b/utilities/truncate_log.c @@ -2,7 +2,7 @@ #include int main(void) { Tinit(); - truncateNow(1); + stasis_truncation_truncate(stasis_log_file, 1); Tdeinit(); return compensation_error();