From 2dcc6598d2f07f31d768492fee60855c95085084 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Mon, 1 Dec 2008 22:45:32 +0000 Subject: [PATCH] added support for softcommit, and for compile / runtime configuration of stasis log + storefile locations --- src/stasis/bufferManager/legacy/pageFile.c | 8 +- src/stasis/flags.c | 29 ++++++ src/stasis/logger/inMemoryLog.c | 5 +- src/stasis/logger/logWriter.c | 115 ++++++++++++++------- src/stasis/logger/logger2.c | 31 +++--- src/stasis/operations/prepare.c | 6 +- src/stasis/page/raw.c | 2 +- src/stasis/pageHandle.c | 2 +- src/stasis/transactional2.c | 12 ++- src/stasis/truncation.c | 6 +- stasis/constants.h | 10 +- stasis/flags.h | 6 ++ stasis/logger/logWriter.h | 4 +- stasis/logger/logger2.h | 31 +++++- test/stasis/check_logWriter.c | 12 ++- utilities/logfile_dump.c | 5 +- 16 files changed, 198 insertions(+), 86 deletions(-) diff --git a/src/stasis/bufferManager/legacy/pageFile.c b/src/stasis/bufferManager/legacy/pageFile.c index b5a3ff8..a2acde3 100644 --- a/src/stasis/bufferManager/legacy/pageFile.c +++ b/src/stasis/bufferManager/legacy/pageFile.c @@ -94,7 +94,7 @@ static void pfPageWrite(Page * ret) { // If necessary, force the log to disk so that ret's LSN will be stable. assert(ret->LSN == stasis_page_lsn_read(ret)); - LogForce(stasis_log_file, ret->LSN); + LogForce(stasis_log_file, ret->LSN, LOG_FORCE_WAL); pthread_mutex_lock(&stable_mutex); @@ -143,9 +143,11 @@ void openPageFile() { DEBUG("Opening storefile.\n"); #ifdef PAGE_FILE_O_DIRECT - stable = open (STORE_FILE, O_CREAT | O_RDWR | O_DIRECT, FILE_PERM); //S_IRWXU | S_IRWXG | S_IRWXO); + stable = open (stasis_store_file_name, + O_CREAT | O_RDWR | O_DIRECT, FILE_PERM); #else - stable = open (STORE_FILE, O_CREAT | O_RDWR, FILE_PERM);//S_IRWXU | S_IRWXG | S_IRWXO); + stable = open (stasis_store_file_name, + O_CREAT | O_RDWR, FILE_PERM); #endif if(!pageFile_isDurable) { fprintf(stderr, "\n**********\n"); diff --git a/src/stasis/flags.c b/src/stasis/flags.c index 97cb376..de67991 100644 --- a/src/stasis/flags.c +++ b/src/stasis/flags.c @@ -2,6 +2,10 @@ #include #include +#include +#include +#include + #ifdef BUFFER_MANAGER_TYPE int bufferManagerType = BUFFER_MANAGER_TYPE; #else @@ -37,3 +41,28 @@ int stasis_truncation_automatic = STASIS_TRUNCATION_AUTOMATIC; #else int stasis_truncation_automatic = 1; #endif + +#ifdef STASIS_LOG_FILE_NAME +char * stasis_log_file_name = STASIS_LOG_FILE_NAME; +#else +char * stasis_log_file_name = "logfile.txt"; +#endif + +#ifdef STASIS_STORE_FILE_NAME +char * stasis_store_file_name = STASIS_STORE_FILE_NAME; +#else +char * stasis_store_file_name = "storefile.txt"; +#endif + +#ifdef STASIS_LOG_FILE_MODE +int stasis_log_file_mode = STASIS_LOG_FILE_MODE; +#else +int stasis_log_file_mode = (O_CREAT | O_RDWR | O_SYNC); +#endif + +#ifdef STASIS_LOG_FILE_PERMISSIONS +int stasis_log_file_permissions = STASIS_LOG_FILE_PERMISSIONS; +#else +int stasis_log_file_permissions = (S_IRUSR | S_IWUSR | S_IRGRP| + S_IWGRP | S_IROTH | S_IWOTH); +#endif diff --git a/src/stasis/logger/inMemoryLog.c b/src/stasis/logger/inMemoryLog.c index 09d4a60..6992f37 100644 --- a/src/stasis/logger/inMemoryLog.c +++ b/src/stasis/logger/inMemoryLog.c @@ -42,11 +42,12 @@ static int writeLogEntry_InMemoryLog(stasis_log_t * log, LogEntry *e) { unlock(flushedLSN_lock); } -static lsn_t flushedLSN_InMemoryLog(stasis_log_t* log) { +static lsn_t flushedLSN_InMemoryLog(stasis_log_t* log, + stasis_log_force_mode_t mode) { return nextAvailableLSN; } -static void syncLog_InMemoryLog(stasis_log_t* log) { +static void syncLog_InMemoryLog(stasis_log_t* log, stasis_log_force_mode_t m){ // no-op } diff --git a/src/stasis/logger/logWriter.c b/src/stasis/logger/logWriter.c index 394f643..bb97d13 100644 --- a/src/stasis/logger/logWriter.c +++ b/src/stasis/logger/logWriter.c @@ -72,13 +72,29 @@ terms specified in this license. static FILE * logFILE = 0; static int roLogFD = 0; -static int logWriter_isDurable = 1; static lsn_t debug_lsn = -1; +static char * log_filename = 0; +static char * log_scratch_filename = 0; +static int log_filemode = 0; +static int log_fileperm = 0; +static char log_softcommit = 0; /** @see flushedLSN_LogWriter() */ -static lsn_t flushedLSN_stable; +/** + The first unstable LSN for write ahead logging purposes. +*/ +static lsn_t flushedLSN_wal; +/** + The first unstable LSN for commit purposes. This can be greater than + flushedLSN_wal. +*/ +static lsn_t flushedLSN_commit; +/** + The first LSN that hasn't made it into roLogFD. This can be greater than + flushedLSN_commit +*/ static lsn_t flushedLSN_internal; /** Invariant: No thread is writing to flushedLSN. Since @@ -88,10 +104,7 @@ static lsn_t flushedLSN_internal; static rwl * flushedLSN_lock; /** - Before writeLogEntry is called, this value is 0. Once writeLogEntry - is called, it is the next available LSN. - - @see writeLogEntry + The LSN that will be assigned to the next log entry. */ static lsn_t nextAvailableLSN; @@ -163,7 +176,7 @@ static int close_LogWriter(stasis_log_t * log); static int writeLogEntry_LogWriter(stasis_log_t* log, LogEntry * e); static inline int isDurable_LogWriter(stasis_log_t* log) { - return logWriter_isDurable; + return !log_softcommit; } static inline lsn_t firstLogEntry_LogWriter(stasis_log_t* log); @@ -172,7 +185,16 @@ static inline lsn_t nextEntry_LogWriter(stasis_log_t* log, return nextEntry(e); } -stasis_log_t* openLogWriter() { +stasis_log_t* openLogWriter(const char * filename, + int filemode, int fileperm) { + + log_filename = strdup(filename); + log_scratch_filename = malloc(strlen(log_filename) + 2); + strcpy(log_scratch_filename, log_filename); + strcat(log_scratch_filename, "~"); + log_filemode = filemode; + log_fileperm = fileperm; + log_softcommit = !(filemode & O_SYNC); stasis_log_t proto = { sizeofInternalLogEntry_LogWriter, // sizeof_internal_entry @@ -203,14 +225,7 @@ stasis_log_t* openLogWriter() { therefore all seeks) run through the second descriptor. */ int logFD; - if(logWriter_isDurable) { - logFD = open(LOG_FILE, LOG_MODE, FILE_PERM); //, S_IRWXU | S_IRWXG | S_IRWXO); - } else { - fprintf(stderr, "\n**********\n"); - fprintf (stderr, "logWriter.c: logWriter_isDurable==0; the logger will not force writes to disk.\n"); - fprintf (stderr, " Transactions will not be durable if the system crashes.\n**********\n"); - logFD = open(LOG_FILE, O_CREAT | O_RDWR, FILE_PERM);// S_IRWXU | S_IRWXG | S_IRWXO); - } + logFD = open(log_filename, log_filemode, log_fileperm); if(logFD == -1) { perror("Couldn't open log file for append.\n"); abort(); @@ -230,7 +245,7 @@ stasis_log_t* openLogWriter() { even if fflush() is used to push the changes out to disk. Therefore, we use a file descriptor, and read() instead of a FILE and fread(). */ - roLogFD = open(LOG_FILE, O_RDONLY, 0); + roLogFD = open(log_filename, O_RDONLY, 0); if(roLogFD == -1) { perror("Couldn't open log file for reads.\n"); @@ -243,8 +258,9 @@ stasis_log_t* openLogWriter() { pthread_mutex_init(&log_write_mutex, NULL); pthread_mutex_init(&nextAvailableLSN_mutex, NULL); pthread_mutex_init(&truncateLog_mutex, NULL); - - flushedLSN_stable = 0; + + flushedLSN_wal = 0; + flushedLSN_commit = 0; flushedLSN_internal = 0; /* Seek append only log to the end of the file. This is unnecessary, @@ -327,7 +343,8 @@ stasis_log_t* openLogWriter() { // Reset log_crc to zero (nextAvailableLSN immediately follows a crc // entry). - flushedLSN_stable = nextAvailableLSN; + flushedLSN_wal = nextAvailableLSN; + flushedLSN_commit = nextAvailableLSN; flushedLSN_internal = nextAvailableLSN; log_crc = 0; @@ -444,7 +461,8 @@ static void syncLogInternal() { } -static void syncLog_LogWriter(stasis_log_t * log) { +static void syncLog_LogWriter(stasis_log_t * log, + stasis_log_force_mode_t mode) { lsn_t newFlushedLSN; pthread_mutex_lock(&log_write_mutex); @@ -460,13 +478,24 @@ static void syncLog_LogWriter(stasis_log_t * log) { log_crc = 0; pthread_mutex_unlock(&log_write_mutex); - // Since we opened the logfile with O_SYNC, fflush() is sufficient. - fflush(logFILE); - // update flushedLSN after fflush returns. + fflush(logFILE); + // If we opened the logfile with O_SYNC, fflush() is sufficient. + // Otherwise, we're running in soft commit mode and need to manually force + // the log before allowing page writeback. + if(log_softcommit && mode == LOG_FORCE_WAL) { + fsync(fileno(logFILE)); + } + + // update flushedLSN after fflush returns. writelock(flushedLSN_lock, 0); - if(newFlushedLSN > flushedLSN_stable) { - flushedLSN_stable = newFlushedLSN; + if((!log_softcommit) || mode == LOG_FORCE_WAL) { + if(newFlushedLSN > flushedLSN_wal) { + flushedLSN_wal = newFlushedLSN; + } + } + if(newFlushedLSN > flushedLSN_commit) { + flushedLSN_commit = newFlushedLSN; } if(newFlushedLSN > flushedLSN_internal) { flushedLSN_internal = newFlushedLSN; @@ -475,11 +504,19 @@ static void syncLog_LogWriter(stasis_log_t * log) { writeunlock(flushedLSN_lock); } -static lsn_t flushedLSN_LogWriter(stasis_log_t* log) { +static lsn_t flushedLSN_LogWriter(stasis_log_t* log, + stasis_log_force_mode_t mode) { readlock(flushedLSN_lock, 0); - lsn_t ret = flushedLSN_stable; + lsn_t ret; + if(mode == LOG_FORCE_COMMIT) { + ret = flushedLSN_commit; + } else if(mode == LOG_FORCE_WAL) { + ret = flushedLSN_wal; + } else { + abort(); + } readunlock(flushedLSN_lock); - return ret; + return ret; } static lsn_t flushedLSNInternal() { readlock(flushedLSN_lock, 0); @@ -490,14 +527,15 @@ static lsn_t flushedLSNInternal() { static int close_LogWriter(stasis_log_t* log) { /* Get the whole thing to the disk before closing it. */ - syncLog_LogWriter(log); + syncLog_LogWriter(log, LOG_FORCE_WAL); fclose(logFILE); close(roLogFD); logFILE = NULL; roLogFD = 0; - flushedLSN_stable = 0; + flushedLSN_wal = 0; + flushedLSN_commit = 0; flushedLSN_internal = 0; nextAvailableLSN = 0; global_offset = 0; @@ -512,12 +550,14 @@ static int close_LogWriter(stasis_log_t* log) { free(buffer); buffer = 0; log_crc = 0; + free(log_filename); + free(log_scratch_filename); free(log); return 0; } void deleteLogWriter() { - remove(LOG_FILE); + remove(log_filename); } static LogEntry * readLogEntry() { @@ -680,7 +720,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) { } /* w+ = truncate, and open for writing. */ - tmpLog = fopen(LOG_FILE_SCRATCH, "w+"); + tmpLog = fopen(log_scratch_filename, "w+"); if (tmpLog==NULL) { pthread_mutex_unlock(&truncateLog_mutex); @@ -780,7 +820,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) { close(roLogFD); fclose(tmpLog); - if(rename(LOG_FILE_SCRATCH, LOG_FILE)) { + if(rename(log_scratch_filename, log_filename)) { pthread_mutex_unlock(&log_read_mutex); pthread_mutex_unlock(&log_write_mutex); pthread_mutex_unlock(&truncateLog_mutex); @@ -791,8 +831,9 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) { // printf("Truncation complete.\n"); fflush(stdout); } - - int logFD = open(LOG_FILE, O_CREAT | O_RDWR | O_SYNC, FILE_PERM); //S_IRWXU | S_IRWXG | S_IRWXO); + + int logFD = open(log_filename, log_filemode, log_fileperm); + if(logFD == -1) { perror("Couldn't open log file for append.\n"); abort(); @@ -824,7 +865,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) { } } - roLogFD = open(LOG_FILE, O_RDONLY, 0); + roLogFD = open(log_filename, O_RDONLY, 0); if(roLogFD == -1) { perror("Couldn't open log file for reads.\n"); diff --git a/src/stasis/logger/logger2.c b/src/stasis/logger/logger2.c index 2ec716c..7d8adf0 100644 --- a/src/stasis/logger/logger2.c +++ b/src/stasis/logger/logger2.c @@ -165,11 +165,11 @@ lsn_t LogDummyCLR(stasis_log_t* log, int xid, lsn_t prevLSN, return ret; } -static void groupForce(stasis_log_t* log, lsn_t lsn); +static void groupCommit(stasis_log_t* log, lsn_t lsn); lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l) { lsn_t lsn = LogTransCommon(log, l, XCOMMIT); - groupForce(log, lsn); + groupCommit(log, lsn); return lsn; } @@ -178,15 +178,22 @@ lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l) { } lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l) { lsn_t lsn = LogTransCommonPrepare(log, l); - groupForce(log, lsn); + groupCommit(log, lsn); return lsn; } -void LogForce(stasis_log_t* log, lsn_t lsn) { - groupForce(log, lsn); +void LogForce(stasis_log_t* log, lsn_t lsn, + stasis_log_force_mode_t mode) { + if(mode == LOG_FORCE_COMMIT) { + groupCommit(log, lsn); + } else { + if(log->first_unstable_lsn(log,mode) >= lsn) { + log->force_tail(log,mode); + } + } } -static void groupForce(stasis_log_t* log, lsn_t lsn) { +static void groupCommit(stasis_log_t* log, lsn_t lsn) { static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER; @@ -194,7 +201,7 @@ static void groupForce(stasis_log_t* log, lsn_t lsn) { struct timespec timeout; pthread_mutex_lock(&check_commit); - if(log->first_unstable_lsn(log) >= lsn) { + if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn) { pthread_mutex_unlock(&check_commit); return; } @@ -220,19 +227,19 @@ static void groupForce(stasis_log_t* log, lsn_t lsn) { __FILE__, __LINE__); break; } - if(log->first_unstable_lsn(log) >= lsn) { + if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn) { pendingCommits--; pthread_mutex_unlock(&check_commit); return; } } } - if(log->first_unstable_lsn(log) < lsn) { - log->force_tail(log); - assert(log->first_unstable_lsn(log) >= lsn); + if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) <= lsn) { + log->force_tail(log, LOG_FORCE_COMMIT); + assert(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn); pthread_cond_broadcast(&tooFewXacts); } - assert(log->first_unstable_lsn(log) >= lsn); + assert(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn); pendingCommits--; pthread_mutex_unlock(&check_commit); return; diff --git a/src/stasis/operations/prepare.c b/src/stasis/operations/prepare.c index 566f645..7495d0a 100644 --- a/src/stasis/operations/prepare.c +++ b/src/stasis/operations/prepare.c @@ -55,14 +55,14 @@ terms specified in this license. recordid prepare_bogus_rec = { 0, 0, 0}; static int op_prepare(const LogEntry * e, Page * p) { - LogForce(stasis_log_file, e->LSN); + LogForce(stasis_log_file, e->LSN, LOG_FORCE_COMMIT); return 0; } -Operation getPrepare() { +Operation getPrepare() { Operation o = { OPERATION_PREPARE, /* id */ - OPERATION_NOOP, + OPERATION_NOOP, &op_prepare /* Function */ }; return o; diff --git a/src/stasis/page/raw.c b/src/stasis/page/raw.c index dd610b6..40be361 100644 --- a/src/stasis/page/raw.c +++ b/src/stasis/page/raw.c @@ -9,7 +9,7 @@ recovery. */ void rawPageInferMetadata(Page * p) { - p->LSN = stasis_log_file->first_unstable_lsn(stasis_log_file); + p->LSN = stasis_log_file->first_unstable_lsn(stasis_log_file, LOG_FORCE_WAL); } byte* rawPageGetData(int xid, Page * p) { diff --git a/src/stasis/pageHandle.c b/src/stasis/pageHandle.c index 0b2b96a..e528fbe 100644 --- a/src/stasis/pageHandle.c +++ b/src/stasis/pageHandle.c @@ -30,7 +30,7 @@ static void phWrite(Page * ret) { // or we'll deadlock. writelock(ret->rwlatch,0); stasis_page_flushed(ret); - LogForce(stasis_log_file, ret->LSN); + LogForce(stasis_log_file, ret->LSN, LOG_FORCE_WAL); int err = h->write(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); if(err) { printf("Couldn't write to page file: %s\n", strerror(err)); diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index f4090cc..3b5ff17 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -141,7 +141,9 @@ int Tinit() { setupOperationsTable(); dirtyPagesInit(); if(LOG_TO_FILE == loggerType) { - stasis_log_file = openLogWriter(); + stasis_log_file = openLogWriter(stasis_log_file_name, + stasis_log_file_mode, + stasis_log_file_permissions); } else if(LOG_TO_MEMORY == loggerType) { stasis_log_file = open_InMemoryLog(); } else { @@ -172,7 +174,7 @@ int Tinit() { switch(bufferManagerFileHandleType) { case BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING: { struct sf_args * slow_arg = malloc(sizeof(sf_args)); - slow_arg->filename = STORE_FILE; + slow_arg->filename = stasis_store_file_name; slow_arg->openMode = openMode; @@ -203,12 +205,14 @@ int Tinit() { } break; case BUFFER_MANAGER_FILE_HANDLE_FILE: { stasis_handle_t * pageFile = - stasis_handle_open_file(0, STORE_FILE, openMode, FILE_PERM); + stasis_handle_open_file(0, stasis_store_file_name, + openMode, FILE_PERM); pageHandleOpen(pageFile); } break; case BUFFER_MANAGER_FILE_HANDLE_PFILE: { stasis_handle_t * pageFile = - stasis_handle_open_pfile(0, STORE_FILE, openMode, FILE_PERM); + stasis_handle_open_pfile(0, stasis_store_file_name, + openMode, FILE_PERM); pageHandleOpen(pageFile); } break; case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: { diff --git a/src/stasis/truncation.c b/src/stasis/truncation.c index 45c7811..6512e54 100644 --- a/src/stasis/truncation.c +++ b/src/stasis/truncation.c @@ -175,7 +175,7 @@ static void* stasis_truncation_thread_worker(void* logp) { stasis_log_t * log = logp; pthread_mutex_lock(&shutdown_mutex); while(initialized) { - if(log->first_unstable_lsn(log) - log->truncation_point(log) + if(log->first_unstable_lsn(log, LOG_FORCE_WAL) - log->truncation_point(log) > TARGET_LOG_SIZE) { stasis_truncation_truncate(log, 0); } @@ -211,7 +211,7 @@ int stasis_truncation_truncate(stasis_log_t* log, int force) { lsn_t page_rec_lsn = dirtyPages_minRecLSN(); lsn_t xact_rec_lsn = transactions_minRecLSN(); - lsn_t flushed_lsn = log->first_unstable_lsn(log); + lsn_t flushed_lsn = log->first_unstable_lsn(log, LOG_FORCE_WAL); lsn_t rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn; rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn; @@ -226,7 +226,7 @@ int stasis_truncation_truncate(stasis_log_t* log, int force) { log->truncate(log, rec_lsn); return 1; } else { - lsn_t flushed = log->first_unstable_lsn(log); + lsn_t flushed = log->first_unstable_lsn(log, LOG_FORCE_WAL); if(force || flushed - log_trunc > 2 * TARGET_LOG_SIZE) { //fprintf(stderr, "Flushing dirty buffers: rec_lsn = %ld log_trunc = %ld flushed = %ld\n", rec_lsn, log_trunc, flushed); dirtyPages_flush(); diff --git a/stasis/constants.h b/stasis/constants.h index 3261cae..598cbef 100644 --- a/stasis/constants.h +++ b/stasis/constants.h @@ -58,13 +58,7 @@ terms specified in this license. /*#define DEBUG 1*/ -#define LOG_FILE "logfile.txt" -#define LOG_FILE_SCRATCH "logfile.txt~" -#define STORE_FILE "storefile.txt" -#define BLOB0_FILE "blob0_file.txt" -#define BLOB1_FILE "blob1_file.txt" - -/* +/* define error codes */ #define LLADD_DEADLOCK -1 @@ -243,7 +237,7 @@ extern const short SLOT_TYPE_LENGTHS[]; #define TALLOC_REGION_SIZE 100 // Pages #define FILE_PERM (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH) -#define LOG_MODE (O_CREAT | O_RDWR | O_SYNC) +//#define LOG_MODE (O_CREAT | O_RDWR | O_SYNC) #define MAX_LSM_COMPARATORS 16 #define MAX_LSM_PAGE_INITIALIZERS 256 diff --git a/stasis/flags.h b/stasis/flags.h index 65c747c..3c00288 100644 --- a/stasis/flags.h +++ b/stasis/flags.h @@ -57,4 +57,10 @@ extern int stasis_suppress_unclean_shutdown_warnings; */ extern int stasis_truncation_automatic; +extern char * stasis_log_file_name; +extern int stasis_log_file_mode; +extern int stasis_log_file_permissions; + +extern char * stasis_store_file_name; + #endif diff --git a/stasis/logger/logWriter.h b/stasis/logger/logWriter.h index 3fe5d1f..c772c17 100644 --- a/stasis/logger/logWriter.h +++ b/stasis/logger/logWriter.h @@ -82,7 +82,9 @@ BEGIN_C_DECLS @return NULL on error */ -stasis_log_t* openLogWriter(); +stasis_log_t* openLogWriter(const char * filename, + int filemode, + int fileperm); /** Actually deletes the log file that may have been written to disk! Danger!! diff --git a/stasis/logger/logger2.h b/stasis/logger/logger2.h index 9b4e3c0..0e35288 100644 --- a/stasis/logger/logger2.h +++ b/stasis/logger/logger2.h @@ -66,6 +66,10 @@ typedef int (guard_fcn_t)(const LogEntry *, void *); typedef struct stasis_log_t stasis_log_t; +typedef enum { + LOG_FORCE_COMMIT, LOG_FORCE_WAL +} stasis_log_force_mode_t; + /** Contains the state needed by the logging layer to perform operations on a transaction. @@ -122,13 +126,17 @@ struct stasis_log_t { log entry that has not been flushed to disk. If the entire log is flushed, this function returns the LSN of the entry that will be allocated the next time the log is appended to. + + @param log The log file, which may or may not support durability. + @param mode The mode in which the log entries must have been forced. */ - lsn_t (*first_unstable_lsn)(struct stasis_log_t* log); + lsn_t (*first_unstable_lsn)(struct stasis_log_t* log, + stasis_log_force_mode_t mode); /** Force any enqueued, unwritten entries to disk */ - void (*force_tail)(struct stasis_log_t* log); + void (*force_tail)(struct stasis_log_t* log, stasis_log_force_mode_t mode); /** @param lsn The first lsn that will be available after truncation. @@ -147,7 +155,7 @@ struct stasis_log_t { */ int (*deinit)(struct stasis_log_t* log); - int (*is_durable)(); + int (*is_durable)(struct stasis_log_t* log); }; /** @@ -155,8 +163,23 @@ struct stasis_log_t { */ extern stasis_log_t* stasis_log_file; +/** + Synchronously make a prefix of the log durable. -void LogForce(stasis_log_t* log, lsn_t lsn); + This method uses group commit to reduce the number of calls to + force_tail(). + + Durability is guaranteed in an implementation-specific way. + + @param log A log that already contains the entries to be forced to disk. + @param lsn Log entries up to and including the one that overlaps lsn will + be durable after this call returns. + @param mode The durability mode associated with this call. + + @see stasis_log_force_mode_t + @see logger2.h for information about force_tail(). + */ +void LogForce(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode); /** Inform the logging layer that a new transaction has begun, and diff --git a/test/stasis/check_logWriter.c b/test/stasis/check_logWriter.c index b63fb4d..40e15ad 100644 --- a/test/stasis/check_logWriter.c +++ b/test/stasis/check_logWriter.c @@ -64,8 +64,6 @@ terms specified in this license. #define LOG_NAME "check_logWriter.log" -//static int logType = LOG_TO_MEMORY; - static void setup_log() { int i; lsn_t prevLSN = -1; @@ -396,7 +394,9 @@ void reopenLogWorkload(int truncating) { stasis_transaction_table_active_transaction_count_set(0); if(LOG_TO_FILE == loggerType) { - stasis_log_file = openLogWriter(); + stasis_log_file = openLogWriter(stasis_log_file_name, + stasis_log_file_mode, + stasis_log_file_permissions); } else if(LOG_TO_MEMORY == loggerType) { stasis_log_file = open_InMemoryLog(); } else { @@ -425,7 +425,9 @@ void reopenLogWorkload(int truncating) { stasis_log_file->deinit(stasis_log_file); if(LOG_TO_FILE == loggerType) { - stasis_log_file = openLogWriter(); + stasis_log_file = openLogWriter(stasis_log_file_name, + stasis_log_file_mode, + stasis_log_file_permissions); } else if(LOG_TO_MEMORY == loggerType) { stasis_log_file = open_InMemoryLog(); } else { @@ -460,7 +462,7 @@ void reopenLogWorkload(int truncating) { entries2[i] = LogUpdate(stasis_log_file, &l, NULL, OPERATION_NOOP, NULL, 0); if(i == SYNC_POINT) { - stasis_log_file->force_tail(stasis_log_file); + stasis_log_file->force_tail(stasis_log_file, LOG_FORCE_COMMIT); } } diff --git a/utilities/logfile_dump.c b/utilities/logfile_dump.c index e24095c..ea20144 100644 --- a/utilities/logfile_dump.c +++ b/utilities/logfile_dump.c @@ -5,7 +5,6 @@ #include #include - static char * logEntryToString(const LogEntry * le) { char * ret = NULL; int err; @@ -64,7 +63,9 @@ int main() { setupOperationsTable(); stasis_log_t* log; - if(NULL == (log = openLogWriter())) { + if(NULL == (log = openLogWriter(stasis_log_file_name, + stasis_log_file_mode, + stasis_log_file_permissions))) { printf("Couldn't open log.\n"); }