Renamed some methods; fixed bug that caused recovery to create potentially unbounded numbers of concurrent, active transactions.

(Note: this commit contains quite a few instances of auto-cleaned whitespace....)
This commit is contained in:
Sears Russell 2009-04-11 17:17:42 +00:00
parent dacc33642e
commit 4b07b538a6
15 changed files with 359 additions and 308 deletions

View file

@ -42,6 +42,13 @@ int stasis_truncation_automatic = STASIS_TRUNCATION_AUTOMATIC;
int stasis_truncation_automatic = 1; int stasis_truncation_automatic = 1;
#endif #endif
#ifdef STASIS_LOG_TYPE
int stasis_log_type = STASIS_LOG_TYPE;
#else
int stasis_log_type = LOG_TO_FILE;
#endif
#ifdef STASIS_LOG_FILE_NAME #ifdef STASIS_LOG_FILE_NAME
char * stasis_log_file_name = STASIS_LOG_FILE_NAME; char * stasis_log_file_name = STASIS_LOG_FILE_NAME;
#else #else

View file

@ -2,123 +2,130 @@
#include <stasis/latches.h> #include <stasis/latches.h>
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
/**
@todo remove static fields from inMemoryLog
*/
static rwl * flushedLSN_lock;
static lsn_t nextAvailableLSN;
static lsn_t globalOffset;
static rwl * globalOffset_lock;
static LogEntry ** buffer;
static lsn_t bufferLen;
static lsn_t nextAvailableLSN_InMemoryLog(stasis_log_t * log) { typedef struct {
writelock(flushedLSN_lock,0); rwl * flushedLSN_lock;
writelock(globalOffset_lock,0); lsn_t nextAvailableLSN;
lsn_t ret = nextAvailableLSN; lsn_t globalOffset;
unlock(globalOffset_lock); rwl * globalOffset_lock;
unlock(flushedLSN_lock); LogEntry ** buffer;
lsn_t bufferLen;
} stasis_log_impl_in_memory;
static lsn_t stasis_log_impl_in_memory_next_available_lsn(stasis_log_t * log) {
stasis_log_impl_in_memory * impl = log->impl;
writelock(impl->flushedLSN_lock,0);
writelock(impl->globalOffset_lock,0);
lsn_t ret = impl->nextAvailableLSN;
unlock(impl->globalOffset_lock);
unlock(impl->flushedLSN_lock);
return ret; return ret;
} }
static int writeLogEntry_InMemoryLog(stasis_log_t * log, LogEntry *e) { static int stasis_log_impl_in_memory_write_entry(stasis_log_t * log, LogEntry *e) {
writelock(flushedLSN_lock, 0); stasis_log_impl_in_memory * impl = log->impl;
writelock(impl->flushedLSN_lock, 0);
lsn_t bufferOffset; lsn_t bufferOffset;
int done = 0; int done = 0;
do{ do{
writelock(globalOffset_lock,0); writelock(impl->globalOffset_lock,0);
bufferOffset = nextAvailableLSN - globalOffset; bufferOffset = impl->nextAvailableLSN - impl->globalOffset;
if(bufferOffset > bufferLen) { if(bufferOffset > impl->bufferLen) {
bufferLen *= 2; impl->bufferLen *= 2;
buffer = realloc(buffer, bufferLen); impl->buffer = realloc(impl->buffer, impl->bufferLen);
} else { } else {
done = 1; done = 1;
} }
} while (!done); } while (!done);
return 0;
e->LSN = impl->nextAvailableLSN;
e->LSN = nextAvailableLSN;
LogEntry * cpy = malloc(sizeofLogEntry(e)); LogEntry * cpy = malloc(sizeofLogEntry(e));
memcpy(cpy, e, sizeofLogEntry(e)); memcpy(cpy, e, sizeofLogEntry(e));
// printf ("lsn: %ld\n", e->LSN); DEBUG("lsn: %ld\n", e->LSN);
buffer[bufferOffset] = cpy; impl->buffer[bufferOffset] = cpy;
// printf("lsn: %ld type: %d\n", e->LSN, e->type); DEBUG("lsn: %ld type: %d\n", e->LSN, e->type);
nextAvailableLSN++; impl->nextAvailableLSN++;
unlock(globalOffset_lock); unlock(impl->globalOffset_lock);
unlock(flushedLSN_lock); unlock(impl->flushedLSN_lock);
return 0;
} }
static lsn_t flushedLSN_InMemoryLog(stasis_log_t* log, static lsn_t stasis_log_impl_in_memory_first_unstable_lsn(stasis_log_t* log,
stasis_log_force_mode_t mode) { stasis_log_force_mode_t mode) {
return nextAvailableLSN; stasis_log_impl_in_memory * impl = log->impl;
return impl->nextAvailableLSN;
} }
static void syncLog_InMemoryLog(stasis_log_t* log, stasis_log_force_mode_t m){ static void stasis_log_impl_in_memory_force_tail(stasis_log_t* log, stasis_log_force_mode_t m){
// no-op // no-op
} }
static lsn_t nextEntry_InMemoryLog(stasis_log_t * log, const LogEntry * e) { static lsn_t stasis_log_impl_in_memory_next_entry(stasis_log_t * log, const LogEntry * e) {
return e->LSN + 1; return e->LSN + 1;
} }
static int truncateLog_InMemoryLog(stasis_log_t * log, lsn_t lsn) { static int stasis_log_impl_in_memory_truncate(stasis_log_t * log, lsn_t lsn) {
writelock(flushedLSN_lock,1); stasis_log_impl_in_memory * impl = log->impl;
writelock(globalOffset_lock,1); writelock(impl->flushedLSN_lock,1);
writelock(impl->globalOffset_lock,1);
assert(lsn <= nextAvailableLSN); assert(lsn <= impl->nextAvailableLSN);
if(lsn > globalOffset) { if(lsn > impl->globalOffset) {
for(int i = globalOffset; i < lsn; i++) { for(int i = impl->globalOffset; i < lsn; i++) {
free(buffer[i - globalOffset]); free(impl->buffer[i - impl->globalOffset]);
} }
assert((lsn-globalOffset) + (nextAvailableLSN -lsn) < bufferLen); assert((lsn-impl->globalOffset) + (impl->nextAvailableLSN -lsn) < impl->bufferLen);
memmove(&(buffer[0]), &(buffer[lsn - globalOffset]), sizeof(LogEntry*) * (nextAvailableLSN - lsn)); memmove(&(impl->buffer[0]), &(impl->buffer[lsn - impl->globalOffset]),
globalOffset = lsn; sizeof(LogEntry*) * (impl->nextAvailableLSN - lsn));
impl->globalOffset = lsn;
} }
writeunlock(globalOffset_lock); writeunlock(impl->globalOffset_lock);
writeunlock(flushedLSN_lock); writeunlock(impl->flushedLSN_lock);
return 0; return 0;
} }
static lsn_t firstLogEntry_InMemoryLog() { static lsn_t stasis_log_impl_in_memory_truncation_point(stasis_log_t * log) {
return globalOffset; stasis_log_impl_in_memory * impl = log->impl;
return impl->globalOffset;
} }
static int close_InMemoryLog(stasis_log_t * log) { static int stasis_log_impl_in_memory_close(stasis_log_t * log) {
if(buffer) { stasis_log_impl_in_memory * impl = log->impl;
lsn_t firstEmptyOffset = nextAvailableLSN-globalOffset; if(impl->buffer) {
lsn_t firstEmptyOffset = impl->nextAvailableLSN-impl->globalOffset;
for(lsn_t i = 0; i < firstEmptyOffset; i++) { for(lsn_t i = 0; i < firstEmptyOffset; i++) {
assert(buffer[i]->LSN == i+globalOffset); assert(impl->buffer[i]->LSN == i+impl->globalOffset);
free(buffer[i]); free(impl->buffer[i]);
} }
free(buffer); free(impl->buffer);
nextAvailableLSN = 0; impl->nextAvailableLSN = 0;
globalOffset = 0; impl->globalOffset = 0;
bufferLen = 0; impl->bufferLen = 0;
buffer = 0; impl->buffer = 0;
free(impl);
} }
free (log); free (log);
return 0; return 0;
} }
static const LogEntry * readLSNEntry_InMemoryLog(stasis_log_t* log, static const LogEntry * stasis_log_impl_in_memory_read_entry(stasis_log_t* log,
lsn_t lsn) { lsn_t lsn) {
// printf("lsn: %ld\n", lsn); stasis_log_impl_in_memory * impl = log->impl;
if(lsn >= nextAvailableLSN) { return 0; } DEBUG("lsn: %ld\n", lsn);
assert(lsn-globalOffset >= 0 && lsn-globalOffset< bufferLen); if(lsn >= impl->nextAvailableLSN) { return 0; }
readlock(globalOffset_lock, 0); assert(lsn-impl->globalOffset >= 0 && lsn-impl->globalOffset< impl->bufferLen);
LogEntry * ptr = buffer[lsn - globalOffset]; readlock(impl->globalOffset_lock, 0);
unlock(globalOffset_lock); LogEntry * ptr = impl->buffer[lsn - impl->globalOffset];
unlock(impl->globalOffset_lock);
assert(ptr); assert(ptr);
assert(ptr->LSN == lsn); assert(ptr->LSN == lsn);
@ -126,36 +133,38 @@ static const LogEntry * readLSNEntry_InMemoryLog(stasis_log_t* log,
memcpy(ret, ptr, sizeofLogEntry(ptr)); memcpy(ret, ptr, sizeofLogEntry(ptr));
//printf("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN); DEBUG("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN);
return ret; return ret;
} }
static lsn_t sizeofInternalLogEntry_InMemoryLog(stasis_log_t* log, static lsn_t stasis_log_impl_in_memory_sizeof_internal_entry(stasis_log_t* log,
const LogEntry * e) { const LogEntry * e) {
abort(); abort();
} }
static int isDurable_InMemoryLog(stasis_log_t*log) { return 0; } static int stasis_log_impl_in_memory_is_durable(stasis_log_t*log) { return 0; }
stasis_log_t* open_InMemoryLog() { stasis_log_t* stasis_log_impl_in_memory_open() {
flushedLSN_lock = initlock(); stasis_log_impl_in_memory * impl = malloc(sizeof(*impl));
globalOffset_lock = initlock(); impl->flushedLSN_lock = initlock();
globalOffset = 0; impl->globalOffset_lock = initlock();
nextAvailableLSN = 0; impl->globalOffset = 0;
buffer = malloc(4096 * 1024 * sizeof (LogEntry *)); impl->nextAvailableLSN = 0;
bufferLen =4096 * 1024; impl->buffer = malloc(4096 * 1024 * sizeof (LogEntry *));
impl->bufferLen =4096 * 1024;
static stasis_log_t proto = { static stasis_log_t proto = {
sizeofInternalLogEntry_InMemoryLog, // sizeof_internal_entry stasis_log_impl_in_memory_sizeof_internal_entry,
writeLogEntry_InMemoryLog,// write_entry stasis_log_impl_in_memory_write_entry,
readLSNEntry_InMemoryLog, // read_entry stasis_log_impl_in_memory_read_entry,
nextEntry_InMemoryLog,// next_entry stasis_log_impl_in_memory_next_entry,
flushedLSN_InMemoryLog, // first_unstable_lsn stasis_log_impl_in_memory_first_unstable_lsn,
nextAvailableLSN_InMemoryLog, // next_available_lsn stasis_log_impl_in_memory_next_available_lsn,
syncLog_InMemoryLog, // force_tail stasis_log_impl_in_memory_force_tail,
truncateLog_InMemoryLog, // truncate stasis_log_impl_in_memory_truncate,
firstLogEntry_InMemoryLog,// truncation_point stasis_log_impl_in_memory_truncation_point,
close_InMemoryLog, // deinit stasis_log_impl_in_memory_close,
isDurable_InMemoryLog// is_durable stasis_log_impl_in_memory_is_durable
}; };
stasis_log_t* log = malloc(sizeof(*log)); stasis_log_t* log = malloc(sizeof(*log));
memcpy(log,&proto, sizeof(proto)); memcpy(log,&proto, sizeof(proto));
log->impl = impl;
return log; return log;
} }

View file

@ -61,15 +61,6 @@ terms specified in this license.
#include <stasis/logger/inMemoryLog.h> #include <stasis/logger/inMemoryLog.h>
#include <stasis/page.h> #include <stasis/page.h>
/**
@todo loggerType should go away.
*/
#ifdef USE_LOGGER
int loggerType = USE_LOGGER;
#else
int loggerType = LOG_TO_FILE;
#endif
/** /**
@todo stasis_log_file should be in transactional2.c, and not global @todo stasis_log_file should be in transactional2.c, and not global
*/ */
@ -178,6 +169,9 @@ lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l) {
lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l) { lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l) {
return LogTransCommon(log, l, XABORT); return LogTransCommon(log, l, XABORT);
} }
lsn_t LogTransEnd(stasis_log_t* log, TransactionLog * l) {
return LogTransCommon(log, l, XEND);
}
lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l) { lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l) {
lsn_t lsn = LogTransCommonPrepare(log, l); lsn_t lsn = LogTransCommonPrepare(log, l);
LogForce(log, lsn, LOG_FORCE_COMMIT); LogForce(log, lsn, LOG_FORCE_COMMIT);

View file

@ -49,7 +49,7 @@ static pthread_mutex_t rollback_mutex = PTHREAD_MUTEX_INITIALIZER;
no longer reads the pages in, there's no longer any reason to build no longer reads the pages in, there's no longer any reason to build
the list of dirty pages. the list of dirty pages.
*/ */
static void Analysis(stasis_log_t* log) { static void stasis_recovery_analysis(stasis_log_t* log) {
DEBUG("Recovery: Analysis\n"); DEBUG("Recovery: Analysis\n");
@ -115,6 +115,7 @@ static void Analysis(stasis_log_t* log) {
lsn_t* free_lsn = pblHtLookup(transactionLSN, &(e->xid), sizeof(int)); lsn_t* free_lsn = pblHtLookup(transactionLSN, &(e->xid), sizeof(int));
pblHtRemove(transactionLSN, &(e->xid), sizeof(int)); pblHtRemove(transactionLSN, &(e->xid), sizeof(int));
free(free_lsn); free(free_lsn);
stasis_transaction_table_forget(e->xid);
} }
break; break;
case UPDATELOG: case UPDATELOG:
@ -171,7 +172,7 @@ static void Analysis(stasis_log_t* log) {
Y (NTA replaces physical undo) Y (NTA replaces physical undo)
*/ */
static void Redo(stasis_log_t* log) { static void stasis_recovery_redo(stasis_log_t* log) {
LogHandle* lh = getLogHandle(log); LogHandle* lh = getLogHandle(log);
const LogEntry * e; const LogEntry * e;
@ -250,7 +251,7 @@ static void Redo(stasis_log_t* log) {
freeLogHandle(lh); freeLogHandle(lh);
} }
static void Undo(stasis_log_t* log, int recovery) { static void stasis_recovery_undo(stasis_log_t* log, int recovery) {
LogHandle* lh; LogHandle* lh;
DEBUG("Recovery: Undo\n"); DEBUG("Recovery: Undo\n");
@ -337,7 +338,8 @@ static void Undo(stasis_log_t* log, int recovery) {
// records may be passed in by undoTrans. // records may be passed in by undoTrans.
break; break;
case XCOMMIT: case XCOMMIT:
// Should never abort a transaction that contains a commit record case XEND:
// Should never abort a transaction that contains a commit or end record
abort(); abort();
case XPREPARE: { case XPREPARE: {
DEBUG("found prepared xact %d\n", e->xid); DEBUG("found prepared xact %d\n", e->xid);
@ -361,9 +363,8 @@ static void Undo(stasis_log_t* log, int recovery) {
freeLogEntry(e); freeLogEntry(e);
} }
if(!prepared) { if(!prepared) {
if(recovery) { // Log an XEND, remove transaction from XactionTable.
stasis_transaction_table_forget(thisXid); Tforget(thisXid);
}
if(globalLockManager.abort) { if(globalLockManager.abort) {
globalLockManager.abort(thisXid); globalLockManager.abort(thisXid);
} }
@ -375,12 +376,12 @@ void stasis_recovery_initiate(stasis_log_t* log) {
transactionLSN = pblHtCreate(); transactionLSN = pblHtCreate();
DEBUG("Analysis started\n"); DEBUG("Analysis started\n");
Analysis(log); stasis_recovery_analysis(log);
DEBUG("Redo started\n"); DEBUG("Redo started\n");
Redo(log); stasis_recovery_redo(log);
DEBUG("Undo started\n"); DEBUG("Undo started\n");
TallocPostInit(); TallocPostInit();
Undo(log,1); stasis_recovery_undo(log,1);
DEBUG("Recovery complete.\n"); DEBUG("Recovery complete.\n");
for(void * it = pblHtFirst(transactionLSN); it; it = pblHtNext(transactionLSN)) { for(void * it = pblHtFirst(transactionLSN); it; it = pblHtNext(transactionLSN)) {
@ -405,7 +406,7 @@ void undoTrans(stasis_log_t* log, TransactionLog transaction) {
/* Nothing to undo. (Happens for read-only xacts.) */ /* Nothing to undo. (Happens for read-only xacts.) */
} }
Undo(log, 0); stasis_recovery_undo(log, 0);
if(rollbackLSNs) { if(rollbackLSNs) {
destroyList(&rollbackLSNs); destroyList(&rollbackLSNs);
} }

View file

@ -42,7 +42,7 @@ struct ringBufferLog_s {
#define offset_to_lsn(x, lsn) ((lsn) + (x)->offset) #define offset_to_lsn(x, lsn) ((lsn) + (x)->offset)
#endif #endif
static int truncateLog(ringBufferLog_t * log, lsn_t lsn); static int stasis_ringbuffer_truncate(ringBufferLog_t * log, lsn_t lsn);
ringBufferLog_t * openLogRingBuffer(size_t size, lsn_t initialOffset) { ringBufferLog_t * openLogRingBuffer(size_t size, lsn_t initialOffset) {
ringBufferLog_t * ret = malloc(sizeof(ringBufferLog_t)); ringBufferLog_t * ret = malloc(sizeof(ringBufferLog_t));
@ -132,12 +132,12 @@ int ringBufferTruncateRead(byte * buf, ringBufferLog_t * log, size_t size) {
} }
memcpyFromRingBuffer(buf, log, lsn_to_offset(log, log->start), size); memcpyFromRingBuffer(buf, log, lsn_to_offset(log, log->start), size);
return truncateLog(log, log->start + size); return stasis_ringbuffer_truncate(log, log->start + size);
} }
/** static because it does no error checking. */ /** static because it does no error checking. */
static int truncateLog(ringBufferLog_t * log, lsn_t lsn) { static int stasis_ringbuffer_truncate(ringBufferLog_t * log, lsn_t lsn) {
#ifdef TRACK_OFFSETS #ifdef TRACK_OFFSETS
lsn_t newStart = lsn_to_offset(log, lsn); lsn_t newStart = lsn_to_offset(log, lsn);

View file

@ -94,12 +94,12 @@ int Tinit() {
stasis_transaction_table_init(); stasis_transaction_table_init();
stasis_operation_table_init(); stasis_operation_table_init();
dirtyPagesInit(); dirtyPagesInit();
if(LOG_TO_FILE == loggerType) { if(LOG_TO_FILE == stasis_log_type) {
stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name,
stasis_log_file_mode, stasis_log_file_mode,
stasis_log_file_permissions); stasis_log_file_permissions);
} else if(LOG_TO_MEMORY == loggerType) { } else if(LOG_TO_MEMORY == stasis_log_type) {
stasis_log_file = open_InMemoryLog(); stasis_log_file = stasis_log_impl_in_memory_open();
} else { } else {
assert(stasis_log_file != NULL); assert(stasis_log_file != NULL);
} }
@ -420,15 +420,15 @@ int Tabort(int xid) {
allocTransactionAbort(xid); allocTransactionAbort(xid);
pthread_mutex_lock(&transactional_2_mutex);
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
numActiveXactions--;
assert( numActiveXactions >= 0 );
pthread_mutex_unlock(&transactional_2_mutex);
return 0; return 0;
} }
int Tforget(int xid) {
TransactionLog * t = &XactionTable[xid%MAX_TRANSACTIONS];
assert(t->xid == xid);
LogTransEnd(stasis_log_file, t);
stasis_transaction_table_forget(t->xid);
return 0;
}
int Tdeinit() { int Tdeinit() {
int i; int i;
@ -454,7 +454,7 @@ int Tdeinit() {
slow_close = 0; slow_close = 0;
} }
stasis_page_deinit(); stasis_page_deinit();
stasis_log_file->deinit(stasis_log_file); stasis_log_file->close(stasis_log_file);
dirtyPagesDeinit(); dirtyPagesDeinit();
initted = 0; initted = 0;
@ -475,7 +475,7 @@ int TuncleanShutdown() {
slow_close = 0; slow_close = 0;
} }
stasis_page_deinit(); stasis_page_deinit();
stasis_log_file->deinit(stasis_log_file); stasis_log_file->close(stasis_log_file);
numActiveXactions = 0; numActiveXactions = 0;
dirtyPagesDeinit(); dirtyPagesDeinit();
@ -581,7 +581,7 @@ int stasis_transaction_table_forget(int xid) {
int TdurabilityLevel() { int TdurabilityLevel() {
if(bufferManagerType == BUFFER_MANAGER_MEM_ARRAY) { if(bufferManagerType == BUFFER_MANAGER_MEM_ARRAY) {
return VOLATILE; return VOLATILE;
} else if(loggerType == LOG_TO_MEMORY) { } else if(stasis_log_type == LOG_TO_MEMORY) {
return PERSISTENT; return PERSISTENT;
} else { } else {
return DURABLE; return DURABLE;

View file

@ -57,6 +57,23 @@ extern int stasis_suppress_unclean_shutdown_warnings;
*/ */
extern int stasis_truncation_automatic; extern int stasis_truncation_automatic;
/**
This is the log implementation that is being used.
Before Stasis is initialized it will be set to a default value.
It may be changed before Tinit() is called by assigning to it.
The default can be overridden at compile time by defining
USE_LOGGER.
(eg: gcc ... -DSTASIS_LOG_TYPE=LOG_TO_FOO)
@see constants.h for a list of recognized log implementations.
(The constants are named LOG_TO_*)
@todo rename LOG_TO_* constants to STASIS_LOG_TYPE_*
*/
extern int stasis_log_type;
extern char * stasis_log_file_name; extern char * stasis_log_file_name;
extern int stasis_log_file_mode; extern int stasis_log_file_mode;
extern int stasis_log_file_permissions; extern int stasis_log_file_permissions;

View file

@ -3,6 +3,6 @@
#include <stasis/logger/logger2.h> #include <stasis/logger/logger2.h>
stasis_log_t* open_InMemoryLog(); stasis_log_t* stasis_log_impl_in_memory_open();
#endif #endif

View file

@ -90,21 +90,6 @@ typedef enum {
extern TransactionLog XactionTable[MAX_TRANSACTIONS]; extern TransactionLog XactionTable[MAX_TRANSACTIONS];
/**
This is the log implementation that is being used.
Before Stasis is intialized it will be set to a default value.
It may be changed before Tinit() is called by assigning to it.
The default can be overridden at compile time by defining
USE_LOGGER.
(eg: gcc ... -DUSE_LOGGER=LOG_TO_FOO)
@see constants.h for a list of recognized log implementations.
(The constants are named LOG_TO_*)
*/
extern int loggerType;
struct stasis_log_t { struct stasis_log_t {
/** /**
@ -169,7 +154,7 @@ struct stasis_log_t {
/** /**
@return 0 on success @return 0 on success
*/ */
int (*deinit)(struct stasis_log_t* log); int (*close)(struct stasis_log_t* log);
int (*is_durable)(struct stasis_log_t* log); int (*is_durable)(struct stasis_log_t* log);
@ -228,11 +213,10 @@ lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l);
lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l); lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l);
/** /**
Write a end transaction record @see XEND Write a end transaction record. This entry tells recovery's undo
phase that it may safely ignore the transaction.
@todo Implement LogEnd
*/ */
void LogEnd (stasis_log_t* log, TransactionLog * l); lsn_t LogTransEnd (stasis_log_t* log, TransactionLog * l);
/** /**
LogUpdate writes an UPDATELOG log record to the log tail. It LogUpdate writes an UPDATELOG log record to the log tail. It

View file

@ -783,6 +783,8 @@ int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn,
lsn_t prevLSN, lsn_t prevLSN,
lsn_t recLSN); lsn_t recLSN);
int stasis_transaction_table_forget(int xid); int stasis_transaction_table_forget(int xid);
int Tforget(int xid);
/** /**
This is used by log truncation. This is used by log truncation.
*/ */

View file

@ -414,7 +414,7 @@ Suite * check_suite(void) {
TCase *tc = tcase_create("recovery"); TCase *tc = tcase_create("recovery");
tcase_set_timeout(tc, 0); // disable timeouts tcase_set_timeout(tc, 0); // disable timeouts
if(LOG_TO_MEMORY != loggerType) { if(LOG_TO_MEMORY != stasis_log_type) {
/* void * foobar; */ /* used to supress warnings. */ /* void * foobar; */ /* used to supress warnings. */
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
tcase_add_test(tc, recoverBlob__idempotent); tcase_add_test(tc, recoverBlob__idempotent);

View file

@ -139,8 +139,9 @@ static void setup_log() {
@todo Test logHandle more thoroughly. (Still need to test the guard mechanism.) @todo Test logHandle more thoroughly. (Still need to test the guard mechanism.)
*/ */
START_TEST(loggerTest) static void loggerTest(int logType) {
{
stasis_log_type = logType;
const LogEntry * e; const LogEntry * e;
LogHandle* h; LogHandle* h;
int i = 0; int i = 0;
@ -161,16 +162,21 @@ START_TEST(loggerTest)
stasis_log_safe_writes_delete(stasis_log_file_name); stasis_log_safe_writes_delete(stasis_log_file_name);
Tdeinit(); Tdeinit();
} }
END_TEST START_TEST(loggerFileTest) {
loggerTest(LOG_TO_FILE);
} END_TEST
START_TEST(loggerMemTest) {
loggerTest(LOG_TO_MEMORY);
} END_TEST
/** /**
@test @test
Checks for a bug ecountered during devlopment. What happens when Checks for a bug ecountered during devlopment. What happens when
previousInTransaction is called immediately after the handle is previousInTransaction is called immediately after the handle is
allocated? */ allocated? */
START_TEST(logHandleColdReverseIterator) { static void logHandleColdReverseIterator(int logType) {
const LogEntry * e; const LogEntry * e;
stasis_log_type = logType;
setup_log(); setup_log();
LogHandle* lh = getLogHandle(stasis_log_file); LogHandle* lh = getLogHandle(stasis_log_file);
int i = 0; int i = 0;
@ -191,14 +197,20 @@ START_TEST(logHandleColdReverseIterator) {
assert(i <= 4); /* We should almost immediately hit a clr that goes to the beginning of the log... */ assert(i <= 4); /* We should almost immediately hit a clr that goes to the beginning of the log... */
Tdeinit(); Tdeinit();
} }
END_TEST START_TEST(logHandleFileColdReverseIterator) {
logHandleColdReverseIterator(LOG_TO_FILE);
} END_TEST
START_TEST(logHandleMemColdReverseIterator) {
logHandleColdReverseIterator(LOG_TO_MEMORY);
} END_TEST
/** /**
@test @test
Build a simple log, truncate it, and then test the logWriter routines against it. Build a simple log, truncate it, and then test the logWriter routines against it.
*/ */
START_TEST(loggerTruncate) { static void loggerTruncate(int logType) {
stasis_log_type = logType;
const LogEntry * le; const LogEntry * le;
const LogEntry * le2; const LogEntry * le2;
const LogEntry * le3 = NULL; const LogEntry * le3 = NULL;
@ -258,7 +270,12 @@ START_TEST(loggerTruncate) {
assert(i == (3000 - 234 + 1)); assert(i == (3000 - 234 + 1));
freeLogHandle(lh); freeLogHandle(lh);
Tdeinit(); Tdeinit();
}
START_TEST(loggerFileTruncate) {
loggerTruncate(LOG_TO_FILE);
} END_TEST
START_TEST(loggerMemTruncate) {
loggerTruncate(LOG_TO_MEMORY);
} END_TEST } END_TEST
#define ENTRIES_PER_THREAD 200 #define ENTRIES_PER_THREAD 200
@ -357,8 +374,8 @@ static void* worker_thread(void * arg) {
return 0; return 0;
} }
static void loggerCheckWorker(int logType) {
START_TEST(loggerCheckWorker) { stasis_log_type = logType;
int four = 4; int four = 4;
pthread_mutex_init(&random_mutex, NULL); pthread_mutex_init(&random_mutex, NULL);
@ -367,9 +384,16 @@ START_TEST(loggerCheckWorker) {
worker_thread(&four); worker_thread(&four);
Tdeinit(); Tdeinit();
}
START_TEST(loggerFileCheckWorker) {
loggerCheckWorker(LOG_TO_FILE);
} END_TEST
START_TEST(loggerMemCheckWorker) {
loggerCheckWorker(LOG_TO_MEMORY);
} END_TEST } END_TEST
START_TEST(loggerCheckThreaded) { static void loggerCheckThreaded(int logType) {
stasis_log_type = logType;
#define THREAD_COUNT 100 #define THREAD_COUNT 100
pthread_t workers[THREAD_COUNT]; pthread_t workers[THREAD_COUNT];
@ -386,6 +410,13 @@ START_TEST(loggerCheckThreaded) {
} }
Tdeinit(); Tdeinit();
}
START_TEST(loggerFileCheckThreaded) {
loggerCheckThreaded(LOG_TO_FILE);
} END_TEST
START_TEST(loggerMemCheckThreaded) {
loggerCheckThreaded(LOG_TO_MEMORY);
} END_TEST } END_TEST
void reopenLogWorkload(int truncating) { void reopenLogWorkload(int truncating) {
@ -397,12 +428,12 @@ void reopenLogWorkload(int truncating) {
stasis_transaction_table_active_transaction_count_set(0); stasis_transaction_table_active_transaction_count_set(0);
if(LOG_TO_FILE == loggerType) { if(LOG_TO_FILE == stasis_log_type) {
stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name,
stasis_log_file_mode, stasis_log_file_mode,
stasis_log_file_permissions); stasis_log_file_permissions);
} else if(LOG_TO_MEMORY == loggerType) { } else if(LOG_TO_MEMORY == stasis_log_type) {
stasis_log_file = open_InMemoryLog(); stasis_log_file = stasis_log_impl_in_memory_open();
} else { } else {
assert(stasis_log_file != NULL); assert(stasis_log_file != NULL);
} }
@ -428,14 +459,14 @@ void reopenLogWorkload(int truncating) {
} }
} }
stasis_log_file->deinit(stasis_log_file); stasis_log_file->close(stasis_log_file);
if(LOG_TO_FILE == loggerType) { if(LOG_TO_FILE == stasis_log_type) {
stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name,
stasis_log_file_mode, stasis_log_file_mode,
stasis_log_file_permissions); stasis_log_file_permissions);
} else if(LOG_TO_MEMORY == loggerType) { } else if(LOG_TO_MEMORY == stasis_log_type) {
stasis_log_file = open_InMemoryLog(); stasis_log_file = stasis_log_impl_in_memory_open();
} else { } else {
assert(stasis_log_file != NULL); assert(stasis_log_file != NULL);
} }
@ -500,16 +531,18 @@ void reopenLogWorkload(int truncating) {
assert(i == (ENTRY_COUNT * 2)); assert(i == (ENTRY_COUNT * 2));
stasis_truncation_automatic = 1; stasis_truncation_automatic = 1;
stasis_log_file->deinit(stasis_log_file); stasis_log_file->close(stasis_log_file);
} }
START_TEST(loggerReopenTest) { START_TEST(loggerReopenTest) {
stasis_log_type = LOG_TO_FILE;
stasis_log_safe_writes_delete(stasis_log_file_name); stasis_log_safe_writes_delete(stasis_log_file_name);
reopenLogWorkload(0); reopenLogWorkload(0);
} END_TEST } END_TEST
START_TEST(loggerTruncateReopenTest) { START_TEST(loggerTruncateReopenTest) {
stasis_log_type = LOG_TO_FILE;
stasis_log_safe_writes_delete(stasis_log_file_name); stasis_log_safe_writes_delete(stasis_log_file_name);
reopenLogWorkload(1); reopenLogWorkload(1);
} END_TEST } END_TEST
@ -520,13 +553,17 @@ Suite * check_suite(void) {
TCase *tc = tcase_create("writeNew"); TCase *tc = tcase_create("writeNew");
tcase_set_timeout(tc, 0); tcase_set_timeout(tc, 0);
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
tcase_add_test(tc, loggerFileTest);
tcase_add_test(tc, loggerTest); tcase_add_test(tc, loggerMemTest);
tcase_add_test(tc, logHandleColdReverseIterator); tcase_add_test(tc, logHandleFileColdReverseIterator);
tcase_add_test(tc, loggerTruncate); tcase_add_test(tc, logHandleMemColdReverseIterator);
tcase_add_test(tc, loggerCheckWorker); tcase_add_test(tc, loggerFileTruncate);
tcase_add_test(tc, loggerCheckThreaded); tcase_add_test(tc, loggerMemTruncate);
if(loggerType != LOG_TO_MEMORY) { tcase_add_test(tc, loggerFileCheckWorker);
tcase_add_test(tc, loggerMemCheckWorker);
tcase_add_test(tc, loggerFileCheckThreaded);
tcase_add_test(tc, loggerMemCheckThreaded);
if(stasis_log_type != LOG_TO_MEMORY) {
tcase_add_test(tc, loggerReopenTest); tcase_add_test(tc, loggerReopenTest);
tcase_add_test(tc, loggerTruncateReopenTest); tcase_add_test(tc, loggerTruncateReopenTest);
} }

View file

@ -697,7 +697,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, operation_physical_do_undo); tcase_add_test(tc, operation_physical_do_undo);
tcase_add_test(tc, operation_nestedTopAction); tcase_add_test(tc, operation_nestedTopAction);
tcase_add_test(tc, operation_set_range); tcase_add_test(tc, operation_set_range);
if(loggerType != LOG_TO_MEMORY) { if(stasis_log_type != LOG_TO_MEMORY) {
tcase_add_test(tc, operation_prepare); tcase_add_test(tc, operation_prepare);
} }
tcase_add_test(tc, operation_alloc_test); tcase_add_test(tc, operation_alloc_test);

View file

@ -212,7 +212,7 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
tcase_add_test(tc, pageOpCheckAllocDealloc); tcase_add_test(tc, pageOpCheckAllocDealloc);
if(LOG_TO_MEMORY != loggerType) { if(LOG_TO_MEMORY != stasis_log_type) {
tcase_add_test(tc, pageOpCheckRecovery); tcase_add_test(tc, pageOpCheckRecovery);
} }
/* --------------------------------------------- */ /* --------------------------------------------- */

View file

@ -484,7 +484,7 @@ Suite * check_suite(void) {
tcase_set_timeout(tc, 0); // disable timeouts tcase_set_timeout(tc, 0); // disable timeouts
if(LOG_TO_MEMORY != loggerType) { if(LOG_TO_MEMORY != stasis_log_type) {
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
tcase_add_test(tc, recovery_idempotent); tcase_add_test(tc, recovery_idempotent);