log refactoring

This commit is contained in:
Sears Russell 2008-12-01 19:48:59 +00:00
parent d9b00d457d
commit 17d4bb3ed4
27 changed files with 649 additions and 672 deletions

View file

@ -115,7 +115,7 @@ static void bufManReleasePage (Page * p) {
}
static Page * bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
Page * ret;
int spin = 0;

View file

@ -94,7 +94,7 @@ static void pfPageWrite(Page * ret) {
// If necessary, force the log to disk so that ret's LSN will be stable.
assert(ret->LSN == stasis_page_lsn_read(ret));
LogForce(ret->LSN);
LogForce(stasis_log_file, ret->LSN);
pthread_mutex_lock(&stable_mutex);

View file

@ -8,17 +8,8 @@ static lsn_t globalOffset;
static rwl * globalOffset_lock;
static LogEntry ** buffer;
static lsn_t bufferLen;
int open_InMemoryLog() {
flushedLSN_lock = initlock();
globalOffset_lock = initlock();
globalOffset = 0;
nextAvailableLSN = 0;
buffer = malloc(4096 * 1024 * sizeof (LogEntry *));
bufferLen =4096 * 1024;
return 0;
}
int writeLogEntry_InMemoryLog(LogEntry *e) {
static int writeLogEntry_InMemoryLog(stasis_log_t * log, LogEntry *e) {
writelock(flushedLSN_lock, 0);
lsn_t bufferOffset;
@ -33,6 +24,7 @@ int writeLogEntry_InMemoryLog(LogEntry *e) {
done = 1;
}
} while (!done);
return 0;
e->LSN = nextAvailableLSN;
@ -48,22 +40,21 @@ int writeLogEntry_InMemoryLog(LogEntry *e) {
unlock(globalOffset_lock);
unlock(flushedLSN_lock);
return 0;
}
lsn_t flushedLSN_InMemoryLog() {
static lsn_t flushedLSN_InMemoryLog(stasis_log_t* log) {
return nextAvailableLSN;
}
void syncLog_InMemoryLog() {
static void syncLog_InMemoryLog(stasis_log_t* log) {
// no-op
}
lsn_t nextEntry_InMemoryLog(const LogEntry * e) {
static lsn_t nextEntry_InMemoryLog(stasis_log_t * log, const LogEntry * e) {
return e->LSN + 1;
}
int truncateLog_InMemoryLog(lsn_t lsn) {
static int truncateLog_InMemoryLog(stasis_log_t * log, lsn_t lsn) {
writelock(flushedLSN_lock,1);
writelock(globalOffset_lock,1);
@ -85,11 +76,11 @@ int truncateLog_InMemoryLog(lsn_t lsn) {
return 0;
}
lsn_t firstLogEntry_InMemoryLog() {
static lsn_t firstLogEntry_InMemoryLog() {
return globalOffset;
}
void close_InMemoryLog() {
static int close_InMemoryLog(stasis_log_t * log) {
if(buffer) {
lsn_t firstEmptyOffset = nextAvailableLSN-globalOffset;
for(lsn_t i = 0; i < firstEmptyOffset; i++) {
@ -103,10 +94,12 @@ void close_InMemoryLog() {
buffer = 0;
}
free (log);
return 0;
}
LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn) {
static const LogEntry * readLSNEntry_InMemoryLog(stasis_log_t* log,
lsn_t lsn) {
// printf("lsn: %ld\n", lsn);
if(lsn >= nextAvailableLSN) { return 0; }
assert(lsn-globalOffset >= 0 && lsn-globalOffset< bufferLen);
@ -123,6 +116,32 @@ LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn) {
//printf("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN);
return ret;
}
long sizeofInternalLogEntry_InMemoryLog(const LogEntry * e) {
static lsn_t sizeofInternalLogEntry_InMemoryLog(stasis_log_t* log,
const LogEntry * e) {
abort();
}
static int isDurable_InMemoryLog(stasis_log_t*log) { return 0; }
stasis_log_t* open_InMemoryLog() {
flushedLSN_lock = initlock();
globalOffset_lock = initlock();
globalOffset = 0;
nextAvailableLSN = 0;
buffer = malloc(4096 * 1024 * sizeof (LogEntry *));
bufferLen =4096 * 1024;
static stasis_log_t proto = {
sizeofInternalLogEntry_InMemoryLog, // sizeof_internal_entry
writeLogEntry_InMemoryLog,// write_entry
readLSNEntry_InMemoryLog, // read_entry
nextEntry_InMemoryLog,// next_entry
flushedLSN_InMemoryLog, // first_unstable_lsn
syncLog_InMemoryLog, // force_tail
truncateLog_InMemoryLog, // truncate
firstLogEntry_InMemoryLog,// truncation_point
close_InMemoryLog, // deinit
isDurable_InMemoryLog// is_durable
};
stasis_log_t* log = malloc(sizeof(*log));
memcpy(log,&proto, sizeof(proto));
return log;
}

View file

@ -123,11 +123,13 @@ LogEntry * allocCLRLogEntry(const LogEntry * old_e) {
return (LogEntry*)ret;
}
void freeLogEntry(const LogEntry* e) {
free((void*)e);
}
long sizeofLogEntry(const LogEntry * log) {
switch (log->type) {
lsn_t sizeofLogEntry(const LogEntry * e) {
switch (e->type) {
case CLRLOG:
{
return sizeof(CLRLogEntry);
@ -135,10 +137,10 @@ long sizeofLogEntry(const LogEntry * log) {
case UPDATELOG:
{
return sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry) + log->update.arg_size;
sizeof(UpdateLogEntry) + e->update.arg_size;
}
case INTERNALLOG:
return LoggerSizeOfInternalLogEntry(log);
return stasis_log_file->sizeof_internal_entry(stasis_log_file, e);
case XPREPARE:
return sizeof(struct __raw_log_entry)+sizeof(lsn_t);
default:

View file

@ -46,6 +46,16 @@ terms specified in this license.
#include <stasis/logger/logHandle.h>
struct LogHandle {
/** The LSN of the log entry that we would return if next is called. */
lsn_t next_offset;
/** The LSN of the log entry that we would return if previous is called. */
lsn_t prev_offset;
guard_fcn_t* guard;
void* guard_state;
stasis_log_t* log;
};
/**
Sets the next and prev field of h, but does not set h.file_offset.
That should probably be set before calling this function.
@ -55,35 +65,39 @@ static void set_offsets(LogHandle * h, const LogEntry * e);
/*-------------------------------------------------------*/
LogHandle getLogHandle() {
LogHandle* getLogHandle(stasis_log_t* log) {
lsn_t lsn = LogTruncationPoint();
lsn_t lsn = log->truncation_point(log);
return getGuardedHandle(lsn, NULL, NULL);
return getGuardedHandle(log, lsn, NULL, NULL);
}
LogHandle getLSNHandle(lsn_t lsn) {
return getGuardedHandle(lsn, NULL, NULL);
LogHandle* getLSNHandle(stasis_log_t * log, lsn_t lsn) {
return getGuardedHandle(log, lsn, NULL, NULL);
}
LogHandle getGuardedHandle(lsn_t lsn, guard_fcn_t * guard, void * guard_state) {
LogHandle ret;
ret.next_offset = lsn;
ret.prev_offset = lsn;
ret.guard = guard;
ret.guard_state = guard_state;
LogHandle* getGuardedHandle(stasis_log_t* log, lsn_t lsn,
guard_fcn_t * guard, void * guard_state) {
LogHandle* ret = malloc(sizeof(*ret));
ret->next_offset = lsn;
ret->prev_offset = lsn;
ret->guard = guard;
ret->guard_state = guard_state;
ret->log = log;
return ret;
}
void freeLogHandle(LogHandle* lh) {
free(lh);
}
const LogEntry * nextInLog(LogHandle * h) {
const LogEntry * ret = LogReadLSN(h->next_offset);
const LogEntry * ret = h->log->read_entry(h->log,h->next_offset);
if(ret != NULL) {
set_offsets(h, ret);
}
if(h->guard) {
if(!(h->guard(ret, h->guard_state))) {
FreeLogEntry(ret);
freeLogEntry(ret);
ret = NULL;
}
}
@ -95,12 +109,12 @@ const LogEntry * nextInLog(LogHandle * h) {
const LogEntry * previousInTransaction(LogHandle * h) {
const LogEntry * ret = NULL;
if(h->prev_offset > 0) {
ret = LogReadLSN(h->prev_offset);
ret = h->log->read_entry(h->log, h->prev_offset);
set_offsets(h, ret);
if(h->guard) {
if(!h->guard(ret, h->guard_state)) {
FreeLogEntry(ret);
freeLogEntry(ret);
ret = NULL;
}
}
@ -111,6 +125,6 @@ const LogEntry * previousInTransaction(LogHandle * h) {
}
static void set_offsets(LogHandle * h, const LogEntry * e) {
h->next_offset = LogNextEntry(e);
h->next_offset = h->log->next_entry(h->log, e);
h->prev_offset = e->prevLSN;
}

View file

@ -66,13 +66,14 @@ terms specified in this license.
#include <stasis/bufferManager.h>
/**
@todo Should the log file be global?
/**
@todo remove all static vairables from logWriter.c
*/
static FILE * log = 0;
static FILE * logFILE = 0;
static int roLogFD = 0;
int logWriter_isDurable = 1;
static int logWriter_isDurable = 1;
static lsn_t debug_lsn = -1;
/**
@see flushedLSN_LogWriter()
@ -106,27 +107,26 @@ static lsn_t global_offset;
It is also used by truncateLog to block read requests while
rename() is called.
*/
pthread_mutex_t log_read_mutex;
static pthread_mutex_t log_read_mutex;
/**
Invariant: Any thread writing to the file must hold this lock. The
log truncation thread hold this lock from the point where it copies
the tail of the old log to the new log, until after the rename call
returns.
*/
pthread_mutex_t log_write_mutex;
returns.
*/
static pthread_mutex_t log_write_mutex;
/**
This mutex protects nextAvailableLSN, which has its own mutex
because the routines for reading and writing log entries both need
to acquire it, but only for a relatively short time.
*/
pthread_mutex_t nextAvailableLSN_mutex;
static pthread_mutex_t nextAvailableLSN_mutex;
/**
Invariant: We only want one thread in truncateLog at a time.
*/
pthread_mutex_t truncateLog_mutex;
static pthread_mutex_t truncateLog_mutex;
static char * buffer;
@ -145,14 +145,56 @@ static unsigned int log_crc;
static inline void update_log_crc(const LogEntry * le, unsigned int * crc) {
*crc = stasis_crc32(le, sizeofLogEntry(le), *crc);
}
static LogEntry * readLogEntry();
// internal methods
static void syncLogInternal();
int openLogWriter() {
static LogEntry* readLogEntry();
static inline lsn_t nextEntry(const LogEntry* e) {
return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t);
}
// implementations of log api methods
static lsn_t sizeofInternalLogEntry_LogWriter(stasis_log_t * log,
const LogEntry * e);
static void syncLog_LogWriter();
static lsn_t flushedLSN_LogWriter();
static const LogEntry* readLSNEntry_LogWriter(stasis_log_t * log, lsn_t lsn);
static int truncateLog_LogWriter(stasis_log_t* log, lsn_t lsn);
static int close_LogWriter(stasis_log_t * log);
static int writeLogEntry_LogWriter(stasis_log_t* log, LogEntry * e);
static inline int isDurable_LogWriter(stasis_log_t* log) {
return logWriter_isDurable;
}
static inline lsn_t firstLogEntry_LogWriter(stasis_log_t* log);
static inline lsn_t nextEntry_LogWriter(stasis_log_t* log,
const LogEntry* e) {
return nextEntry(e);
}
stasis_log_t* openLogWriter() {
stasis_log_t proto = {
sizeofInternalLogEntry_LogWriter, // sizeof_internal_entry
writeLogEntry_LogWriter,// write_entry
readLSNEntry_LogWriter, // read_entry
nextEntry_LogWriter,// next_entry
flushedLSN_LogWriter, // first_unstable_lsn
syncLog_LogWriter, // force_tail
truncateLog_LogWriter, // truncate
firstLogEntry_LogWriter,// truncation_point
close_LogWriter, // deinit
isDurable_LogWriter // is_durable
};
stasis_log_t* log = malloc(sizeof(*log));
memcpy(log,&proto, sizeof(proto));
// XXX hack; we call things that call into this object during init!
stasis_log_file = log;
buffer = malloc(BUFSIZE);
if(!buffer) { return LLADD_NO_MEM; }
if(!buffer) { return 0; /*LLADD_NO_MEM;*/ }
/* The file is opened twice for a reason. fseek() seems to call
fflush() under Linux, which normally would be a minor problem.
@ -173,16 +215,16 @@ int openLogWriter() {
perror("Couldn't open log file for append.\n");
abort();
}
log = fdopen(logFD, "w+");
logFILE = fdopen(logFD, "w+");
if (log==NULL) {
if (logFILE==NULL) {
perror("Couldn't open log file");
abort();
return LLADD_IO_ERROR;
return 0; //LLADD_IO_ERROR;
}
/* Increase the length of log's buffer, since it's in O_SYNC mode. */
setbuffer(log, buffer, BUFSIZE);
setbuffer(logFILE, buffer, BUFSIZE);
/* fread() doesn't notice when another handle writes to its file,
even if fflush() is used to push the changes out to disk.
@ -210,31 +252,31 @@ int openLogWriter() {
length of the file.
*/
if (myFseek(log, 0, SEEK_END)==0) {
if (myFseek(logFILE, 0, SEEK_END)==0) {
/*if file is empty, write an LSN at the 0th position. LSN 0 is
invalid, and this prevents us from using it. Also, the LSN at
this position is used after log truncation to store the
global offset for the truncated log.
*/
global_offset = 0;
size_t nmemb = fwrite(&global_offset, sizeof(lsn_t), 1, log);
size_t nmemb = fwrite(&global_offset, sizeof(lsn_t), 1, logFILE);
if(nmemb != 1) {
perror("Couldn't start new log file!");
return LLADD_IO_ERROR;
return 0; //LLADD_IO_ERROR;
}
} else {
off_t newPosition = lseek(roLogFD, 0, SEEK_SET);
if(newPosition == -1) {
perror("Could not seek to head of log");
return LLADD_IO_ERROR;
return 0; //LLADD_IO_ERROR;
}
ssize_t bytesRead = read(roLogFD, &global_offset, sizeof(lsn_t));
if(bytesRead != sizeof(lsn_t)) {
printf("Could not read log header.");
return LLADD_IO_ERROR;
return 0;//LLADD_IO_ERROR;
}
}
@ -256,28 +298,28 @@ int openLogWriter() {
while((le = readLogEntry())) {
if(le->type == INTERNALLOG) {
if (!(le->prevLSN) || (crc == (unsigned int) le->prevLSN)) {
nextAvailableLSN = nextEntry_LogWriter(le); //le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);
nextAvailableLSN = nextEntry(le);
crc = 0;
} else {
printf("Log corruption: %x != %x (lsn = %lld)\n", (unsigned int) le->prevLSN, crc, le->LSN);
// The log wasn't successfully forced to this point; discard
// everything after the last CRC.
FreeLogEntry(le);
freeLogEntry(le);
break;
}
} else {
update_log_crc(le, &crc);
}
FreeLogEntry(le);
freeLogEntry(le);
}
if(ftruncate(fileno(log), nextAvailableLSN-global_offset) == -1) {
if(ftruncate(fileno(logFILE), nextAvailableLSN-global_offset) == -1) {
perror("Couldn't discard junk at end of log");
}
// If there was trailing garbage at the end of the log, overwrite
// it.
if(myFseek(log, nextAvailableLSN-global_offset, SEEK_SET) != nextAvailableLSN-global_offset) {
if(myFseek(logFILE, nextAvailableLSN-global_offset, SEEK_SET) != nextAvailableLSN-global_offset) {
perror("Error repositioning log");
abort();
}
@ -289,7 +331,7 @@ int openLogWriter() {
flushedLSN_internal = nextAvailableLSN;
log_crc = 0;
return 0;
return log;
}
@ -312,7 +354,7 @@ int openLogWriter() {
*/
static int writeLogEntryUnlocked(LogEntry * e) {
static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e) {
const lsn_t size = sizeofLogEntry(e);
@ -329,12 +371,13 @@ static int writeLogEntryUnlocked(LogEntry * e) {
// assert(e->LSN == (current_offset + global_offset));
// off_t oldOffset = ftell(log);
size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, log);
size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, logFILE);
if(nmemb != 1) {
if(feof(log)) { abort(); /* feof makes no sense here */ }
if(ferror(log)) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log));
if(feof(logFILE)) { abort(); /* feof makes no sense here */ }
if(ferror(logFILE)) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n",
ferror(logFILE));
abort();
}
abort();
@ -348,15 +391,15 @@ static int writeLogEntryUnlocked(LogEntry * e) {
// current_offset = ftell(log);
// assert(e->LSN == (current_offset + global_offset - sizeof(lsn_t)));
nmemb = fwrite(e, size, 1, log);
nmemb = fwrite(e, size, 1, logFILE);
// current_offset = ftell(log);
// assert(e->LSN == current_offset + global_offset - sizeof(lsn_t) - size);
if(nmemb != 1) {
if(feof(log)) { abort(); /* feof makes no sense here */ }
if(ferror(log)) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log));
if(feof(logFILE)) { abort(); /* feof makes no sense here */ }
if(ferror(logFILE)) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(logFILE));
abort();
}
abort();
@ -366,20 +409,21 @@ static int writeLogEntryUnlocked(LogEntry * e) {
pthread_mutex_lock(&nextAvailableLSN_mutex);
assert(nextAvailableLSN == e->LSN);
nextAvailableLSN = nextEntry_LogWriter(e);
nextAvailableLSN = nextEntry(e);
pthread_mutex_unlock(&nextAvailableLSN_mutex);
return 0;
}
int writeLogEntry(LogEntry * e) {
static int writeLogEntry_LogWriter(stasis_log_t* log, LogEntry * e) {
pthread_mutex_lock(&log_write_mutex);
int ret = writeLogEntryUnlocked(e);
int ret = writeLogEntryUnlocked(log, e);
pthread_mutex_unlock(&log_write_mutex);
return ret;
}
long sizeofInternalLogEntry_LogWriter(const LogEntry * e) {
static lsn_t sizeofInternalLogEntry_LogWriter(stasis_log_t * log,
const LogEntry * e) {
return sizeof(struct __raw_log_entry);
}
@ -390,7 +434,7 @@ static void syncLogInternal() {
newFlushedLSN = nextAvailableLSN;
if(newFlushedLSN > flushedLSN_internal) {
pthread_mutex_unlock(&nextAvailableLSN_mutex);
fflush(log);
fflush(logFILE);
writelock(flushedLSN_lock, 0);
}
if(newFlushedLSN > flushedLSN_internal) {
@ -400,7 +444,7 @@ static void syncLogInternal() {
}
void syncLog_LogWriter() {
static void syncLog_LogWriter(stasis_log_t * log) {
lsn_t newFlushedLSN;
pthread_mutex_lock(&log_write_mutex);
@ -410,14 +454,14 @@ void syncLog_LogWriter() {
pthread_mutex_unlock(&nextAvailableLSN_mutex);
LogEntry * crc_entry = allocCommonLogEntry(log_crc, -1, INTERNALLOG);
writeLogEntryUnlocked(crc_entry);
writeLogEntryUnlocked(log, crc_entry);
free(crc_entry);
// Reset log_crc to zero each time a crc entry is written.
log_crc = 0;
pthread_mutex_unlock(&log_write_mutex);
// Since we opened the logfile with O_SYNC, fflush() is sufficient.
fflush(log);
fflush(logFILE);
// update flushedLSN after fflush returns.
writelock(flushedLSN_lock, 0);
@ -431,7 +475,7 @@ void syncLog_LogWriter() {
writeunlock(flushedLSN_lock);
}
lsn_t flushedLSN_LogWriter() {
static lsn_t flushedLSN_LogWriter(stasis_log_t* log) {
readlock(flushedLSN_lock, 0);
lsn_t ret = flushedLSN_stable;
readunlock(flushedLSN_lock);
@ -444,20 +488,20 @@ static lsn_t flushedLSNInternal() {
return ret;
}
void closeLogWriter() {
static int close_LogWriter(stasis_log_t* log) {
/* Get the whole thing to the disk before closing it. */
syncLog_LogWriter();
syncLog_LogWriter(log);
fclose(log);
fclose(logFILE);
close(roLogFD);
log = NULL;
logFILE = NULL;
roLogFD = 0;
flushedLSN_stable = 0;
flushedLSN_internal = 0;
nextAvailableLSN = 0;
global_offset = 0;
/* Free locks. */
deletelock(flushedLSN_lock);
@ -465,15 +509,17 @@ void closeLogWriter() {
pthread_mutex_destroy(&log_write_mutex);
pthread_mutex_destroy(&nextAvailableLSN_mutex);
pthread_mutex_destroy(&truncateLog_mutex);
free(buffer);
free(buffer);
buffer = 0;
log_crc = 0;
free(log);
return 0;
}
void deleteLogWriter() {
remove(LOG_FILE);
}
lsn_t debug_lsn = -1;
static LogEntry * readLogEntry() {
LogEntry * ret = 0;
lsn_t size;
@ -549,7 +595,7 @@ static LogEntry * readLogEntry() {
}
//static lsn_t lastPosition_readLSNEntry = -1;
LogEntry * readLSNEntry_LogWriter(const lsn_t LSN) {
const LogEntry * readLSNEntry_LogWriter(stasis_log_t * log, const lsn_t LSN) {
LogEntry * ret;
pthread_mutex_lock(&nextAvailableLSN_mutex);
@ -591,12 +637,36 @@ LogEntry * readLSNEntry_LogWriter(const lsn_t LSN) {
return ret;
}
/**
Truncates the log file. In the single-threaded case, this works as
follows:
int truncateLog_LogWriter(lsn_t LSN) {
First, the LSN passed to this function, minus sizeof(lsn_t) is
written to a new file, called logfile.txt~. (If logfile.txt~
already exists, then it is truncated.)
Next, the contents of the log, starting with the LSN passed into
this function are copied to logfile.txt~
Finally, logfile.txt~ is moved on top of logfile.txt
As long as the move system call is atomic, this function should
maintain the system's durability.
The multithreaded case is a bit more complicated, as we need
to deal with latching:
With no lock, copy the log. Upon completion, if the log has grown,
then copy the part that remains. Next, obtain a read/write latch
on the logfile, and copy any remaining portions of the log.
Perform the move, and release the latch.
*/
int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
FILE *tmpLog;
const LogEntry * le;
LogHandle lh;
LogHandle* lh;
lsn_t size;
@ -634,23 +704,23 @@ int truncateLog_LogWriter(lsn_t LSN) {
*/
pthread_mutex_lock(&log_write_mutex);
fflush(log);
fflush(logFILE);
lh = getLSNHandle(LSN);
lh = getLSNHandle(log, LSN);
lsn_t lengthOfCopiedLog = 0;
int firstInternalEntry = 1;
lsn_t nextLSN = 0;
while((le = nextInLog(&lh))) {
while((le = nextInLog(lh))) {
size = sizeofLogEntry(le);
if(nextLSN) {
assert(nextLSN == le->LSN);
}
nextLSN = nextEntry_LogWriter(le);
nextLSN = nextEntry_LogWriter(log, le);
if(firstInternalEntry && le->type == INTERNALLOG) {
LogEntry * firstCRC = malloc(size);
memcpy(firstCRC, le, size);
FreeLogEntry(le);
freeLogEntry(le);
firstCRC->prevLSN = 0;
le = firstCRC;
}
@ -663,9 +733,10 @@ int truncateLog_LogWriter(lsn_t LSN) {
free((void*)le); // remove const qualifier + free
firstInternalEntry = 0;
} else {
FreeLogEntry(le);
freeLogEntry(le);
}
}
}
freeLogHandle(lh);
LogEntry * crc_entry = allocCommonLogEntry(0, -1, INTERNALLOG);
assert(crc_entry->prevLSN == 0);
@ -677,7 +748,7 @@ int truncateLog_LogWriter(lsn_t LSN) {
size = sizeofLogEntry(crc_entry);
nextAvailableLSN = nextEntry_LogWriter(crc_entry);
nextAvailableLSN = nextEntry_LogWriter(log, crc_entry);
log_crc = 0;
@ -705,15 +776,15 @@ int truncateLog_LogWriter(lsn_t LSN) {
/* closeLogWriter calls sync, and does some extra stuff that we don't want, so we
basicly re-implement closeLogWriter and openLogWriter here...
*/
fclose(log);
fclose(logFILE);
close(roLogFD);
fclose(tmpLog);
fclose(tmpLog);
if(rename(LOG_FILE_SCRATCH, LOG_FILE)) {
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);
pthread_mutex_unlock(&truncateLog_mutex);
perror("Error replacing old log file with new log file");
return LLADD_IO_ERROR;
} else {
@ -726,9 +797,9 @@ int truncateLog_LogWriter(lsn_t LSN) {
perror("Couldn't open log file for append.\n");
abort();
}
log = fdopen(logFD, "w+");
logFILE = fdopen(logFD, "w+");
if (log==NULL) {
if (logFILE==NULL) {
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);
pthread_mutex_unlock(&truncateLog_mutex);
@ -738,12 +809,12 @@ int truncateLog_LogWriter(lsn_t LSN) {
return LLADD_IO_ERROR;
}
setbuffer(log, buffer, BUFSIZE);
setbuffer(logFILE, buffer, BUFSIZE);
global_offset = LSN - sizeof(lsn_t);
lsn_t logPos;
if((logPos = myFseek(log, 0, SEEK_END)) != nextAvailableLSN - global_offset) {
if((logPos = myFseek(logFILE, 0, SEEK_END)) != nextAvailableLSN - global_offset) {
if(logPos == -1) {
perror("Truncation couldn't seek");
} else {
@ -761,8 +832,6 @@ int truncateLog_LogWriter(lsn_t LSN) {
return LLADD_IO_ERROR;
}
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);
pthread_mutex_unlock(&truncateLog_mutex);
@ -771,8 +840,8 @@ int truncateLog_LogWriter(lsn_t LSN) {
}
lsn_t firstLogEntry() {
assert(log);
lsn_t firstLogEntry_LogWriter(stasis_log_t* log) {
assert(logFILE);
pthread_mutex_lock(&log_read_mutex); // for global offset...
lsn_t ret = global_offset + sizeof(lsn_t);
pthread_mutex_unlock(&log_read_mutex);

View file

@ -61,147 +61,37 @@ terms specified in this license.
#include <stasis/logger/inMemoryLog.h>
#include <stasis/page.h>
/**
@todo loggerType should go away.
*/
#ifdef USE_LOGGER
int loggerType = USE_LOGGER;
#else
int loggerType = LOG_TO_FILE;
#endif
/**
@todo stasis_log_file should be in transactional2.c, and not global
*/
stasis_log_t* stasis_log_file = 0;
static int pendingCommits;
static int syncLogCount;
long LoggerSizeOfInternalLogEntry(const LogEntry * e) {
if(loggerType == LOG_TO_FILE) {
return sizeofInternalLogEntry_LogWriter(e);
} else if (loggerType == LOG_TO_MEMORY) {
return sizeofInternalLogEntry_InMemoryLog(e);
} else {
// we dont have an appropriate implementation, or weren't initialized...
abort();
}
}
void LogWrite(LogEntry * e) {
if(loggerType == LOG_TO_FILE) {
writeLogEntry(e);
} else if (loggerType == LOG_TO_MEMORY) {
writeLogEntry_InMemoryLog(e);
} else {
abort();
}
return;
}
int LogInit(int logType) {
loggerType = logType;
pendingCommits = 0;
syncLogCount = 0;
if(LOG_TO_FILE == logType) {
openLogWriter();
} else if(LOG_TO_MEMORY == logType) {
open_InMemoryLog();
} else {
return -1;
}
return 0;
}
int LogDeinit() {
if(LOG_TO_FILE == loggerType) {
closeLogWriter();
} else if(LOG_TO_MEMORY == loggerType) {
close_InMemoryLog();
} else {
abort();
}
return 0;
}
void LogTruncate(lsn_t lsn) {
if(LOG_TO_FILE == loggerType) {
truncateLog_LogWriter(lsn);
} else if(LOG_TO_MEMORY == loggerType) {
truncateLog_InMemoryLog(lsn);
} else {
abort();
}
}
lsn_t LogFlushedLSN() {
lsn_t ret;
if(LOG_TO_FILE == loggerType) {
ret = flushedLSN_LogWriter();
} else if(LOG_TO_MEMORY == loggerType) {
ret = flushedLSN_InMemoryLog();
} else {
abort();
}
return ret;
}
lsn_t LogTruncationPoint() {
lsn_t ret;
if(LOG_TO_FILE == loggerType) {
ret = firstLogEntry();
} else if(LOG_TO_MEMORY == loggerType) {
ret = firstLogEntry_InMemoryLog();
} else {
abort();
}
return ret;
}
const LogEntry * LogReadLSN(lsn_t lsn) {
LogEntry * ret;
if(LOG_TO_FILE == loggerType) {
ret = readLSNEntry_LogWriter(lsn);
} else if(LOG_TO_MEMORY == loggerType) {
ret = readLSNEntry_InMemoryLog(lsn);
} else {
abort();
}
return ret;
}
lsn_t LogNextEntry(const LogEntry * e) {
lsn_t ret;
if(LOG_TO_FILE == loggerType) {
ret = nextEntry_LogWriter(e);
} else if(LOG_TO_MEMORY == loggerType) {
ret = nextEntry_InMemoryLog(e);
} else {
abort();
}
return ret;
}
void FreeLogEntry(const LogEntry * e) {
if(LOG_TO_FILE == loggerType) {
free((void*)e);
} else if(LOG_TO_MEMORY == loggerType) {
free((void*)e);
} else {
abort();
}
}
TransactionLog LogTransBegin(int xid) {
TransactionLog LogTransBegin(stasis_log_t* log, int xid) {
TransactionLog tl;
tl.xid = xid;
DEBUG("Log Begin %d\n", xid);
tl.prevLSN = -1;
tl.recLSN = -1;
return tl;
}
static lsn_t LogTransCommon(TransactionLog * l, int type) {
static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) {
LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type);
lsn_t ret;
LogWrite(e);
log->write_entry(log, e);
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
@ -210,40 +100,101 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) {
ret = e->LSN;
FreeLogEntry(e);
freeLogEntry(e);
return ret;
}
static lsn_t LogTransCommonPrepare(TransactionLog * l) {
static lsn_t LogTransCommonPrepare(stasis_log_t* log, TransactionLog * l) {
LogEntry * e = allocPrepareLogEntry(l->prevLSN, l->xid, l->recLSN);
lsn_t ret;
DEBUG("Log prepare xid = %d prevlsn = %lld reclsn = %lld, %lld\n",e->xid,e->prevLSN,l->recLSN, getPrepareRecLSN(e));
LogWrite(e);
DEBUG("Log prepare xid = %d prevlsn = %lld reclsn = %lld, %lld\n",
e->xid, e->prevLSN, l->recLSN, getPrepareRecLSN(e));
log->write_entry(log, e);
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n",
e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
ret = e->LSN;
FreeLogEntry(e);
freeLogEntry(e);
return ret;
}
static void groupForce(lsn_t l) {
LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l,
Page * p, unsigned int op,
const byte * arg, size_t arg_size) {
LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op,
p ? p->id : INVALID_PAGE,
arg, arg_size);
log->write_entry(log, e);
DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size);
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
return e;
}
lsn_t LogCLR(stasis_log_t* log, const LogEntry * old_e) {
LogEntry * e = allocCLRLogEntry(old_e);
log->write_entry(log, e);
DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid,
e->LSN, LSN, prevLSN);
lsn_t ret = e->LSN;
freeLogEntry(e);
return ret;
}
lsn_t LogDummyCLR(stasis_log_t* log, int xid, lsn_t prevLSN,
lsn_t compensatedLSN) {
LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP,
INVALID_PAGE, NULL, 0);
e->LSN = compensatedLSN;
lsn_t ret = LogCLR(log, e);
freeLogEntry(e);
return ret;
}
static void groupForce(stasis_log_t* log, lsn_t lsn);
lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l) {
lsn_t lsn = LogTransCommon(log, l, XCOMMIT);
groupForce(log, lsn);
return lsn;
}
lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l) {
return LogTransCommon(log, l, XABORT);
}
lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l) {
lsn_t lsn = LogTransCommonPrepare(log, l);
groupForce(log, lsn);
return lsn;
}
void LogForce(stasis_log_t* log, lsn_t lsn) {
groupForce(log, lsn);
}
static void groupForce(stasis_log_t* log, lsn_t lsn) {
static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER;
struct timeval now;
struct timespec timeout;
pthread_mutex_lock(&check_commit);
if(LogFlushedLSN() >= l) {
if(log->first_unstable_lsn(log) >= lsn) {
pthread_mutex_unlock(&check_commit);
return;
}
@ -264,92 +215,25 @@ static void groupForce(lsn_t l) {
int retcode;
while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) {
if(retcode != 0) {
printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by a signal in groupCommit(). Acting as though it timed out.\n", __FILE__, __LINE__);
printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by "
"a signal in groupCommit(). Acting as though it timed out.\n",
__FILE__, __LINE__);
break;
}
if(LogFlushedLSN() >= l) {
if(log->first_unstable_lsn(log) >= lsn) {
pendingCommits--;
pthread_mutex_unlock(&check_commit);
return;
}
}
}
if(LogFlushedLSN() < l) {
if(LOG_TO_FILE == loggerType) {
syncLog_LogWriter();
} else if (LOG_TO_MEMORY == loggerType) {
syncLog_InMemoryLog();
} else {
abort();
}
assert(LogFlushedLSN() >= lsn);
syncLogCount++;
}
if(log->first_unstable_lsn(log) < lsn) {
log->force_tail(log);
assert(log->first_unstable_lsn(log) >= lsn);
pthread_cond_broadcast(&tooFewXacts);
}
assert(LogFlushedLSN() >= l);
assert(log->first_unstable_lsn(log) >= lsn);
pendingCommits--;
pthread_mutex_unlock(&check_commit);
return;
}
static lsn_t groupCommit(TransactionLog * l) {
lsn_t ret = LogTransCommon(l, XCOMMIT);
groupForce(ret);
return ret;
}
static lsn_t groupPrepare(TransactionLog * l) {
lsn_t ret = LogTransCommonPrepare(l);
groupForce(ret);
return ret;
}
void LogForce(lsn_t lsn) {
groupForce(lsn);
}
lsn_t LogTransCommit(TransactionLog * l) {
return groupCommit(l);
}
lsn_t LogTransAbort(TransactionLog * l) {
return LogTransCommon(l, XABORT);
}
lsn_t LogTransPrepare(TransactionLog * l) {
return groupPrepare(l);
}
LogEntry * LogUpdate(TransactionLog * l, Page * p, unsigned int op,
const byte * arg, size_t arg_size) {
LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op,
p ? p->id : INVALID_PAGE,
arg, arg_size);
LogWrite(e);
DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size);
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
return e;
}
lsn_t LogCLR(const LogEntry * old_e) {
LogEntry * e = allocCLRLogEntry(old_e);
LogWrite(e);
DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid,
e->LSN, LSN, prevLSN);
lsn_t ret = e->LSN;
FreeLogEntry(e);
return ret;
}
lsn_t LogDummyCLR(int xid, lsn_t prevLSN, lsn_t compensatedLSN) {
LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP,
INVALID_PAGE, NULL, 0);
e->LSN = compensatedLSN;
lsn_t ret = LogCLR(e);
FreeLogEntry(e);
return ret;
}

View file

@ -55,7 +55,7 @@ terms specified in this license.
recordid prepare_bogus_rec = { 0, 0, 0};
static int op_prepare(const LogEntry * e, Page * p) {
LogForce(e->LSN);
LogForce(stasis_log_file, e->LSN);
return 0;
}

View file

@ -160,7 +160,7 @@ void regionsInit() {
writelock(p->rwlatch,0);
op_alloc_boundary_tag(e,p);
unlock(p->rwlatch);
FreeLogEntry(e);
freeLogEntry(e);
}
holding_mutex = 0;
releasePage(p);

View file

@ -8,8 +8,8 @@
XXX rawPageInferMetadata is wrong; setting lsn to LogFlushedLSN() breaks
recovery.
*/
void rawPageInferMetadata(Page * p) {
p->LSN = LogFlushedLSN();
void rawPageInferMetadata(Page * p) {
p->LSN = stasis_log_file->first_unstable_lsn(stasis_log_file);
}
byte* rawPageGetData(int xid, Page * p) {

View file

@ -30,7 +30,7 @@ static void phWrite(Page * ret) {
// or we'll deadlock.
writelock(ret->rwlatch,0);
stasis_page_flushed(ret);
LogForce(ret->LSN);
LogForce(stasis_log_file, ret->LSN);
int err = h->write(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE);
if(err) {
printf("Couldn't write to page file: %s\n", strerror(err));

View file

@ -49,20 +49,20 @@ static pthread_mutex_t rollback_mutex = PTHREAD_MUTEX_INITIALIZER;
no longer reads the pages in, there's no longer any reason to build
the list of dirty pages.
*/
static void Analysis() {
static void Analysis(stasis_log_t* log) {
DEBUG("Recovery: Analysis\n");
const LogEntry * e;
LogHandle lh = getLogHandle();
LogHandle* lh = getLogHandle(log);
/** After recovery, we need to know what the highest XID in the
log was so that we don't accidentally reuse XID's. This keeps
track of that value. */
int highestXid = 0;
while((e = nextInLog(&lh))) {
while((e = nextInLog(lh))) {
lsn_t * xactLSN = (lsn_t*)pblHtLookup(transactionLSN, &(e->xid), sizeof(int));
@ -148,8 +148,9 @@ static void Analysis() {
default:
abort();
}
FreeLogEntry(e);
freeLogEntry(e);
}
freeLogHandle(lh);
stasis_transaction_table_max_transaction_id_set(highestXid);
}
@ -170,13 +171,13 @@ static void Analysis() {
Y (NTA replaces physical undo)
*/
static void Redo() {
LogHandle lh = getLogHandle();
static void Redo(stasis_log_t* log) {
LogHandle* lh = getLogHandle(log);
const LogEntry * e;
DEBUG("Recovery: Redo\n");
while((e = nextInLog(&lh))) {
while((e = nextInLog(lh))) {
// Is this log entry part of a transaction that needs to be redone?
if(pblHtLookup(transactionLSN, &(e->xid), sizeof(int)) != NULL) {
if(e->type != INTERNALLOG) {
@ -194,7 +195,9 @@ static void Redo() {
} break;
case CLRLOG:
{
const LogEntry *ce = LogReadLSN(((CLRLogEntry*)e)->clr.compensated_lsn);
const LogEntry *ce =
log->read_entry(log,((CLRLogEntry*)e)->clr.compensated_lsn);
if(ce->update.page == INVALID_PAGE) {
// logical redo of end of NTA; no-op
} else {
@ -207,7 +210,7 @@ static void Redo() {
unlock(p->rwlatch);
releasePage(p);
}
FreeLogEntry(ce);
freeLogEntry(ce);
} break;
case XCOMMIT:
{
@ -233,11 +236,13 @@ static void Redo() {
abort();
}
}
FreeLogEntry(e);
freeLogEntry(e);
}
freeLogHandle(lh);
}
static void Undo(int recovery) {
LogHandle lh;
static void Undo(stasis_log_t* log, int recovery) {
LogHandle* lh;
DEBUG("Recovery: Undo\n");
@ -247,7 +252,7 @@ static void Undo(int recovery) {
DEBUG("Undoing LSN %ld\n", (long int)rollback);
lh = getLSNHandle(rollback);
lh = getLSNHandle(log, rollback);
int thisXid = -1;
@ -255,7 +260,7 @@ static void Undo(int recovery) {
int reallyAborted = 0;
// Have we reached a XPREPARE that we should pay attention to?
int prepared = 0;
while((!prepared) && (e = previousInTransaction(&lh))) {
while((!prepared) && (e = previousInTransaction(lh))) {
thisXid = e->xid;
switch(e->type) {
case UPDATELOG:
@ -278,7 +283,7 @@ static void Undo(int recovery) {
}
// Log a CLR for this entry
lsn_t clr_lsn = LogCLR(e);
lsn_t clr_lsn = LogCLR(log, e);
DEBUG("logged clr\n");
stasis_transaction_table_roll_forward(e->xid, e->LSN, e->prevLSN);
@ -296,7 +301,8 @@ static void Undo(int recovery) {
}
case CLRLOG:
{
const LogEntry * ce = LogReadLSN(((CLRLogEntry*)e)->clr.compensated_lsn);
const LogEntry * ce
= log->read_entry(log, ((CLRLogEntry*)e)->clr.compensated_lsn);
if(ce->update.page == INVALID_PAGE) {
DEBUG("logical clr\n");
undoUpdate(ce, 0, 0); // logical undo; effective LSN doesn't matter
@ -304,7 +310,7 @@ static void Undo(int recovery) {
DEBUG("physical clr: op %d lsn %lld\n", ce->update.funcID, ce->LSN);
// no-op. Already undone during redo. This would redo the original op.
}
FreeLogEntry(ce);
freeLogEntry(ce);
}
break;
case XABORT:
@ -335,7 +341,7 @@ static void Undo(int recovery) {
e->type, e->xid, e->LSN);
abort();
}
FreeLogEntry(e);
freeLogEntry(e);
}
if(!prepared) {
if(recovery) {
@ -345,18 +351,19 @@ static void Undo(int recovery) {
globalLockManager.abort(thisXid);
}
}
freeLogHandle(lh);
}
}
void InitiateRecovery() {
void stasis_recovery_initiate(stasis_log_t* log) {
transactionLSN = pblHtCreate();
DEBUG("Analysis started\n");
Analysis();
Analysis(log);
DEBUG("Redo started\n");
Redo();
Redo(log);
DEBUG("Undo started\n");
TallocPostInit();
Undo(1);
Undo(log,1);
DEBUG("Recovery complete.\n");
for(void * it = pblHtFirst(transactionLSN); it; it = pblHtNext(transactionLSN)) {
@ -369,7 +376,7 @@ void InitiateRecovery() {
}
void undoTrans(TransactionLog transaction) {
void undoTrans(stasis_log_t* log, TransactionLog transaction) {
pthread_mutex_lock(&rollback_mutex);
assert(!rollbackLSNs);
@ -381,7 +388,7 @@ void undoTrans(TransactionLog transaction) {
/* Nothing to undo. (Happens for read-only xacts.) */
}
Undo(0);
Undo(log, 0);
if(rollbackLSNs) {
destroyList(&rollbackLSNs);
}

View file

@ -18,6 +18,9 @@
#include <stasis/logger/logger2.h>
#include <stasis/logger/logWriter.h>
#include <stasis/logger/inMemoryLog.h>
#include <stasis/truncation.h>
#include <stasis/io/handle.h>
#include <stasis/blobManager.h> // XXX remove this, move Tread() to set.c
@ -137,7 +140,13 @@ int Tinit() {
setupOperationsTable();
dirtyPagesInit();
LogInit(loggerType);
if(LOG_TO_FILE == loggerType) {
stasis_log_file = openLogWriter();
} else if(LOG_TO_MEMORY == loggerType) {
stasis_log_file = open_InMemoryLog();
} else {
assert(stasis_log_file != NULL);
}
stasis_page_init();
#ifndef HAVE_O_DIRECT
@ -223,12 +232,13 @@ int Tinit() {
consumer_init();
setupLockManagerCallbacksNil();
//setupLockManagerCallbacksPage();
InitiateRecovery();
truncationInit();
stasis_recovery_initiate(stasis_log_file);
stasis_truncation_init();
if(stasis_truncation_automatic) {
autoTruncate(); // should this be before InitiateRecovery?
// should this be before InitiateRecovery?
stasis_truncation_thread_start(stasis_log_file);
}
return 0;
}
@ -265,7 +275,7 @@ int Tbegin() {
pthread_mutex_unlock(&transactional_2_mutex);
XactionTable[index] = LogTransBegin(xidCount_tmp);
XactionTable[index] = LogTransBegin(stasis_log_file, xidCount_tmp);
if(globalLockManager.begin) { globalLockManager.begin(XactionTable[index].xid); }
@ -285,11 +295,12 @@ static compensated_function void TactionHelper(int xid,
writelock(p->rwlatch,0);
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, op, dat, datlen);
e = LogUpdate(stasis_log_file, &XactionTable[xid % MAX_TRANSACTIONS],
p, op, dat, datlen);
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
doUpdate(e, p);
FreeLogEntry(e);
freeLogEntry(e);
unlock(p->rwlatch);
}
@ -358,7 +369,7 @@ int Tcommit(int xid) {
pthread_mutex_unlock(&transactional_2_mutex);
#endif
lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]);
lsn = LogTransCommit(stasis_log_file, &XactionTable[xid % MAX_TRANSACTIONS]);
if(globalLockManager.commit) { globalLockManager.commit(xid); }
allocTransactionCommit(xid);
@ -378,7 +389,7 @@ int Tprepare(int xid) {
assert(xid >= 0);
off_t i = xid % MAX_TRANSACTIONS;
assert(XactionTable[i].xid == xid);
LogTransPrepare(&XactionTable[i]);
LogTransPrepare(stasis_log_file, &XactionTable[i]);
return 0;
}
@ -389,10 +400,10 @@ int Tabort(int xid) {
TransactionLog * t =&XactionTable[xid%MAX_TRANSACTIONS];
assert(t->xid == xid);
lsn = LogTransAbort(t);
lsn = LogTransAbort(stasis_log_file, t);
/** @todo is the order of the next two calls important? */
undoTrans(*t);
undoTrans(stasis_log_file, *t);
if(globalLockManager.abort) { globalLockManager.abort(xid); }
allocTransactionAbort(xid);
@ -419,7 +430,7 @@ int Tdeinit() {
}
}
assert( numActiveXactions == 0 );
truncationDeinit();
stasis_truncation_deinit();
TnaiveHashDeinit();
TallocDeinit();
bufDeinit();
@ -431,7 +442,7 @@ int Tdeinit() {
slow_close = 0;
}
stasis_page_deinit();
LogDeinit();
stasis_log_file->deinit(stasis_log_file);
dirtyPagesDeinit();
initted = 0;
@ -443,7 +454,7 @@ int TuncleanShutdown() {
// We're simulating a crash; don't complain when writes get lost,
// and active transactions get rolled back.
stasis_suppress_unclean_shutdown_warnings = 1;
truncationDeinit();
stasis_truncation_deinit();
TnaiveHashDeinit();
simulateBufferManagerCrash();
if(slow_pfile) {
@ -452,7 +463,7 @@ int TuncleanShutdown() {
slow_close = 0;
}
stasis_page_deinit();
LogDeinit();
stasis_log_file->deinit(stasis_log_file);
numActiveXactions = 0;
dirtyPagesDeinit();
@ -577,14 +588,16 @@ typedef struct {
*/
void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) {
assert(xid >= 0);
LogEntry * e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], NULL, op, dat, datSize);
LogEntry * e = LogUpdate(stasis_log_file,
&XactionTable[xid % MAX_TRANSACTIONS],
NULL, op, dat, datSize);
DEBUG("Begin Nested Top Action e->LSN: %ld\n", e->LSN);
stasis_nta_handle * h = malloc(sizeof(stasis_nta_handle));
h->prev_lsn = e->prevLSN;
h->compensated_lsn = e->LSN;
FreeLogEntry(e);
freeLogEntry(e);
return h;
}
@ -597,7 +610,8 @@ lsn_t TendNestedTopAction(int xid, void * handle) {
assert(xid >= 0);
// Write a CLR.
lsn_t clrLSN = LogDummyCLR(xid, h->prev_lsn, h->compensated_lsn);
lsn_t clrLSN = LogDummyCLR(stasis_log_file, xid,
h->prev_lsn, h->compensated_lsn);
// Ensure that the next action in this transaction points to the CLR.
XactionTable[xid % MAX_TRANSACTIONS].prevLSN = clrLSN;

View file

@ -153,11 +153,11 @@ void dirtyPagesDeinit() {
pblHtDelete(dirtyPages);
dirtyPages = 0;
}
void truncationInit() {
void stasis_truncation_init() {
initialized = 1;
}
void truncationDeinit() {
void stasis_truncation_deinit() {
pthread_mutex_lock(&shutdown_mutex);
initialized = 0;
if(automaticallyTuncating) {
@ -171,76 +171,76 @@ void truncationDeinit() {
automaticallyTuncating = 0;
}
static void* periodicTruncation(void * ignored) {
static void* stasis_truncation_thread_worker(void* logp) {
stasis_log_t * log = logp;
pthread_mutex_lock(&shutdown_mutex);
while(initialized) {
if(LogFlushedLSN() - LogTruncationPoint() > TARGET_LOG_SIZE) {
truncateNow(0);
while(initialized) {
if(log->first_unstable_lsn(log) - log->truncation_point(log)
> TARGET_LOG_SIZE) {
stasis_truncation_truncate(log, 0);
}
struct timeval now;
struct timespec timeout;
int timeret = gettimeofday(&now, 0);
assert(0 == timeret);
timeout.tv_sec = now.tv_sec;
timeout.tv_nsec = now.tv_usec;
timeout.tv_sec += TRUNCATE_INTERVAL;
pthread_cond_timedwait(&shutdown_cond, &shutdown_mutex, &timeout);
}
pthread_mutex_unlock(&shutdown_mutex);
return (void*)0;
}
void autoTruncate() {
void stasis_truncation_thread_start(stasis_log_t* log) {
assert(!automaticallyTuncating);
automaticallyTuncating = 1;
pthread_create(&truncationThread, 0, &periodicTruncation, 0);
pthread_create(&truncationThread, 0, &stasis_truncation_thread_worker, log);
}
int truncateNow(int force) {
int stasis_truncation_truncate(stasis_log_t* log, int force) {
// *_minRecLSN() used to return the same value as flushed if
//there were no outstanding transactions, but flushed might
//not point to the front of a log entry... now, both return
//LSN_T_MAX if there are no outstanding transactions / no
//dirty pages.
lsn_t page_rec_lsn = dirtyPages_minRecLSN();
lsn_t xact_rec_lsn = transactions_minRecLSN();
lsn_t flushed_lsn = LogFlushedLSN();
lsn_t flushed_lsn = log->first_unstable_lsn(log);
lsn_t rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn;
rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn;
lsn_t log_trunc = LogTruncationPoint();
if(force || (xact_rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) {
lsn_t log_trunc = log->truncation_point(log);
if(force || (xact_rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) {
//fprintf(stderr, "xact = %ld \t log = %ld\n", xact_rec_lsn, log_trunc);
if((rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) {
if((rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) {
// fprintf(stderr, "Truncating now. rec_lsn = %ld, log_trunc = %ld\n", rec_lsn, log_trunc);
// fprintf(stderr, "Truncating to rec_lsn = %ld\n", rec_lsn);
forcePages();
LogTruncate(rec_lsn);
log->truncate(log, rec_lsn);
return 1;
} else {
lsn_t flushed = LogFlushedLSN();
if(force || flushed - log_trunc > 2 * TARGET_LOG_SIZE) {
} else {
lsn_t flushed = log->first_unstable_lsn(log);
if(force || flushed - log_trunc > 2 * TARGET_LOG_SIZE) {
//fprintf(stderr, "Flushing dirty buffers: rec_lsn = %ld log_trunc = %ld flushed = %ld\n", rec_lsn, log_trunc, flushed);
dirtyPages_flush();
page_rec_lsn = dirtyPages_minRecLSN();
rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn;
rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn;
//fprintf(stderr, "Flushed Dirty Buffers. Truncating to rec_lsn = %ld\n", rec_lsn);
forcePages();
LogTruncate(rec_lsn);
log->truncate(log, rec_lsn);
return 1;
} else {
} else {
return 0;
}
}

View file

@ -1,16 +1,8 @@
#ifndef __INMEMORYLOG
#define __INMEMORYLOG
#include <stasis/logger/logger2.h>
#ifndef __INMEMORYLOG
#define __INMEMORYLOG 1
stasis_log_t* open_InMemoryLog();
int open_InMemoryLog();
int writeLogEntry_InMemoryLog(LogEntry * e);
lsn_t flushedLSN_InMemoryLog();
void syncLog_InMemoryLog();
int truncateLog_InMemoryLog(lsn_t lsn);
lsn_t firstLogEntry_InMemoryLog();
void close_InMemoryLog();
long sizeofInternalLogEntry_InMemoryLog(const LogEntry * e);
LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn);
lsn_t nextEntry_InMemoryLog(const LogEntry * e);
#endif

View file

@ -123,10 +123,14 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
@return a LogEntry that should be freed with free().
*/
LogEntry * allocCLRLogEntry(const LogEntry * e);
/**
/**
@param e a log entry returned from one of the alloc???LogEntry functions.
*/
void freeLogEntry(const LogEntry * e);
/**
@return the length, in bytes, of e.
*/
long sizeofLogEntry(const LogEntry * e);
lsn_t sizeofLogEntry(const LogEntry * e);
/**
@return the operation's arguments.
*/

View file

@ -47,6 +47,8 @@ terms specified in this license.
BEGIN_C_DECLS
typedef struct LogHandle LogHandle;
/**
@file
A simple data structure that allows forward iterations over
@ -65,13 +67,16 @@ BEGIN_C_DECLS
*/
/** Returns a logHandle pointing at the first log entry in the log. */
LogHandle getLogHandle();
LogHandle* getLogHandle(stasis_log_t* log);
/** Returns a logHandle pointing at lsn. */
LogHandle getLSNHandle(lsn_t lsn);
LogHandle* getLSNHandle(stasis_log_t* log, lsn_t lsn);
/** Returns a 'guarded log handle'. This handle executes a callback
function on each entry it encounters. If the guard returns 0,
then it's iterator terminates. Otherwise, it behaves normally. */
LogHandle getGuardedHandle(lsn_t lsn, guard_fcn_t * f, void * guard_state);
LogHandle* getGuardedHandle(stasis_log_t* log, lsn_t lsn,
guard_fcn_t * f, void * guard_state);
void freeLogHandle(LogHandle* lh);
/**
@return a pointer to the next log entry in the log, or NULL if at

View file

@ -71,107 +71,29 @@ terms specified in this license.
#define __LOGWRITER_H__
#include <stasis/common.h>
#include <stasis/logger/logger2.h>
#include <stasis/logger/logEntry.h>
BEGIN_C_DECLS
/**
start a new log stream by opening the log file for reading
returns 0 on success, or an error code define above
Start a new log stream by opening the log file for reading. This
is stasis's default log implementation, and uses safe writes to
perform truncation.
@return NULL on error
*/
int openLogWriter();
/**
@param e Pointer to a log entry. After the call, e->LSN will be set appropriately. If e's xid is set to -1, then this call has no effect (and e's LSN will be set to -1.)
returns 0 on success, or an error code defined above
*/
int writeLogEntry(LogEntry * e);
/**
flush the entire log (tail) that is currently in memory to disk
*/
void syncLog_LogWriter();
/**
Return the highest LSN that is known to be on disk. (Currently, we
only know if an LSN is on disk if we've written that LSN before a
call to syncLog().
Note: This function might not return an LSN corresponding to a real
log entry, but it will definitely return one that is greater than
or equal to the LSN of a log entry that has been forced to disk,
and is less than the LSN of all log entries that might not have
been forced to disk.
*/
lsn_t flushedLSN_LogWriter();
/**
Truncates the log file. In the single-threaded case, this works as
follows:
First, the LSN passed to this function, minus sizeof(lsn_t) is
written to a new file, called logfile.txt~. (If logfile.txt~
already exists, then it is truncated.)
Next, the contents of the log, starting with the LSN passed into
this function are copied to logfile.txt~
Finally, logfile.txt~ is moved on top of logfile.txt
As long as the move system call is atomic, this function should
maintain the system's durability.
The multithreaded case is a bit more complicated, as we need
to deal with latching:
With no lock, copy the log. Upon completion, if the log has grown,
then copy the part that remains. Next, obtain a read/write latch
on the logfile, and copy any remaining portions of the log.
Perform the move, and release the latch.
*/
int truncateLog_LogWriter(lsn_t);
/**
@return The LSN of the first entry in the log file. (If the file
is empty, this returns the LSN of the log entry that would be
created if writeLogEntry were called.)
*/
lsn_t firstLogEntry();
/**
Close the log stream
*/
void closeLogWriter();
stasis_log_t* openLogWriter();
/**
Actually deletes the log file that may have been written to disk! Danger!!
Only use after calling closeLogStream AND you are sure there are no active (or
future active) transactions!
Only use after calling closeLogStream AND you are sure there are no active
(or future active) transactions!
@todo This only exists because the tests use it...once the logfile
name isn't hardcoded, remove this function.
*/
void deleteLogWriter();
/**
Read a log entry at a particular LSN.
@param LSN the LSN of the entry that will be read.
*/
LogEntry * readLSNEntry_LogWriter(lsn_t LSN);
static inline lsn_t nextEntry_LogWriter(const LogEntry * e) {
return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t);
}
extern int logWriter_isDurable;
long sizeofInternalLogEntry_LogWriter(const LogEntry * e);
END_C_DECLS
#endif /* __LLADD_LOGGER_LOGWRITER_H */

View file

@ -46,9 +46,9 @@ terms specified in this license.
* Interface to Stasis' log file.
*
* @ingroup LOGGING_DISCIPLINE
*
*
* $Id$
*
*
*/
@ -64,110 +64,144 @@ terms specified in this license.
*/
typedef int (guard_fcn_t)(const LogEntry *, void *);
typedef struct {
/** The LSN of the log entry that we would return if next is called. */
lsn_t next_offset;
/** The LSN of the log entry that we would return if previous is called. */
lsn_t prev_offset;
guard_fcn_t * guard;
void * guard_state;
} LogHandle;
typedef struct stasis_log_t stasis_log_t;
/**
Contains the state needed by the logging layer to perform
operations on a transaction.
*/
typedef struct {
int xid;
int xid;
lsn_t prevLSN;
lsn_t recLSN;
} TransactionLog;
/**
This is the log implementation that is being used.
Before Stasis is intialized it will be set to a default value.
/**
This is the log implementation that is being used.
Before Stasis is intialized it will be set to a default value.
It may be changed before Tinit() is called by assigning to it.
The default can be overridden at compile time by defining
USE_LOGGER.
(eg: gcc ... -DUSE_LOGGER=LOG_TO_FOO)
@see constants.h for a list of recognized log implementations.
@see constants.h for a list of recognized log implementations.
(The constants are named LOG_TO_*)
*/
*/
extern int loggerType;
int LogInit(int logType);
int LogDeinit();
void LogForce(lsn_t lsn);
/**
@param lsn The first lsn that will be available after truncation.
*/
void LogTruncate(lsn_t lsn);
struct stasis_log_t {
/**
Needed by sizeofLogEntry
*/
lsn_t (*sizeof_internal_entry)(struct stasis_log_t* log, const LogEntry * e);
/**
Append a log entry to the end of the log.
@param e This call sets e->LSN to entry's offset.
@return 0 on success
*/
int (*write_entry)(struct stasis_log_t* log, LogEntry * e);
/**
Read a log entry, given its LSN.
@param lsn The lsn of the log entry to be read.
*/
const LogEntry* (*read_entry)(struct stasis_log_t* log, lsn_t lsn);
/**
Given a log entry, return the LSN of the next entry.
*/
lsn_t (*next_entry)(struct stasis_log_t* log, const LogEntry * e);
/**
This function returns the LSN of the most recent
log entry that has not been flushed to disk. If the entire log
is flushed, this function returns the LSN of the entry that will
be allocated the next time the log is appended to.
*/
lsn_t (*first_unstable_lsn)(struct stasis_log_t* log);
/**
Force any enqueued, unwritten entries to disk
*/
void (*force_tail)(struct stasis_log_t* log);
/**
@param lsn The first lsn that will be available after truncation.
@return 0 on success
*/
int (*truncate)(struct stasis_log_t* log, lsn_t lsn);
/**
Returns the LSN of the first entry of the log. If the log is
empty, return the LSN that will be assigned to the next log
entry that is appended to the log.
*/
lsn_t (*truncation_point)(struct stasis_log_t* log);
/**
@return 0 on success
*/
int (*deinit)(struct stasis_log_t* log);
int (*is_durable)();
};
/** This function is guaranteed to return the LSN of the most recent
log entry that has not been flushed to disk. (If the entire log
is flushed, this function returns the LSN of the entry that will
be allocated the next time the log is appended to. */
lsn_t LogFlushedLSN();
/** Returns the LSN of the first entry of the log, or the LSN of the
next to be allocated if the log is empty) */
lsn_t LogTruncationPoint();
/** Read a log entry, given its LSN.
@param lsn The lsn of the log entry to be read.
*/
const LogEntry * LogReadLSN(lsn_t lsn);
/**
Given a log entry, return the LSN of the next entry.
*/
lsn_t LogNextEntry(const LogEntry * e);
@todo get rid of this!
*/
extern stasis_log_t* stasis_log_file;
void LogForce(stasis_log_t* log, lsn_t lsn);
/**
Inform the logging layer that a new transaction has begun, and
obtain a handle.
*/
TransactionLog LogTransBegin(int xid);
TransactionLog LogTransBegin(stasis_log_t* log, int xid);
/**
Write a transaction PREPARE to the log tail. Blocks until the
prepare record is stable.
Write a transaction PREPARE to the log tail. Blocks until the
prepare record is stable.
@return the lsn of the prepare log entry
*/
lsn_t LogTransPrepare(TransactionLog * l);
/**
Write a transaction COMMIT to the log tail. Blocks until the commit
record is stable.
@return the lsn of the commit log entry.
@return the lsn of the prepare log entry
*/
lsn_t LogTransCommit(TransactionLog * l);
lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l);
/**
Write a transaction COMMIT to the log tail. Blocks until the commit
record is stable.
@return the lsn of the commit log entry.
*/
lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l);
/**
Write a transaction ABORT to the log tail. Does not force the log.
Write a transaction ABORT to the log tail. Does not force the log.
@return the lsn of the abort log entry.
@return the lsn of the abort log entry.
*/
lsn_t LogTransAbort(TransactionLog * l);
lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l);
/**
LogUpdate writes an UPDATELOG log record to the log tail. It also interprets
its operation argument to the extent necessary for allocating and laying out
the log entry. Finally, it updates the state of the parameter l.
Write a end transaction record @see XEND
@todo Implement LogEnd
*/
LogEntry * LogUpdate(TransactionLog * l, Page * p, unsigned int operation,
const byte * arg, size_t arg_size);
void LogEnd (stasis_log_t* log, TransactionLog * l);
/**
Any LogEntry that is returned by a function in logger2.h or
logHandle.h should be freed using this function.
@param e The log entry to be freed. (The "const" here is a hack
that allows LogReadLSN to return a const *.
LogUpdate writes an UPDATELOG log record to the log tail. It
also interprets its operation argument to the extent necessary for
allocating and laying out the log entry. Finally, it updates the
state of the parameter l.
*/
void FreeLogEntry(const LogEntry * e);
LogEntry * LogUpdate(stasis_log_t* log,
TransactionLog * l, Page * p, unsigned int operation,
const byte * arg, size_t arg_size);
/**
Write a compensation log record. These records are used to allow
@ -175,29 +209,14 @@ void FreeLogEntry(const LogEntry * e);
record the completion of undo operations, amongst other things.
@return the lsn of the CLR entry that was written to the log.
(Needed so that the lsn slot of the page in question can be
updated.)
(Needed so that the lsn slot of the page in question can be
updated.)
*/
lsn_t LogCLR(const LogEntry * e);
lsn_t LogCLR(stasis_log_t* log, const LogEntry * e);
lsn_t LogDummyCLR(int xid, lsn_t prev_lsn, lsn_t compensated_lsn);
lsn_t LogDummyCLR(stasis_log_t* log, int xid,
lsn_t prev_lsn, lsn_t compensated_lsn);
/**
Write a end transaction record @see XEND
@todo Implement LogEnd
*/
void LogEnd (TransactionLog * l);
/**
Needed by sizeofLogEntry
*/
long LoggerSizeOfInternalLogEntry(const LogEntry * e);
/**
For internal use only... This would be static, but it is called by
the test cases.
*/
void LogWrite(LogEntry * e);
#endif

View file

@ -15,6 +15,7 @@
*
* @param dat The page to be flushed to disk. No concurrent calls
* may have the same value of dat.
*
*/
extern void (*pageWrite)(Page * dat);

View file

@ -1,9 +1,10 @@
#ifndef __LLADD_RECOVERY2_H
#define __LLADD_RECOVERY2_H
void InitiateRecovery();
#include <stasis/logger/logger2.h>
void stasis_recovery_initiate(stasis_log_t* log);
/** This really doesn't belong in recovery.c, but there's so much code overlap, it doesn't make sense not to put it there. */
void undoTrans();
void undoTrans(stasis_log_t*log, TransactionLog transaction);
#endif

View file

@ -752,7 +752,6 @@ int stasis_transaction_table_forget(int xid);
*/
lsn_t transactions_minRecLSN();
/**
Report Stasis' current durability guarantees.

View file

@ -60,6 +60,8 @@ BEGIN_C_DECLS
#ifndef __LLADD_TRUNCATION_H
#define __LLADD_TRUNCATION_H 1
#include <stasis/logger/logger2.h>
void dirtyPagesInit();
void dirtyPagesDeinit();
@ -73,17 +75,17 @@ int dirtyPages_isDirty(Page * p);
*/
void dirtyPages_flushRange(pageid_t start, pageid_t stop);
void truncationInit();
void truncationDeinit();
void stasis_truncation_init();
void stasis_truncation_deinit();
/**
Spawn a periodic, demand-based log truncation thread.
*/
void autoTruncate();
void stasis_truncation_thread_start();
/**
Initiate a round of log truncation.
*/
int truncateNow(int force);
int stasis_truncation_truncate(stasis_log_t* log, int force);
END_C_DECLS
#endif

View file

@ -54,6 +54,7 @@ terms specified in this license.
#include <stasis/logger/logHandle.h>
#include <stasis/logger/logger2.h>
#include <stasis/logger/logWriter.h>
#include <stasis/logger/inMemoryLog.h>
#include <stasis/latches.h>
#include <sched.h>
@ -86,7 +87,7 @@ static void setup_log() {
rid.slot = 0;
rid.size = sizeof(unsigned long);
LogWrite(e);
stasis_log_file->write_entry(stasis_log_file,e);
prevLSN = e->LSN;
if(first) {
@ -94,28 +95,28 @@ static void setup_log() {
firstLSN = prevLSN;
}
f = LogReadLSN(prevLSN);
f = stasis_log_file->read_entry(stasis_log_file, prevLSN);
fail_unless(sizeofLogEntry(e) == sizeofLogEntry(f), "Log entry changed size!!");
fail_unless(0 == memcmp(e,f,sizeofLogEntry(e)), "Log entries did not agree!!");
FreeLogEntry (e);
FreeLogEntry (f);
freeLogEntry(e);
freeLogEntry(f);
e = allocUpdateLogEntry(prevLSN, xid, 1, rid.page, args, args_size);
LogWrite(e);
stasis_log_file->write_entry(stasis_log_file,e);
prevLSN = e->prevLSN;
// LogEntry * g = allocCLRLogEntry(100, 1, 200, rid, 0); //prevLSN);
LogEntry * g = allocCLRLogEntry(e); // XXX will probably break
g->prevLSN = firstLSN;
LogWrite(g);
stasis_log_file->write_entry(stasis_log_file,g);
assert (g->type == CLRLOG);
prevLSN = g->LSN;
FreeLogEntry (e);
FreeLogEntry (g);
prevLSN = g->LSN;
freeLogEntry (e);
freeLogEntry (g);
}
}
/**
@ -130,7 +131,7 @@ static void setup_log() {
In particular, logWriter checks to make sure that each log entry's
size matches the size that it recorded before the logEntry. Also,
when checking the 1000 of 3000 entries, this test uses
LogReadLSN, which tests the logWriter's ability to succesfully
log->read_entry, which tests the logWriter's ability to succesfully
manipulate LSN's.
@todo Test logHandle more thoroughly. (Still need to test the guard mechanism.)
@ -139,18 +140,20 @@ static void setup_log() {
START_TEST(loggerTest)
{
const LogEntry * e;
LogHandle h;
LogHandle* h;
int i = 0;
setup_log();
h = getLogHandle();
h = getLogHandle(stasis_log_file);
while((e = nextInLog(&h))) {
FreeLogEntry(e);
while((e = nextInLog(h))) {
freeLogEntry(e);
i++;
assert(i < 4000);
}
freeLogHandle(h);
assert(i == 3000);
deleteLogWriter();
@ -167,21 +170,22 @@ END_TEST
START_TEST(logHandleColdReverseIterator) {
const LogEntry * e;
setup_log();
LogHandle lh = getLogHandle();
LogHandle* lh = getLogHandle(stasis_log_file);
int i = 0;
while(((e = nextInLog(&lh)) && (i < 100)) ) {
FreeLogEntry(e);
while(((e = nextInLog(lh)) && (i < 100)) ) {
freeLogEntry(e);
i++;
}
i = 0;
lh = getLSNHandle(e->LSN);
while((e = previousInTransaction(&lh))) {
lh = getLSNHandle(stasis_log_file, e->LSN);
while((e = previousInTransaction(lh))) {
i++;
FreeLogEntry(e);
freeLogEntry(e);
}
freeLogHandle(lh);
assert(i <= 4); /* We should almost immediately hit a clr that goes to the beginning of the log... */
Tdeinit();
}
@ -199,58 +203,58 @@ START_TEST(loggerTruncate) {
const LogEntry * tmp;
setup_log();
LogHandle lh = getLogHandle();
LogHandle* lh = getLogHandle(stasis_log_file);
int i = 0;
while(i < 234) {
i++;
le = nextInLog(&lh);
le = nextInLog(lh);
}
le2 = nextInLog(&lh);
le2 = nextInLog(lh);
i = 0;
while(i < 23) {
i++;
le3 = nextInLog(&lh);
le3 = nextInLog(lh);
}
LogTruncate(le->LSN);
stasis_log_file->truncate(stasis_log_file, le->LSN);
tmp = LogReadLSN(le->LSN);
tmp = stasis_log_file->read_entry(stasis_log_file, le->LSN);
fail_unless(NULL != tmp, NULL);
fail_unless(tmp->LSN == le->LSN, NULL);
FreeLogEntry(tmp);
tmp = LogReadLSN(le2->LSN);
freeLogEntry(tmp);
tmp = stasis_log_file->read_entry(stasis_log_file, le2->LSN);
fail_unless(NULL != tmp, NULL);
fail_unless(tmp->LSN == le2->LSN, NULL);
FreeLogEntry(tmp);
tmp = LogReadLSN(le3->LSN);
freeLogEntry(tmp);
tmp = stasis_log_file->read_entry(stasis_log_file, le3->LSN);
fail_unless(NULL != tmp, NULL);
fail_unless(tmp->LSN == le3->LSN, NULL);
FreeLogEntry(tmp);
lh = getLogHandle();
freeLogEntry(tmp);
freeLogHandle(lh);
lh = getLogHandle(stasis_log_file);
i = 0;
FreeLogEntry(le);
FreeLogEntry(le2);
FreeLogEntry(le3);
freeLogEntry(le);
freeLogEntry(le2);
freeLogEntry(le3);
while((le = nextInLog(&lh))) {
while((le = nextInLog(lh))) {
if(le->type != INTERNALLOG) {
i++;
}
FreeLogEntry(le);
freeLogEntry(le);
}
assert(i == (3000 - 234 + 1));
freeLogHandle(lh);
Tdeinit();
} END_TEST
@ -303,11 +307,11 @@ static void* worker_thread(void * arg) {
#ifdef NO_CONCURRENCY
pthread_mutex_lock(&big);
#endif
LogTruncate(myTruncVal);
stasis_log_file->truncate(stasis_log_file, myTruncVal);
#ifdef NO_CONCURRENCY
pthread_mutex_unlock(&big);
#endif
assert(LogTruncationPoint() >= myTruncVal);
assert(stasis_log_file->truncation_point(stasis_log_file) >= myTruncVal);
}
if(threshold < 3) {
@ -316,7 +320,7 @@ static void* worker_thread(void * arg) {
#ifdef NO_CONCURRENCY
pthread_mutex_lock(&big);
#endif
LogWrite(le);
stasis_log_file->write_entry(stasis_log_file,le);
#ifdef NO_CONCURRENCY
pthread_mutex_unlock(&big);
#endif
@ -331,10 +335,10 @@ static void* worker_thread(void * arg) {
lsn_t lsn = lsns[entry];
pthread_mutex_unlock(&random_mutex);
const LogEntry * e = LogReadLSN(lsn);
const LogEntry * e = stasis_log_file->read_entry(stasis_log_file, lsn);
assert(e->xid == entry+key);
FreeLogEntry(e);
freeLogEntry(e);
} else {
pthread_mutex_unlock(&random_mutex);
}
@ -344,7 +348,7 @@ static void* worker_thread(void * arg) {
/* Try to interleave requests as much as possible */
sched_yield();
FreeLogEntry(le);
freeLogEntry(le);
}
@ -391,41 +395,56 @@ void reopenLogWorkload(int truncating) {
stasis_transaction_table_active_transaction_count_set(0);
LogInit(loggerType);
if(LOG_TO_FILE == loggerType) {
stasis_log_file = openLogWriter();
} else if(LOG_TO_MEMORY == loggerType) {
stasis_log_file = open_InMemoryLog();
} else {
assert(stasis_log_file != NULL);
}
int xid = 1;
TransactionLog l = LogTransBegin(xid);
TransactionLog l = LogTransBegin(stasis_log_file, xid);
lsn_t startLSN = 0;
LogEntry * entries[ENTRY_COUNT];
for(int i = 0; i < ENTRY_COUNT; i++) {
entries[i] = LogUpdate(&l, NULL, OPERATION_NOOP, NULL, 0);
entries[i] = LogUpdate(stasis_log_file,
&l, NULL, OPERATION_NOOP, NULL, 0);
if(i == SYNC_POINT) {
if(truncating) {
LogTruncate(entries[i]->LSN);
stasis_log_file->truncate(stasis_log_file,entries[i]->LSN);
startLSN = entries[i]->LSN;
}
}
}
LogDeinit();
LogInit(loggerType);
stasis_log_file->deinit(stasis_log_file);
LogHandle h;
if(LOG_TO_FILE == loggerType) {
stasis_log_file = openLogWriter();
} else if(LOG_TO_MEMORY == loggerType) {
stasis_log_file = open_InMemoryLog();
} else {
assert(stasis_log_file != NULL);
}
LogHandle * h;
int i;
if(truncating) {
h = getLogHandle();
h = getLogHandle(stasis_log_file);
i = SYNC_POINT;
} else {
h = getLogHandle();
h = getLogHandle(stasis_log_file);
i = 0;
}
const LogEntry * e;
while((e = nextInLog(&h))) {
while((e = nextInLog(h))) {
if(e->type != INTERNALLOG) {
assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i]));
assert(!memcmp(e, entries[i], sizeofLogEntry(entries[i])));
@ -438,22 +457,24 @@ void reopenLogWorkload(int truncating) {
LogEntry * entries2[ENTRY_COUNT];
for(int i = 0; i < ENTRY_COUNT; i++) {
entries2[i] = LogUpdate(&l, NULL, OPERATION_NOOP, NULL, 0);
entries2[i] = LogUpdate(stasis_log_file, &l, NULL, OPERATION_NOOP,
NULL, 0);
if(i == SYNC_POINT) {
syncLog_LogWriter();
stasis_log_file->force_tail(stasis_log_file);
}
}
freeLogHandle(h);
if(truncating) {
h = getLSNHandle(startLSN);
h = getLSNHandle(stasis_log_file, startLSN);
i = SYNC_POINT;
} else {
h = getLogHandle();
h = getLogHandle(stasis_log_file);
i = 0;
}
while((e = nextInLog(&h))) {
while((e = nextInLog(h))) {
if(e->type != INTERNALLOG) {
if( i < ENTRY_COUNT) {
assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i]));
@ -467,10 +488,11 @@ void reopenLogWorkload(int truncating) {
}
}
freeLogHandle(h);
assert(i == (ENTRY_COUNT * 2));
stasis_truncation_automatic = 1;
LogDeinit();
stasis_log_file->deinit(stasis_log_file);
}
START_TEST(loggerReopenTest) {

View file

@ -184,7 +184,8 @@ START_TEST(operation_physical_do_undo) {
// XXX This is a hack to put some stuff in the log. Otherwise, Tdeinit() fails.
for(int i = 0; i < 10; i++)
LogWrite(allocCommonLogEntry(-1, -1, -1));
stasis_log_file->write_entry(stasis_log_file,
allocCommonLogEntry(-1, -1, -1));
/** @todo need to re-think check_operations. The test is pretty broken. */
Tcommit(xid);

View file

@ -59,18 +59,18 @@ static char * logEntryToString(const LogEntry * le) {
void setupOperationsTable();
int main() {
LogHandle lh;
LogHandle* lh;
const LogEntry * le;
setupOperationsTable();
if(openLogWriter()) {
stasis_log_t* log;
if(NULL == (log = openLogWriter())) {
printf("Couldn't open log.\n");
}
lh = getLogHandle(); /*LSNHandle(0); */
lh = getLogHandle(log);
while((le = nextInLog(&lh))) {
while((le = nextInLog(lh))) {
char * s = logEntryToString(le);
if(s) {
@ -78,8 +78,8 @@ int main() {
free(s);
}
FreeLogEntry(le);
freeLogEntry(le);
}
freeLogHandle(lh);
}

View file

@ -2,7 +2,7 @@
#include <stasis/truncation.h>
int main(void) {
Tinit();
truncateNow(1);
stasis_truncation_truncate(stasis_log_file, 1);
Tdeinit();
return compensation_error();