added support for softcommit, and for compile / runtime configuration of stasis log + storefile locations
This commit is contained in:
parent
17d4bb3ed4
commit
2dcc6598d2
16 changed files with 198 additions and 86 deletions
|
@ -94,7 +94,7 @@ static void pfPageWrite(Page * ret) {
|
|||
// If necessary, force the log to disk so that ret's LSN will be stable.
|
||||
|
||||
assert(ret->LSN == stasis_page_lsn_read(ret));
|
||||
LogForce(stasis_log_file, ret->LSN);
|
||||
LogForce(stasis_log_file, ret->LSN, LOG_FORCE_WAL);
|
||||
|
||||
pthread_mutex_lock(&stable_mutex);
|
||||
|
||||
|
@ -143,9 +143,11 @@ void openPageFile() {
|
|||
DEBUG("Opening storefile.\n");
|
||||
|
||||
#ifdef PAGE_FILE_O_DIRECT
|
||||
stable = open (STORE_FILE, O_CREAT | O_RDWR | O_DIRECT, FILE_PERM); //S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
stable = open (stasis_store_file_name,
|
||||
O_CREAT | O_RDWR | O_DIRECT, FILE_PERM);
|
||||
#else
|
||||
stable = open (STORE_FILE, O_CREAT | O_RDWR, FILE_PERM);//S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
stable = open (stasis_store_file_name,
|
||||
O_CREAT | O_RDWR, FILE_PERM);
|
||||
#endif
|
||||
if(!pageFile_isDurable) {
|
||||
fprintf(stderr, "\n**********\n");
|
||||
|
|
|
@ -2,6 +2,10 @@
|
|||
#include <stasis/flags.h>
|
||||
#include <stasis/constants.h>
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#ifdef BUFFER_MANAGER_TYPE
|
||||
int bufferManagerType = BUFFER_MANAGER_TYPE;
|
||||
#else
|
||||
|
@ -37,3 +41,28 @@ int stasis_truncation_automatic = STASIS_TRUNCATION_AUTOMATIC;
|
|||
#else
|
||||
int stasis_truncation_automatic = 1;
|
||||
#endif
|
||||
|
||||
#ifdef STASIS_LOG_FILE_NAME
|
||||
char * stasis_log_file_name = STASIS_LOG_FILE_NAME;
|
||||
#else
|
||||
char * stasis_log_file_name = "logfile.txt";
|
||||
#endif
|
||||
|
||||
#ifdef STASIS_STORE_FILE_NAME
|
||||
char * stasis_store_file_name = STASIS_STORE_FILE_NAME;
|
||||
#else
|
||||
char * stasis_store_file_name = "storefile.txt";
|
||||
#endif
|
||||
|
||||
#ifdef STASIS_LOG_FILE_MODE
|
||||
int stasis_log_file_mode = STASIS_LOG_FILE_MODE;
|
||||
#else
|
||||
int stasis_log_file_mode = (O_CREAT | O_RDWR | O_SYNC);
|
||||
#endif
|
||||
|
||||
#ifdef STASIS_LOG_FILE_PERMISSIONS
|
||||
int stasis_log_file_permissions = STASIS_LOG_FILE_PERMISSIONS;
|
||||
#else
|
||||
int stasis_log_file_permissions = (S_IRUSR | S_IWUSR | S_IRGRP|
|
||||
S_IWGRP | S_IROTH | S_IWOTH);
|
||||
#endif
|
||||
|
|
|
@ -42,11 +42,12 @@ static int writeLogEntry_InMemoryLog(stasis_log_t * log, LogEntry *e) {
|
|||
unlock(flushedLSN_lock);
|
||||
}
|
||||
|
||||
static lsn_t flushedLSN_InMemoryLog(stasis_log_t* log) {
|
||||
static lsn_t flushedLSN_InMemoryLog(stasis_log_t* log,
|
||||
stasis_log_force_mode_t mode) {
|
||||
return nextAvailableLSN;
|
||||
}
|
||||
|
||||
static void syncLog_InMemoryLog(stasis_log_t* log) {
|
||||
static void syncLog_InMemoryLog(stasis_log_t* log, stasis_log_force_mode_t m){
|
||||
// no-op
|
||||
}
|
||||
|
||||
|
|
|
@ -72,13 +72,29 @@ terms specified in this license.
|
|||
static FILE * logFILE = 0;
|
||||
static int roLogFD = 0;
|
||||
|
||||
static int logWriter_isDurable = 1;
|
||||
static lsn_t debug_lsn = -1;
|
||||
|
||||
static char * log_filename = 0;
|
||||
static char * log_scratch_filename = 0;
|
||||
static int log_filemode = 0;
|
||||
static int log_fileperm = 0;
|
||||
static char log_softcommit = 0;
|
||||
/**
|
||||
@see flushedLSN_LogWriter()
|
||||
*/
|
||||
static lsn_t flushedLSN_stable;
|
||||
/**
|
||||
The first unstable LSN for write ahead logging purposes.
|
||||
*/
|
||||
static lsn_t flushedLSN_wal;
|
||||
/**
|
||||
The first unstable LSN for commit purposes. This can be greater than
|
||||
flushedLSN_wal.
|
||||
*/
|
||||
static lsn_t flushedLSN_commit;
|
||||
/**
|
||||
The first LSN that hasn't made it into roLogFD. This can be greater than
|
||||
flushedLSN_commit
|
||||
*/
|
||||
static lsn_t flushedLSN_internal;
|
||||
/**
|
||||
Invariant: No thread is writing to flushedLSN. Since
|
||||
|
@ -88,10 +104,7 @@ static lsn_t flushedLSN_internal;
|
|||
static rwl * flushedLSN_lock;
|
||||
|
||||
/**
|
||||
Before writeLogEntry is called, this value is 0. Once writeLogEntry
|
||||
is called, it is the next available LSN.
|
||||
|
||||
@see writeLogEntry
|
||||
The LSN that will be assigned to the next log entry.
|
||||
*/
|
||||
static lsn_t nextAvailableLSN;
|
||||
|
||||
|
@ -163,7 +176,7 @@ static int close_LogWriter(stasis_log_t * log);
|
|||
static int writeLogEntry_LogWriter(stasis_log_t* log, LogEntry * e);
|
||||
|
||||
static inline int isDurable_LogWriter(stasis_log_t* log) {
|
||||
return logWriter_isDurable;
|
||||
return !log_softcommit;
|
||||
}
|
||||
static inline lsn_t firstLogEntry_LogWriter(stasis_log_t* log);
|
||||
|
||||
|
@ -172,7 +185,16 @@ static inline lsn_t nextEntry_LogWriter(stasis_log_t* log,
|
|||
return nextEntry(e);
|
||||
}
|
||||
|
||||
stasis_log_t* openLogWriter() {
|
||||
stasis_log_t* openLogWriter(const char * filename,
|
||||
int filemode, int fileperm) {
|
||||
|
||||
log_filename = strdup(filename);
|
||||
log_scratch_filename = malloc(strlen(log_filename) + 2);
|
||||
strcpy(log_scratch_filename, log_filename);
|
||||
strcat(log_scratch_filename, "~");
|
||||
log_filemode = filemode;
|
||||
log_fileperm = fileperm;
|
||||
log_softcommit = !(filemode & O_SYNC);
|
||||
|
||||
stasis_log_t proto = {
|
||||
sizeofInternalLogEntry_LogWriter, // sizeof_internal_entry
|
||||
|
@ -203,14 +225,7 @@ stasis_log_t* openLogWriter() {
|
|||
therefore all seeks) run through the second descriptor. */
|
||||
|
||||
int logFD;
|
||||
if(logWriter_isDurable) {
|
||||
logFD = open(LOG_FILE, LOG_MODE, FILE_PERM); //, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
} else {
|
||||
fprintf(stderr, "\n**********\n");
|
||||
fprintf (stderr, "logWriter.c: logWriter_isDurable==0; the logger will not force writes to disk.\n");
|
||||
fprintf (stderr, " Transactions will not be durable if the system crashes.\n**********\n");
|
||||
logFD = open(LOG_FILE, O_CREAT | O_RDWR, FILE_PERM);// S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
}
|
||||
logFD = open(log_filename, log_filemode, log_fileperm);
|
||||
if(logFD == -1) {
|
||||
perror("Couldn't open log file for append.\n");
|
||||
abort();
|
||||
|
@ -230,7 +245,7 @@ stasis_log_t* openLogWriter() {
|
|||
even if fflush() is used to push the changes out to disk.
|
||||
Therefore, we use a file descriptor, and read() instead of a FILE
|
||||
and fread(). */
|
||||
roLogFD = open(LOG_FILE, O_RDONLY, 0);
|
||||
roLogFD = open(log_filename, O_RDONLY, 0);
|
||||
|
||||
if(roLogFD == -1) {
|
||||
perror("Couldn't open log file for reads.\n");
|
||||
|
@ -244,7 +259,8 @@ stasis_log_t* openLogWriter() {
|
|||
pthread_mutex_init(&nextAvailableLSN_mutex, NULL);
|
||||
pthread_mutex_init(&truncateLog_mutex, NULL);
|
||||
|
||||
flushedLSN_stable = 0;
|
||||
flushedLSN_wal = 0;
|
||||
flushedLSN_commit = 0;
|
||||
flushedLSN_internal = 0;
|
||||
/*
|
||||
Seek append only log to the end of the file. This is unnecessary,
|
||||
|
@ -327,7 +343,8 @@ stasis_log_t* openLogWriter() {
|
|||
// Reset log_crc to zero (nextAvailableLSN immediately follows a crc
|
||||
// entry).
|
||||
|
||||
flushedLSN_stable = nextAvailableLSN;
|
||||
flushedLSN_wal = nextAvailableLSN;
|
||||
flushedLSN_commit = nextAvailableLSN;
|
||||
flushedLSN_internal = nextAvailableLSN;
|
||||
log_crc = 0;
|
||||
|
||||
|
@ -444,7 +461,8 @@ static void syncLogInternal() {
|
|||
|
||||
}
|
||||
|
||||
static void syncLog_LogWriter(stasis_log_t * log) {
|
||||
static void syncLog_LogWriter(stasis_log_t * log,
|
||||
stasis_log_force_mode_t mode) {
|
||||
lsn_t newFlushedLSN;
|
||||
|
||||
pthread_mutex_lock(&log_write_mutex);
|
||||
|
@ -460,13 +478,24 @@ static void syncLog_LogWriter(stasis_log_t * log) {
|
|||
log_crc = 0;
|
||||
|
||||
pthread_mutex_unlock(&log_write_mutex);
|
||||
// Since we opened the logfile with O_SYNC, fflush() is sufficient.
|
||||
|
||||
fflush(logFILE);
|
||||
// If we opened the logfile with O_SYNC, fflush() is sufficient.
|
||||
// Otherwise, we're running in soft commit mode and need to manually force
|
||||
// the log before allowing page writeback.
|
||||
if(log_softcommit && mode == LOG_FORCE_WAL) {
|
||||
fsync(fileno(logFILE));
|
||||
}
|
||||
|
||||
// update flushedLSN after fflush returns.
|
||||
writelock(flushedLSN_lock, 0);
|
||||
if(newFlushedLSN > flushedLSN_stable) {
|
||||
flushedLSN_stable = newFlushedLSN;
|
||||
if((!log_softcommit) || mode == LOG_FORCE_WAL) {
|
||||
if(newFlushedLSN > flushedLSN_wal) {
|
||||
flushedLSN_wal = newFlushedLSN;
|
||||
}
|
||||
}
|
||||
if(newFlushedLSN > flushedLSN_commit) {
|
||||
flushedLSN_commit = newFlushedLSN;
|
||||
}
|
||||
if(newFlushedLSN > flushedLSN_internal) {
|
||||
flushedLSN_internal = newFlushedLSN;
|
||||
|
@ -475,9 +504,17 @@ static void syncLog_LogWriter(stasis_log_t * log) {
|
|||
writeunlock(flushedLSN_lock);
|
||||
}
|
||||
|
||||
static lsn_t flushedLSN_LogWriter(stasis_log_t* log) {
|
||||
static lsn_t flushedLSN_LogWriter(stasis_log_t* log,
|
||||
stasis_log_force_mode_t mode) {
|
||||
readlock(flushedLSN_lock, 0);
|
||||
lsn_t ret = flushedLSN_stable;
|
||||
lsn_t ret;
|
||||
if(mode == LOG_FORCE_COMMIT) {
|
||||
ret = flushedLSN_commit;
|
||||
} else if(mode == LOG_FORCE_WAL) {
|
||||
ret = flushedLSN_wal;
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
readunlock(flushedLSN_lock);
|
||||
return ret;
|
||||
}
|
||||
|
@ -490,14 +527,15 @@ static lsn_t flushedLSNInternal() {
|
|||
|
||||
static int close_LogWriter(stasis_log_t* log) {
|
||||
/* Get the whole thing to the disk before closing it. */
|
||||
syncLog_LogWriter(log);
|
||||
syncLog_LogWriter(log, LOG_FORCE_WAL);
|
||||
|
||||
fclose(logFILE);
|
||||
close(roLogFD);
|
||||
logFILE = NULL;
|
||||
roLogFD = 0;
|
||||
|
||||
flushedLSN_stable = 0;
|
||||
flushedLSN_wal = 0;
|
||||
flushedLSN_commit = 0;
|
||||
flushedLSN_internal = 0;
|
||||
nextAvailableLSN = 0;
|
||||
global_offset = 0;
|
||||
|
@ -512,12 +550,14 @@ static int close_LogWriter(stasis_log_t* log) {
|
|||
free(buffer);
|
||||
buffer = 0;
|
||||
log_crc = 0;
|
||||
free(log_filename);
|
||||
free(log_scratch_filename);
|
||||
free(log);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void deleteLogWriter() {
|
||||
remove(LOG_FILE);
|
||||
remove(log_filename);
|
||||
}
|
||||
|
||||
static LogEntry * readLogEntry() {
|
||||
|
@ -680,7 +720,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
|
|||
}
|
||||
|
||||
/* w+ = truncate, and open for writing. */
|
||||
tmpLog = fopen(LOG_FILE_SCRATCH, "w+");
|
||||
tmpLog = fopen(log_scratch_filename, "w+");
|
||||
|
||||
if (tmpLog==NULL) {
|
||||
pthread_mutex_unlock(&truncateLog_mutex);
|
||||
|
@ -780,7 +820,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
|
|||
close(roLogFD);
|
||||
fclose(tmpLog);
|
||||
|
||||
if(rename(LOG_FILE_SCRATCH, LOG_FILE)) {
|
||||
if(rename(log_scratch_filename, log_filename)) {
|
||||
pthread_mutex_unlock(&log_read_mutex);
|
||||
pthread_mutex_unlock(&log_write_mutex);
|
||||
pthread_mutex_unlock(&truncateLog_mutex);
|
||||
|
@ -792,7 +832,8 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
|
|||
fflush(stdout);
|
||||
}
|
||||
|
||||
int logFD = open(LOG_FILE, O_CREAT | O_RDWR | O_SYNC, FILE_PERM); //S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
int logFD = open(log_filename, log_filemode, log_fileperm);
|
||||
|
||||
if(logFD == -1) {
|
||||
perror("Couldn't open log file for append.\n");
|
||||
abort();
|
||||
|
@ -824,7 +865,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
|
|||
}
|
||||
}
|
||||
|
||||
roLogFD = open(LOG_FILE, O_RDONLY, 0);
|
||||
roLogFD = open(log_filename, O_RDONLY, 0);
|
||||
|
||||
if(roLogFD == -1) {
|
||||
perror("Couldn't open log file for reads.\n");
|
||||
|
|
|
@ -165,11 +165,11 @@ lsn_t LogDummyCLR(stasis_log_t* log, int xid, lsn_t prevLSN,
|
|||
return ret;
|
||||
}
|
||||
|
||||
static void groupForce(stasis_log_t* log, lsn_t lsn);
|
||||
static void groupCommit(stasis_log_t* log, lsn_t lsn);
|
||||
|
||||
lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l) {
|
||||
lsn_t lsn = LogTransCommon(log, l, XCOMMIT);
|
||||
groupForce(log, lsn);
|
||||
groupCommit(log, lsn);
|
||||
return lsn;
|
||||
}
|
||||
|
||||
|
@ -178,15 +178,22 @@ lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l) {
|
|||
}
|
||||
lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l) {
|
||||
lsn_t lsn = LogTransCommonPrepare(log, l);
|
||||
groupForce(log, lsn);
|
||||
groupCommit(log, lsn);
|
||||
return lsn;
|
||||
}
|
||||
|
||||
void LogForce(stasis_log_t* log, lsn_t lsn) {
|
||||
groupForce(log, lsn);
|
||||
void LogForce(stasis_log_t* log, lsn_t lsn,
|
||||
stasis_log_force_mode_t mode) {
|
||||
if(mode == LOG_FORCE_COMMIT) {
|
||||
groupCommit(log, lsn);
|
||||
} else {
|
||||
if(log->first_unstable_lsn(log,mode) >= lsn) {
|
||||
log->force_tail(log,mode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void groupForce(stasis_log_t* log, lsn_t lsn) {
|
||||
static void groupCommit(stasis_log_t* log, lsn_t lsn) {
|
||||
static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER;
|
||||
static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER;
|
||||
|
||||
|
@ -194,7 +201,7 @@ static void groupForce(stasis_log_t* log, lsn_t lsn) {
|
|||
struct timespec timeout;
|
||||
|
||||
pthread_mutex_lock(&check_commit);
|
||||
if(log->first_unstable_lsn(log) >= lsn) {
|
||||
if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn) {
|
||||
pthread_mutex_unlock(&check_commit);
|
||||
return;
|
||||
}
|
||||
|
@ -220,19 +227,19 @@ static void groupForce(stasis_log_t* log, lsn_t lsn) {
|
|||
__FILE__, __LINE__);
|
||||
break;
|
||||
}
|
||||
if(log->first_unstable_lsn(log) >= lsn) {
|
||||
if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn) {
|
||||
pendingCommits--;
|
||||
pthread_mutex_unlock(&check_commit);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if(log->first_unstable_lsn(log) < lsn) {
|
||||
log->force_tail(log);
|
||||
assert(log->first_unstable_lsn(log) >= lsn);
|
||||
if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) <= lsn) {
|
||||
log->force_tail(log, LOG_FORCE_COMMIT);
|
||||
assert(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn);
|
||||
pthread_cond_broadcast(&tooFewXacts);
|
||||
}
|
||||
assert(log->first_unstable_lsn(log) >= lsn);
|
||||
assert(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn);
|
||||
pendingCommits--;
|
||||
pthread_mutex_unlock(&check_commit);
|
||||
return;
|
||||
|
|
|
@ -55,7 +55,7 @@ terms specified in this license.
|
|||
recordid prepare_bogus_rec = { 0, 0, 0};
|
||||
|
||||
static int op_prepare(const LogEntry * e, Page * p) {
|
||||
LogForce(stasis_log_file, e->LSN);
|
||||
LogForce(stasis_log_file, e->LSN, LOG_FORCE_COMMIT);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
recovery.
|
||||
*/
|
||||
void rawPageInferMetadata(Page * p) {
|
||||
p->LSN = stasis_log_file->first_unstable_lsn(stasis_log_file);
|
||||
p->LSN = stasis_log_file->first_unstable_lsn(stasis_log_file, LOG_FORCE_WAL);
|
||||
}
|
||||
|
||||
byte* rawPageGetData(int xid, Page * p) {
|
||||
|
|
|
@ -30,7 +30,7 @@ static void phWrite(Page * ret) {
|
|||
// or we'll deadlock.
|
||||
writelock(ret->rwlatch,0);
|
||||
stasis_page_flushed(ret);
|
||||
LogForce(stasis_log_file, ret->LSN);
|
||||
LogForce(stasis_log_file, ret->LSN, LOG_FORCE_WAL);
|
||||
int err = h->write(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE);
|
||||
if(err) {
|
||||
printf("Couldn't write to page file: %s\n", strerror(err));
|
||||
|
|
|
@ -141,7 +141,9 @@ int Tinit() {
|
|||
setupOperationsTable();
|
||||
dirtyPagesInit();
|
||||
if(LOG_TO_FILE == loggerType) {
|
||||
stasis_log_file = openLogWriter();
|
||||
stasis_log_file = openLogWriter(stasis_log_file_name,
|
||||
stasis_log_file_mode,
|
||||
stasis_log_file_permissions);
|
||||
} else if(LOG_TO_MEMORY == loggerType) {
|
||||
stasis_log_file = open_InMemoryLog();
|
||||
} else {
|
||||
|
@ -172,7 +174,7 @@ int Tinit() {
|
|||
switch(bufferManagerFileHandleType) {
|
||||
case BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING: {
|
||||
struct sf_args * slow_arg = malloc(sizeof(sf_args));
|
||||
slow_arg->filename = STORE_FILE;
|
||||
slow_arg->filename = stasis_store_file_name;
|
||||
|
||||
slow_arg->openMode = openMode;
|
||||
|
||||
|
@ -203,12 +205,14 @@ int Tinit() {
|
|||
} break;
|
||||
case BUFFER_MANAGER_FILE_HANDLE_FILE: {
|
||||
stasis_handle_t * pageFile =
|
||||
stasis_handle_open_file(0, STORE_FILE, openMode, FILE_PERM);
|
||||
stasis_handle_open_file(0, stasis_store_file_name,
|
||||
openMode, FILE_PERM);
|
||||
pageHandleOpen(pageFile);
|
||||
} break;
|
||||
case BUFFER_MANAGER_FILE_HANDLE_PFILE: {
|
||||
stasis_handle_t * pageFile =
|
||||
stasis_handle_open_pfile(0, STORE_FILE, openMode, FILE_PERM);
|
||||
stasis_handle_open_pfile(0, stasis_store_file_name,
|
||||
openMode, FILE_PERM);
|
||||
pageHandleOpen(pageFile);
|
||||
} break;
|
||||
case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: {
|
||||
|
|
|
@ -175,7 +175,7 @@ static void* stasis_truncation_thread_worker(void* logp) {
|
|||
stasis_log_t * log = logp;
|
||||
pthread_mutex_lock(&shutdown_mutex);
|
||||
while(initialized) {
|
||||
if(log->first_unstable_lsn(log) - log->truncation_point(log)
|
||||
if(log->first_unstable_lsn(log, LOG_FORCE_WAL) - log->truncation_point(log)
|
||||
> TARGET_LOG_SIZE) {
|
||||
stasis_truncation_truncate(log, 0);
|
||||
}
|
||||
|
@ -211,7 +211,7 @@ int stasis_truncation_truncate(stasis_log_t* log, int force) {
|
|||
|
||||
lsn_t page_rec_lsn = dirtyPages_minRecLSN();
|
||||
lsn_t xact_rec_lsn = transactions_minRecLSN();
|
||||
lsn_t flushed_lsn = log->first_unstable_lsn(log);
|
||||
lsn_t flushed_lsn = log->first_unstable_lsn(log, LOG_FORCE_WAL);
|
||||
|
||||
lsn_t rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn;
|
||||
rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn;
|
||||
|
@ -226,7 +226,7 @@ int stasis_truncation_truncate(stasis_log_t* log, int force) {
|
|||
log->truncate(log, rec_lsn);
|
||||
return 1;
|
||||
} else {
|
||||
lsn_t flushed = log->first_unstable_lsn(log);
|
||||
lsn_t flushed = log->first_unstable_lsn(log, LOG_FORCE_WAL);
|
||||
if(force || flushed - log_trunc > 2 * TARGET_LOG_SIZE) {
|
||||
//fprintf(stderr, "Flushing dirty buffers: rec_lsn = %ld log_trunc = %ld flushed = %ld\n", rec_lsn, log_trunc, flushed);
|
||||
dirtyPages_flush();
|
||||
|
|
|
@ -58,12 +58,6 @@ terms specified in this license.
|
|||
|
||||
/*#define DEBUG 1*/
|
||||
|
||||
#define LOG_FILE "logfile.txt"
|
||||
#define LOG_FILE_SCRATCH "logfile.txt~"
|
||||
#define STORE_FILE "storefile.txt"
|
||||
#define BLOB0_FILE "blob0_file.txt"
|
||||
#define BLOB1_FILE "blob1_file.txt"
|
||||
|
||||
/*
|
||||
define error codes
|
||||
*/
|
||||
|
@ -243,7 +237,7 @@ extern const short SLOT_TYPE_LENGTHS[];
|
|||
#define TALLOC_REGION_SIZE 100 // Pages
|
||||
|
||||
#define FILE_PERM (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)
|
||||
#define LOG_MODE (O_CREAT | O_RDWR | O_SYNC)
|
||||
//#define LOG_MODE (O_CREAT | O_RDWR | O_SYNC)
|
||||
|
||||
#define MAX_LSM_COMPARATORS 16
|
||||
#define MAX_LSM_PAGE_INITIALIZERS 256
|
||||
|
|
|
@ -57,4 +57,10 @@ extern int stasis_suppress_unclean_shutdown_warnings;
|
|||
*/
|
||||
extern int stasis_truncation_automatic;
|
||||
|
||||
extern char * stasis_log_file_name;
|
||||
extern int stasis_log_file_mode;
|
||||
extern int stasis_log_file_permissions;
|
||||
|
||||
extern char * stasis_store_file_name;
|
||||
|
||||
#endif
|
||||
|
|
|
@ -82,7 +82,9 @@ BEGIN_C_DECLS
|
|||
|
||||
@return NULL on error
|
||||
*/
|
||||
stasis_log_t* openLogWriter();
|
||||
stasis_log_t* openLogWriter(const char * filename,
|
||||
int filemode,
|
||||
int fileperm);
|
||||
|
||||
/**
|
||||
Actually deletes the log file that may have been written to disk! Danger!!
|
||||
|
|
|
@ -66,6 +66,10 @@ typedef int (guard_fcn_t)(const LogEntry *, void *);
|
|||
|
||||
typedef struct stasis_log_t stasis_log_t;
|
||||
|
||||
typedef enum {
|
||||
LOG_FORCE_COMMIT, LOG_FORCE_WAL
|
||||
} stasis_log_force_mode_t;
|
||||
|
||||
/**
|
||||
Contains the state needed by the logging layer to perform
|
||||
operations on a transaction.
|
||||
|
@ -122,13 +126,17 @@ struct stasis_log_t {
|
|||
log entry that has not been flushed to disk. If the entire log
|
||||
is flushed, this function returns the LSN of the entry that will
|
||||
be allocated the next time the log is appended to.
|
||||
|
||||
@param log The log file, which may or may not support durability.
|
||||
@param mode The mode in which the log entries must have been forced.
|
||||
*/
|
||||
lsn_t (*first_unstable_lsn)(struct stasis_log_t* log);
|
||||
lsn_t (*first_unstable_lsn)(struct stasis_log_t* log,
|
||||
stasis_log_force_mode_t mode);
|
||||
|
||||
/**
|
||||
Force any enqueued, unwritten entries to disk
|
||||
*/
|
||||
void (*force_tail)(struct stasis_log_t* log);
|
||||
void (*force_tail)(struct stasis_log_t* log, stasis_log_force_mode_t mode);
|
||||
|
||||
/**
|
||||
@param lsn The first lsn that will be available after truncation.
|
||||
|
@ -147,7 +155,7 @@ struct stasis_log_t {
|
|||
*/
|
||||
int (*deinit)(struct stasis_log_t* log);
|
||||
|
||||
int (*is_durable)();
|
||||
int (*is_durable)(struct stasis_log_t* log);
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -155,8 +163,23 @@ struct stasis_log_t {
|
|||
*/
|
||||
extern stasis_log_t* stasis_log_file;
|
||||
|
||||
/**
|
||||
Synchronously make a prefix of the log durable.
|
||||
|
||||
void LogForce(stasis_log_t* log, lsn_t lsn);
|
||||
This method uses group commit to reduce the number of calls to
|
||||
force_tail().
|
||||
|
||||
Durability is guaranteed in an implementation-specific way.
|
||||
|
||||
@param log A log that already contains the entries to be forced to disk.
|
||||
@param lsn Log entries up to and including the one that overlaps lsn will
|
||||
be durable after this call returns.
|
||||
@param mode The durability mode associated with this call.
|
||||
|
||||
@see stasis_log_force_mode_t
|
||||
@see logger2.h for information about force_tail().
|
||||
*/
|
||||
void LogForce(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode);
|
||||
|
||||
/**
|
||||
Inform the logging layer that a new transaction has begun, and
|
||||
|
|
|
@ -64,8 +64,6 @@ terms specified in this license.
|
|||
|
||||
#define LOG_NAME "check_logWriter.log"
|
||||
|
||||
//static int logType = LOG_TO_MEMORY;
|
||||
|
||||
static void setup_log() {
|
||||
int i;
|
||||
lsn_t prevLSN = -1;
|
||||
|
@ -396,7 +394,9 @@ void reopenLogWorkload(int truncating) {
|
|||
stasis_transaction_table_active_transaction_count_set(0);
|
||||
|
||||
if(LOG_TO_FILE == loggerType) {
|
||||
stasis_log_file = openLogWriter();
|
||||
stasis_log_file = openLogWriter(stasis_log_file_name,
|
||||
stasis_log_file_mode,
|
||||
stasis_log_file_permissions);
|
||||
} else if(LOG_TO_MEMORY == loggerType) {
|
||||
stasis_log_file = open_InMemoryLog();
|
||||
} else {
|
||||
|
@ -425,7 +425,9 @@ void reopenLogWorkload(int truncating) {
|
|||
stasis_log_file->deinit(stasis_log_file);
|
||||
|
||||
if(LOG_TO_FILE == loggerType) {
|
||||
stasis_log_file = openLogWriter();
|
||||
stasis_log_file = openLogWriter(stasis_log_file_name,
|
||||
stasis_log_file_mode,
|
||||
stasis_log_file_permissions);
|
||||
} else if(LOG_TO_MEMORY == loggerType) {
|
||||
stasis_log_file = open_InMemoryLog();
|
||||
} else {
|
||||
|
@ -460,7 +462,7 @@ void reopenLogWorkload(int truncating) {
|
|||
entries2[i] = LogUpdate(stasis_log_file, &l, NULL, OPERATION_NOOP,
|
||||
NULL, 0);
|
||||
if(i == SYNC_POINT) {
|
||||
stasis_log_file->force_tail(stasis_log_file);
|
||||
stasis_log_file->force_tail(stasis_log_file, LOG_FORCE_COMMIT);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
#include <stasis/logger/logHandle.h>
|
||||
#include <stasis/logger/logWriter.h>
|
||||
|
||||
|
||||
static char * logEntryToString(const LogEntry * le) {
|
||||
char * ret = NULL;
|
||||
int err;
|
||||
|
@ -64,7 +63,9 @@ int main() {
|
|||
|
||||
setupOperationsTable();
|
||||
stasis_log_t* log;
|
||||
if(NULL == (log = openLogWriter())) {
|
||||
if(NULL == (log = openLogWriter(stasis_log_file_name,
|
||||
stasis_log_file_mode,
|
||||
stasis_log_file_permissions))) {
|
||||
printf("Couldn't open log.\n");
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue