From 4b07b538a6dd4c7a60eb55661362ebb855bbb782 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Sat, 11 Apr 2009 17:17:42 +0000 Subject: [PATCH] Renamed some methods; fixed bug that caused recovery to create potentially unbounded numbers of concurrent, active transactions. (Note: this commit contains quite a few instances of auto-cleaned whitespace....) --- src/stasis/flags.c | 7 ++ src/stasis/logger/inMemoryLog.c | 195 +++++++++++++++-------------- src/stasis/logger/logger2.c | 24 ++-- src/stasis/recovery2.c | 61 ++++----- src/stasis/ringbuffer.c | 6 +- src/stasis/transactional2.c | 68 +++++----- stasis/flags.h | 17 +++ stasis/logger/inMemoryLog.h | 2 +- stasis/logger/logger2.h | 24 +--- stasis/transactional.h | 68 +++++----- test/stasis/check_blobRecovery.c | 2 +- test/stasis/check_logWriter.c | 187 ++++++++++++++++----------- test/stasis/check_operations.c | 2 +- test/stasis/check_pageOperations.c | 2 +- test/stasis/check_recovery.c | 2 +- 15 files changed, 359 insertions(+), 308 deletions(-) diff --git a/src/stasis/flags.c b/src/stasis/flags.c index be23a91..d7832e2 100644 --- a/src/stasis/flags.c +++ b/src/stasis/flags.c @@ -42,6 +42,13 @@ int stasis_truncation_automatic = STASIS_TRUNCATION_AUTOMATIC; int stasis_truncation_automatic = 1; #endif +#ifdef STASIS_LOG_TYPE +int stasis_log_type = STASIS_LOG_TYPE; +#else +int stasis_log_type = LOG_TO_FILE; +#endif + + #ifdef STASIS_LOG_FILE_NAME char * stasis_log_file_name = STASIS_LOG_FILE_NAME; #else diff --git a/src/stasis/logger/inMemoryLog.c b/src/stasis/logger/inMemoryLog.c index e54a7a7..34649b1 100644 --- a/src/stasis/logger/inMemoryLog.c +++ b/src/stasis/logger/inMemoryLog.c @@ -2,160 +2,169 @@ #include #include #include -/** - @todo remove static fields from inMemoryLog -*/ -static rwl * flushedLSN_lock; -static lsn_t nextAvailableLSN; -static lsn_t globalOffset; -static rwl * globalOffset_lock; -static LogEntry ** buffer; -static lsn_t bufferLen; -static lsn_t nextAvailableLSN_InMemoryLog(stasis_log_t * log) { - writelock(flushedLSN_lock,0); - writelock(globalOffset_lock,0); - lsn_t ret = nextAvailableLSN; - unlock(globalOffset_lock); - unlock(flushedLSN_lock); +typedef struct { + rwl * flushedLSN_lock; + lsn_t nextAvailableLSN; + lsn_t globalOffset; + rwl * globalOffset_lock; + LogEntry ** buffer; + lsn_t bufferLen; +} stasis_log_impl_in_memory; + +static lsn_t stasis_log_impl_in_memory_next_available_lsn(stasis_log_t * log) { + stasis_log_impl_in_memory * impl = log->impl; + writelock(impl->flushedLSN_lock,0); + writelock(impl->globalOffset_lock,0); + lsn_t ret = impl->nextAvailableLSN; + unlock(impl->globalOffset_lock); + unlock(impl->flushedLSN_lock); return ret; } -static int writeLogEntry_InMemoryLog(stasis_log_t * log, LogEntry *e) { - writelock(flushedLSN_lock, 0); +static int stasis_log_impl_in_memory_write_entry(stasis_log_t * log, LogEntry *e) { + stasis_log_impl_in_memory * impl = log->impl; + writelock(impl->flushedLSN_lock, 0); lsn_t bufferOffset; int done = 0; - do{ - writelock(globalOffset_lock,0); - bufferOffset = nextAvailableLSN - globalOffset; - if(bufferOffset > bufferLen) { - bufferLen *= 2; - buffer = realloc(buffer, bufferLen); + do{ + writelock(impl->globalOffset_lock,0); + bufferOffset = impl->nextAvailableLSN - impl->globalOffset; + if(bufferOffset > impl->bufferLen) { + impl->bufferLen *= 2; + impl->buffer = realloc(impl->buffer, impl->bufferLen); } else { done = 1; } } while (!done); - return 0; - - e->LSN = nextAvailableLSN; + e->LSN = impl->nextAvailableLSN; LogEntry * cpy = malloc(sizeofLogEntry(e)); memcpy(cpy, e, sizeofLogEntry(e)); - // printf ("lsn: %ld\n", e->LSN); - buffer[bufferOffset] = cpy; + DEBUG("lsn: %ld\n", e->LSN); + impl->buffer[bufferOffset] = cpy; - // printf("lsn: %ld type: %d\n", e->LSN, e->type); - nextAvailableLSN++; + DEBUG("lsn: %ld type: %d\n", e->LSN, e->type); + impl->nextAvailableLSN++; - unlock(globalOffset_lock); - unlock(flushedLSN_lock); + unlock(impl->globalOffset_lock); + unlock(impl->flushedLSN_lock); + return 0; } -static lsn_t flushedLSN_InMemoryLog(stasis_log_t* log, +static lsn_t stasis_log_impl_in_memory_first_unstable_lsn(stasis_log_t* log, stasis_log_force_mode_t mode) { - return nextAvailableLSN; + stasis_log_impl_in_memory * impl = log->impl; + return impl->nextAvailableLSN; } -static void syncLog_InMemoryLog(stasis_log_t* log, stasis_log_force_mode_t m){ +static void stasis_log_impl_in_memory_force_tail(stasis_log_t* log, stasis_log_force_mode_t m){ // no-op } -static lsn_t nextEntry_InMemoryLog(stasis_log_t * log, const LogEntry * e) { +static lsn_t stasis_log_impl_in_memory_next_entry(stasis_log_t * log, const LogEntry * e) { return e->LSN + 1; } -static int truncateLog_InMemoryLog(stasis_log_t * log, lsn_t lsn) { - writelock(flushedLSN_lock,1); - writelock(globalOffset_lock,1); - - assert(lsn <= nextAvailableLSN); +static int stasis_log_impl_in_memory_truncate(stasis_log_t * log, lsn_t lsn) { + stasis_log_impl_in_memory * impl = log->impl; + writelock(impl->flushedLSN_lock,1); + writelock(impl->globalOffset_lock,1); + + assert(lsn <= impl->nextAvailableLSN); - if(lsn > globalOffset) { - for(int i = globalOffset; i < lsn; i++) { - free(buffer[i - globalOffset]); + if(lsn > impl->globalOffset) { + for(int i = impl->globalOffset; i < lsn; i++) { + free(impl->buffer[i - impl->globalOffset]); } - assert((lsn-globalOffset) + (nextAvailableLSN -lsn) < bufferLen); - memmove(&(buffer[0]), &(buffer[lsn - globalOffset]), sizeof(LogEntry*) * (nextAvailableLSN - lsn)); - globalOffset = lsn; + assert((lsn-impl->globalOffset) + (impl->nextAvailableLSN -lsn) < impl->bufferLen); + memmove(&(impl->buffer[0]), &(impl->buffer[lsn - impl->globalOffset]), + sizeof(LogEntry*) * (impl->nextAvailableLSN - lsn)); + impl->globalOffset = lsn; } - writeunlock(globalOffset_lock); - writeunlock(flushedLSN_lock); + writeunlock(impl->globalOffset_lock); + writeunlock(impl->flushedLSN_lock); return 0; } -static lsn_t firstLogEntry_InMemoryLog() { - return globalOffset; +static lsn_t stasis_log_impl_in_memory_truncation_point(stasis_log_t * log) { + stasis_log_impl_in_memory * impl = log->impl; + return impl->globalOffset; } -static int close_InMemoryLog(stasis_log_t * log) { - if(buffer) { - lsn_t firstEmptyOffset = nextAvailableLSN-globalOffset; - for(lsn_t i = 0; i < firstEmptyOffset; i++) { - assert(buffer[i]->LSN == i+globalOffset); - free(buffer[i]); +static int stasis_log_impl_in_memory_close(stasis_log_t * log) { + stasis_log_impl_in_memory * impl = log->impl; + if(impl->buffer) { + lsn_t firstEmptyOffset = impl->nextAvailableLSN-impl->globalOffset; + for(lsn_t i = 0; i < firstEmptyOffset; i++) { + assert(impl->buffer[i]->LSN == i+impl->globalOffset); + free(impl->buffer[i]); } - free(buffer); - nextAvailableLSN = 0; - globalOffset = 0; - bufferLen = 0; - buffer = 0; - + free(impl->buffer); + impl->nextAvailableLSN = 0; + impl->globalOffset = 0; + impl->bufferLen = 0; + impl->buffer = 0; + free(impl); } free (log); return 0; } -static const LogEntry * readLSNEntry_InMemoryLog(stasis_log_t* log, +static const LogEntry * stasis_log_impl_in_memory_read_entry(stasis_log_t* log, lsn_t lsn) { - // printf("lsn: %ld\n", lsn); - if(lsn >= nextAvailableLSN) { return 0; } - assert(lsn-globalOffset >= 0 && lsn-globalOffset< bufferLen); - readlock(globalOffset_lock, 0); - LogEntry * ptr = buffer[lsn - globalOffset]; - unlock(globalOffset_lock); + stasis_log_impl_in_memory * impl = log->impl; + DEBUG("lsn: %ld\n", lsn); + if(lsn >= impl->nextAvailableLSN) { return 0; } + assert(lsn-impl->globalOffset >= 0 && lsn-impl->globalOffset< impl->bufferLen); + readlock(impl->globalOffset_lock, 0); + LogEntry * ptr = impl->buffer[lsn - impl->globalOffset]; + unlock(impl->globalOffset_lock); assert(ptr); assert(ptr->LSN == lsn); - + LogEntry * ret = malloc(sizeofLogEntry(ptr)); memcpy(ret, ptr, sizeofLogEntry(ptr)); - - //printf("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN); + + DEBUG("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN); return ret; } -static lsn_t sizeofInternalLogEntry_InMemoryLog(stasis_log_t* log, +static lsn_t stasis_log_impl_in_memory_sizeof_internal_entry(stasis_log_t* log, const LogEntry * e) { abort(); } -static int isDurable_InMemoryLog(stasis_log_t*log) { return 0; } +static int stasis_log_impl_in_memory_is_durable(stasis_log_t*log) { return 0; } -stasis_log_t* open_InMemoryLog() { - flushedLSN_lock = initlock(); - globalOffset_lock = initlock(); - globalOffset = 0; - nextAvailableLSN = 0; - buffer = malloc(4096 * 1024 * sizeof (LogEntry *)); - bufferLen =4096 * 1024; +stasis_log_t* stasis_log_impl_in_memory_open() { + stasis_log_impl_in_memory * impl = malloc(sizeof(*impl)); + impl->flushedLSN_lock = initlock(); + impl->globalOffset_lock = initlock(); + impl->globalOffset = 0; + impl->nextAvailableLSN = 0; + impl->buffer = malloc(4096 * 1024 * sizeof (LogEntry *)); + impl->bufferLen =4096 * 1024; static stasis_log_t proto = { - sizeofInternalLogEntry_InMemoryLog, // sizeof_internal_entry - writeLogEntry_InMemoryLog,// write_entry - readLSNEntry_InMemoryLog, // read_entry - nextEntry_InMemoryLog,// next_entry - flushedLSN_InMemoryLog, // first_unstable_lsn - nextAvailableLSN_InMemoryLog, // next_available_lsn - syncLog_InMemoryLog, // force_tail - truncateLog_InMemoryLog, // truncate - firstLogEntry_InMemoryLog,// truncation_point - close_InMemoryLog, // deinit - isDurable_InMemoryLog// is_durable + stasis_log_impl_in_memory_sizeof_internal_entry, + stasis_log_impl_in_memory_write_entry, + stasis_log_impl_in_memory_read_entry, + stasis_log_impl_in_memory_next_entry, + stasis_log_impl_in_memory_first_unstable_lsn, + stasis_log_impl_in_memory_next_available_lsn, + stasis_log_impl_in_memory_force_tail, + stasis_log_impl_in_memory_truncate, + stasis_log_impl_in_memory_truncation_point, + stasis_log_impl_in_memory_close, + stasis_log_impl_in_memory_is_durable }; stasis_log_t* log = malloc(sizeof(*log)); memcpy(log,&proto, sizeof(proto)); + log->impl = impl; return log; } diff --git a/src/stasis/logger/logger2.c b/src/stasis/logger/logger2.c index a2541df..2926d33 100644 --- a/src/stasis/logger/logger2.c +++ b/src/stasis/logger/logger2.c @@ -61,15 +61,6 @@ terms specified in this license. #include #include -/** - @todo loggerType should go away. - */ -#ifdef USE_LOGGER -int loggerType = USE_LOGGER; -#else -int loggerType = LOG_TO_FILE; -#endif - /** @todo stasis_log_file should be in transactional2.c, and not global */ @@ -96,7 +87,7 @@ static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) { l->prevLSN = e->LSN; pthread_mutex_unlock(&l->mut); - DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, + DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); ret = e->LSN; @@ -138,7 +129,7 @@ LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l, arg, arg_size); log->write_entry(log, e); - DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid, + DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size); pthread_mutex_lock(&l->mut); if(l->prevLSN == -1) { l->recLSN = e->LSN; } @@ -147,11 +138,11 @@ LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l, return e; } -lsn_t LogCLR(stasis_log_t* log, const LogEntry * old_e) { +lsn_t LogCLR(stasis_log_t* log, const LogEntry * old_e) { LogEntry * e = allocCLRLogEntry(old_e); log->write_entry(log, e); - DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid, + DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid, e->LSN, LSN, prevLSN); lsn_t ret = e->LSN; freeLogEntry(e); @@ -178,6 +169,9 @@ lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l) { lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l) { return LogTransCommon(log, l, XABORT); } +lsn_t LogTransEnd(stasis_log_t* log, TransactionLog * l) { + return LogTransCommon(log, l, XEND); +} lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l) { lsn_t lsn = LogTransCommonPrepare(log, l); LogForce(log, lsn, LOG_FORCE_COMMIT); @@ -226,8 +220,8 @@ static void groupCommit(stasis_log_t* log, lsn_t lsn) { if((xactcount > 1 && pendingCommits < xactcount) || (xactcount > 20 && pendingCommits < (int)((double)xactcount * 0.95))) { int retcode; - while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) { - if(retcode != 0) { + while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) { + if(retcode != 0) { printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by " "a signal in groupCommit(). Acting as though it timed out.\n", __FILE__, __LINE__); diff --git a/src/stasis/recovery2.c b/src/stasis/recovery2.c index 63352ec..24e535a 100644 --- a/src/stasis/recovery2.c +++ b/src/stasis/recovery2.c @@ -1,4 +1,4 @@ -/** +/** @file @@ -24,7 +24,7 @@ #include /** @todo Get rid of linkedlist */ #include -#include // Needed for pageReadLSN. +#include // Needed for pageReadLSN. static pblHashTable_t * transactionLSN; static LinkedList * rollbackLSNs = NULL; @@ -41,15 +41,15 @@ static pthread_mutex_t rollback_mutex = PTHREAD_MUTEX_INITIALIZER; - Calculated a list of all dirty pages. It no longer does either of these things: - - A checkpointing algorithm could figure out where the redo pass + - A checkpointing algorithm could figure out where the redo pass should begin. (It would then truncate the log at that point.) This function could be called before analysis if efficiency is a concern. - We were using the list of dirty pages as an optimization to prevent the pages from being read later during recovery. Since this function - no longer reads the pages in, there's no longer any reason to build + no longer reads the pages in, there's no longer any reason to build the list of dirty pages. */ -static void Analysis(stasis_log_t* log) { +static void stasis_recovery_analysis(stasis_log_t* log) { DEBUG("Recovery: Analysis\n"); @@ -74,16 +74,16 @@ static void Analysis(stasis_log_t* log) { - map: xid -> max LSN - sorted list of maxLSN's */ - + if(xactLSN == NULL) { - xactLSN = malloc(sizeof(lsn_t)); + xactLSN = malloc(sizeof(lsn_t)); pblHtInsert(transactionLSN, &(e->xid), sizeof(int), xactLSN); - + } else { /* We've seen this xact before, and must have put a value in rollbackLSNs for it. That value is now stale, so remove it. */ - + DEBUG("Removing %lld\n", *xactLSN); removeVal(&rollbackLSNs, *xactLSN); } @@ -102,7 +102,7 @@ static void Analysis(stasis_log_t* log) { be rolled back, so we're done. */ break; case XEND: { - /* + /* XEND means this transaction reached stable storage. Therefore, we can skip redoing any of its operations. (The timestamps on each page guarantee that the redo phase will @@ -115,14 +115,15 @@ static void Analysis(stasis_log_t* log) { lsn_t* free_lsn = pblHtLookup(transactionLSN, &(e->xid), sizeof(int)); pblHtRemove(transactionLSN, &(e->xid), sizeof(int)); free(free_lsn); + stasis_transaction_table_forget(e->xid); } break; case UPDATELOG: case CLRLOG: - /* - If the last record we see for a transaction is an update or clr, + /* + If the last record we see for a transaction is an update or clr, then the transaction must not have committed, so it must need - to be rolled back. + to be rolled back. Add it to the list @@ -131,9 +132,9 @@ static void Analysis(stasis_log_t* log) { addSortedVal(&rollbackLSNs, e->LSN); break; - case XABORT: + case XABORT: // If the last record we see for a transaction is an abort, then - // the transaction didn't commit, and must be rolled back. + // the transaction didn't commit, and must be rolled back. DEBUG("Adding %lld\n", e->LSN); addSortedVal(&rollbackLSNs, e->LSN); break; @@ -143,8 +144,8 @@ static void Analysis(stasis_log_t* log) { case INTERNALLOG: // Created by the logger, just ignore it // Make sure the log entry doesn't interfere with real xacts. - assert(e->xid == INVALID_XID); - break; + assert(e->xid == INVALID_XID); + break; default: abort(); } @@ -171,7 +172,7 @@ static void Analysis(stasis_log_t* log) { Y (NTA replaces physical undo) */ -static void Redo(stasis_log_t* log) { +static void stasis_recovery_redo(stasis_log_t* log) { LogHandle* lh = getLogHandle(log); const LogEntry * e; @@ -250,7 +251,7 @@ static void Redo(stasis_log_t* log) { freeLogHandle(lh); } -static void Undo(stasis_log_t* log, int recovery) { +static void stasis_recovery_undo(stasis_log_t* log, int recovery) { LogHandle* lh; DEBUG("Recovery: Undo\n"); @@ -274,7 +275,7 @@ static void Undo(stasis_log_t* log, int recovery) { switch(e->type) { case UPDATELOG: { - if(e->update.page == INVALID_PAGE) { + if(e->update.page == INVALID_PAGE) { DEBUG("logical update\n"); // logical undo: no-op; then the NTA didn't complete, and @@ -337,7 +338,8 @@ static void Undo(stasis_log_t* log, int recovery) { // records may be passed in by undoTrans. break; case XCOMMIT: - // Should never abort a transaction that contains a commit record + case XEND: + // Should never abort a transaction that contains a commit or end record abort(); case XPREPARE: { DEBUG("found prepared xact %d\n", e->xid); @@ -355,15 +357,14 @@ static void Undo(stasis_log_t* log, int recovery) { default: DEBUG ("Unknown log type to undo (TYPE=%d,XID= %d,LSN=%lld), skipping...\n", - e->type, e->xid, e->LSN); + e->type, e->xid, e->LSN); abort(); } freeLogEntry(e); } if(!prepared) { - if(recovery) { - stasis_transaction_table_forget(thisXid); - } + // Log an XEND, remove transaction from XactionTable. + Tforget(thisXid); if(globalLockManager.abort) { globalLockManager.abort(thisXid); } @@ -375,12 +376,12 @@ void stasis_recovery_initiate(stasis_log_t* log) { transactionLSN = pblHtCreate(); DEBUG("Analysis started\n"); - Analysis(log); + stasis_recovery_analysis(log); DEBUG("Redo started\n"); - Redo(log); + stasis_recovery_redo(log); DEBUG("Undo started\n"); TallocPostInit(); - Undo(log,1); + stasis_recovery_undo(log,1); DEBUG("Recovery complete.\n"); for(void * it = pblHtFirst(transactionLSN); it; it = pblHtNext(transactionLSN)) { @@ -393,7 +394,7 @@ void stasis_recovery_initiate(stasis_log_t* log) { } -void undoTrans(stasis_log_t* log, TransactionLog transaction) { +void undoTrans(stasis_log_t* log, TransactionLog transaction) { pthread_mutex_lock(&rollback_mutex); assert(!rollbackLSNs); @@ -405,7 +406,7 @@ void undoTrans(stasis_log_t* log, TransactionLog transaction) { /* Nothing to undo. (Happens for read-only xacts.) */ } - Undo(log, 0); + stasis_recovery_undo(log, 0); if(rollbackLSNs) { destroyList(&rollbackLSNs); } diff --git a/src/stasis/ringbuffer.c b/src/stasis/ringbuffer.c index d958dbd..973c144 100644 --- a/src/stasis/ringbuffer.c +++ b/src/stasis/ringbuffer.c @@ -42,7 +42,7 @@ struct ringBufferLog_s { #define offset_to_lsn(x, lsn) ((lsn) + (x)->offset) #endif -static int truncateLog(ringBufferLog_t * log, lsn_t lsn); +static int stasis_ringbuffer_truncate(ringBufferLog_t * log, lsn_t lsn); ringBufferLog_t * openLogRingBuffer(size_t size, lsn_t initialOffset) { ringBufferLog_t * ret = malloc(sizeof(ringBufferLog_t)); @@ -132,12 +132,12 @@ int ringBufferTruncateRead(byte * buf, ringBufferLog_t * log, size_t size) { } memcpyFromRingBuffer(buf, log, lsn_to_offset(log, log->start), size); - return truncateLog(log, log->start + size); + return stasis_ringbuffer_truncate(log, log->start + size); } /** static because it does no error checking. */ -static int truncateLog(ringBufferLog_t * log, lsn_t lsn) { +static int stasis_ringbuffer_truncate(ringBufferLog_t * log, lsn_t lsn) { #ifdef TRACK_OFFSETS lsn_t newStart = lsn_to_offset(log, lsn); diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 5038120..ae394db 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -37,9 +37,9 @@ static int initted = 0; const recordid ROOT_RECORD = {1, 0, -1}; const recordid NULLRID = {0,0,-1}; const short SLOT_TYPE_LENGTHS[] = { -1, -1, sizeof(blob_record_t), -1}; -/** +/** Locking for transactional2.c works as follows: - + numActiveXactions, xidCount are protected, XactionTable is not. This implies that we do not support multi-threaded transactions, at least for now. @@ -58,7 +58,7 @@ void stasis_transaction_table_init() { } // @todo this factory stuff doesn't really belong here... -static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { +static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { stasis_handle_t * h = stasis_handle(open_memory)(off); //h = stasis_handle(open_debug)(h); stasis_write_buffer_t * w = h->append_buffer(h, len); @@ -70,7 +70,7 @@ typedef struct sf_args { int openMode; int filePerm; } sf_args; -static stasis_handle_t * slow_file_factory(void * argsP) { +static stasis_handle_t * slow_file_factory(void * argsP) { sf_args * args = (sf_args*) argsP; stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm); //h = stasis_handle(open_debug)(h); @@ -94,12 +94,12 @@ int Tinit() { stasis_transaction_table_init(); stasis_operation_table_init(); dirtyPagesInit(); - if(LOG_TO_FILE == loggerType) { + if(LOG_TO_FILE == stasis_log_type) { stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file_mode, stasis_log_file_permissions); - } else if(LOG_TO_MEMORY == loggerType) { - stasis_log_file = open_InMemoryLog(); + } else if(LOG_TO_MEMORY == stasis_log_type) { + stasis_log_file = stasis_log_impl_in_memory_open(); } else { assert(stasis_log_file != NULL); } @@ -169,7 +169,7 @@ int Tinit() { openMode, FILE_PERM); pageHandleOpen(pageFile); } break; - case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: { + case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: { printf("\nWarning: Using old I/O routines (with known bugs).\n"); openPageFile(); } break; @@ -245,7 +245,7 @@ static compensated_function void TactionHelper(int xid, Page * p) { LogEntry * e; assert(xid >= 0 && XactionTable[xid % MAX_TRANSACTIONS].xid == xid); - try { + try { if(globalLockManager.writeLockPage) { globalLockManager.writeLockPage(xid, p->id); } @@ -269,7 +269,7 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page, assert(xid >= 0 && XactionTable[xid % MAX_TRANSACTIONS].xid == xid); Page * p = loadPage(xid, page); assert(p); - try { + try { if(globalLockManager.writeLockPage) { globalLockManager.writeLockPage(xid, p->id); } @@ -322,8 +322,8 @@ compensated_function void TupdateStr(int xid, pageid_t page, Tupdate(xid, page, dat, datlen, op); } -compensated_function void Tupdate(int xid, pageid_t page, - const void *dat, size_t datlen, int op) { +compensated_function void Tupdate(int xid, pageid_t page, + const void *dat, size_t datlen, int op) { Page * p = loadPage(xid, page); assert(initted); TactionHelper(xid, dat, datlen, op, p); @@ -336,14 +336,14 @@ compensated_function void TreadStr(int xid, recordid rid, char * dat) { compensated_function void Tread(int xid, recordid rid, void * dat) { Page * p; - try { + try { p = loadPage(xid, rid.page); } end; readlock(p->rwlatch,0); rid = stasis_record_dereference(xid, p, rid); - if(rid.page != p->id) { + if(rid.page != p->id) { unlock(p->rwlatch); releasePage(p); p = loadPage(xid, rid.page); @@ -363,7 +363,7 @@ compensated_function void Tread(int xid, recordid rid, void * dat) { compensated_function void TreadRaw(int xid, recordid rid, void * dat) { Page * p; - try { + try { p = loadPage(xid, rid.page); } end; readlock(p->rwlatch,0); @@ -375,7 +375,7 @@ compensated_function void TreadRaw(int xid, recordid rid, void * dat) { int Tcommit(int xid) { lsn_t lsn; assert(xid >= 0); -#ifdef DEBUGGING +#ifdef DEBUGGING pthread_mutex_lock(&transactional_2_mutex); assert(numActiveXactions <= MAX_TRANSACTIONS); pthread_mutex_unlock(&transactional_2_mutex); @@ -420,15 +420,15 @@ int Tabort(int xid) { allocTransactionAbort(xid); - pthread_mutex_lock(&transactional_2_mutex); - - XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID; - numActiveXactions--; - assert( numActiveXactions >= 0 ); - pthread_mutex_unlock(&transactional_2_mutex); return 0; } - +int Tforget(int xid) { + TransactionLog * t = &XactionTable[xid%MAX_TRANSACTIONS]; + assert(t->xid == xid); + LogTransEnd(stasis_log_file, t); + stasis_transaction_table_forget(t->xid); + return 0; +} int Tdeinit() { int i; @@ -454,7 +454,7 @@ int Tdeinit() { slow_close = 0; } stasis_page_deinit(); - stasis_log_file->deinit(stasis_log_file); + stasis_log_file->close(stasis_log_file); dirtyPagesDeinit(); initted = 0; @@ -475,7 +475,7 @@ int TuncleanShutdown() { slow_close = 0; } stasis_page_deinit(); - stasis_log_file->deinit(stasis_log_file); + stasis_log_file->close(stasis_log_file); numActiveXactions = 0; dirtyPagesDeinit(); @@ -496,13 +496,13 @@ void stasis_transaction_table_active_transaction_count_set(int xid) { pthread_mutex_unlock(&transactional_2_mutex); } -lsn_t transactions_minRecLSN() { +lsn_t transactions_minRecLSN() { lsn_t minRecLSN = LSN_T_MAX; pthread_mutex_lock(&transactional_2_mutex); - for(int i = 0; i < MAX_TRANSACTIONS; i++) { - if(XactionTable[i].xid != INVALID_XTABLE_XID) { + for(int i = 0; i < MAX_TRANSACTIONS; i++) { + if(XactionTable[i].xid != INVALID_XTABLE_XID) { lsn_t recLSN = XactionTable[i].recLSN; - if(recLSN != -1 && recLSN < minRecLSN) { + if(recLSN != -1 && recLSN < minRecLSN) { minRecLSN = recLSN; } } @@ -531,7 +531,7 @@ int* TlistActiveTransactions() { pthread_mutex_unlock(&transactional_2_mutex); return ret; } -int TisActiveTransaction(int xid) { +int TisActiveTransaction(int xid) { if(xid < 0) { return 0; } pthread_mutex_lock(&transactional_2_mutex); int ret = xid != INVALID_XTABLE_XID && XactionTable[xid%MAX_TRANSACTIONS].xid == xid; @@ -579,11 +579,11 @@ int stasis_transaction_table_forget(int xid) { } int TdurabilityLevel() { - if(bufferManagerType == BUFFER_MANAGER_MEM_ARRAY) { + if(bufferManagerType == BUFFER_MANAGER_MEM_ARRAY) { return VOLATILE; - } else if(loggerType == LOG_TO_MEMORY) { + } else if(stasis_log_type == LOG_TO_MEMORY) { return PERSISTENT; - } else { + } else { return DURABLE; } } @@ -642,7 +642,7 @@ lsn_t TendNestedTopAction(int xid, void * handle) { // Ensure that the next action in this transaction points to the CLR. XactionTable[xid % MAX_TRANSACTIONS].prevLSN = clrLSN; - DEBUG("NestedTopAction CLR %d, LSN: %ld type: %ld (undoing: %ld, next to undo: %ld)\n", e->xid, + DEBUG("NestedTopAction CLR %d, LSN: %ld type: %ld (undoing: %ld, next to undo: %ld)\n", e->xid, clrLSN, undoneLSN, *prevLSN); free(h); diff --git a/stasis/flags.h b/stasis/flags.h index 91a2803..4c6128e 100644 --- a/stasis/flags.h +++ b/stasis/flags.h @@ -57,6 +57,23 @@ extern int stasis_suppress_unclean_shutdown_warnings; */ extern int stasis_truncation_automatic; +/** + This is the log implementation that is being used. + + Before Stasis is initialized it will be set to a default value. + It may be changed before Tinit() is called by assigning to it. + The default can be overridden at compile time by defining + USE_LOGGER. + + (eg: gcc ... -DSTASIS_LOG_TYPE=LOG_TO_FOO) + + @see constants.h for a list of recognized log implementations. + (The constants are named LOG_TO_*) + @todo rename LOG_TO_* constants to STASIS_LOG_TYPE_* + +*/ +extern int stasis_log_type; + extern char * stasis_log_file_name; extern int stasis_log_file_mode; extern int stasis_log_file_permissions; diff --git a/stasis/logger/inMemoryLog.h b/stasis/logger/inMemoryLog.h index d8f3115..248cc50 100644 --- a/stasis/logger/inMemoryLog.h +++ b/stasis/logger/inMemoryLog.h @@ -3,6 +3,6 @@ #include -stasis_log_t* open_InMemoryLog(); +stasis_log_t* stasis_log_impl_in_memory_open(); #endif diff --git a/stasis/logger/logger2.h b/stasis/logger/logger2.h index 15f7e0d..8efcea6 100644 --- a/stasis/logger/logger2.h +++ b/stasis/logger/logger2.h @@ -90,21 +90,6 @@ typedef enum { extern TransactionLog XactionTable[MAX_TRANSACTIONS]; -/** - This is the log implementation that is being used. - - Before Stasis is intialized it will be set to a default value. - It may be changed before Tinit() is called by assigning to it. - The default can be overridden at compile time by defining - USE_LOGGER. - - (eg: gcc ... -DUSE_LOGGER=LOG_TO_FOO) - - @see constants.h for a list of recognized log implementations. - (The constants are named LOG_TO_*) - -*/ -extern int loggerType; struct stasis_log_t { /** @@ -169,7 +154,7 @@ struct stasis_log_t { /** @return 0 on success */ - int (*deinit)(struct stasis_log_t* log); + int (*close)(struct stasis_log_t* log); int (*is_durable)(struct stasis_log_t* log); @@ -228,11 +213,10 @@ lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l); lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l); /** - Write a end transaction record @see XEND - - @todo Implement LogEnd + Write a end transaction record. This entry tells recovery's undo + phase that it may safely ignore the transaction. */ -void LogEnd (stasis_log_t* log, TransactionLog * l); +lsn_t LogTransEnd (stasis_log_t* log, TransactionLog * l); /** LogUpdate writes an UPDATELOG log record to the log tail. It diff --git a/stasis/transactional.h b/stasis/transactional.h index 65ac7c9..24d36bd 100644 --- a/stasis/transactional.h +++ b/stasis/transactional.h @@ -55,7 +55,7 @@ terms specified in this license. */ /** @mainpage Introduction to Stasis - + Stasis is a flexible transactional storage library. Unlike existing systems, it provides application and server developers with much freedom, but little guidance regarding page file layouts, @@ -116,18 +116,18 @@ terms specified in this license. @section compiling Compiling and installation Prerequisites: - - - automake 1.8+: needed to build from CVS - - check: A unit testing + + - automake 1.8+: needed to build from CVS + - check: A unit testing framework (needed to run the self-tests) Optional: - libconfuse: Used by older networking code to parse configuration options. - - BerkeleyDB: Used by the benchmarking code for purposes of comparison. - + - BerkeleyDB: Used by the benchmarking code for purposes of comparison. + Development is currently performed under Debian's Testing branch. - + To compile Stasis, first check out a copy with SVN. If you have commit access: @verbatim @@ -141,17 +141,17 @@ terms specified in this license. @endverbatim then: - + @code - + $ ./reconf $ ./configure --quiet $ make -j4 > /dev/null $ cd test/stasis $ make check - + @endcode - + This will fail if your system defaults to an old (pre-1.7) version of autotools. Fortunately, multiple versions of autotools may exist on the same system. Execute the following commands to @@ -183,22 +183,22 @@ terms specified in this license. @include examples/ex1.c - Hopefully, Tbegin(), Talloc(), Tset(), Tcommit(), Tabort() and Tdealloc() are - self explanatory. If not, they are covered in detail elsewhere. Tinit() and + Hopefully, Tbegin(), Talloc(), Tset(), Tcommit(), Tabort() and Tdealloc() are + self explanatory. If not, they are covered in detail elsewhere. Tinit() and Tdeinit() initialize the library, and clean up when the program is finished. - + Other particularly useful functions are ThashCreate(), ThashDelete(), ThashInsert(), ThashRemove(), and ThashLookup() which provide a re-entrant linear hash implementation. ThashIterator() and ThashNext() provide an iterator over the hashtable's values. - + @subsection bootstrap Reopening a closed data store - - Stasis imposes as little structure upon the application's data structures as + + Stasis imposes as little structure upon the application's data structures as possible. Therefore, it does not maintain any information about the contents - or naming of objects within the page file. This means that the application + or naming of objects within the page file. This means that the application must maintain such information manually. - + In order to facilitate this, Stasis provides the function TrecordType() and guarantees that the first recordid returned by any allocation will point to the same page and slot as the constant @@ -206,10 +206,10 @@ terms specified in this license. record passed to it does not exist. A second function, TrecordSize() returns the size of a record in bytes, or -1 if the record does not exist. - + Therefore, the following code (found in examples/ex2.c) will safely initialize or reopen a data store: - + @include examples/ex2.c @todo Explain how to determine the correct value of rootEntry.size in the case @@ -476,7 +476,7 @@ terms specified in this license. * @defgroup LOGGING_DISCIPLINES Logging Disciplines * * Stasis' log API provides a number of methods that directly - * manipulate the log. + * manipulate the log. * * @section SNF STEAL/NO-FORCE recovery * Stasis includes a function, Tupdate(), that @@ -521,13 +521,13 @@ terms specified in this license. /** - * @file + * @file * * Defines Stasis' primary interface. * * * - * @todo error handling + * @todo error handling * * @ingroup LLADD_CORE * $Id$ @@ -607,12 +607,12 @@ int Tbegin(); * * @see operations.h For an overview of the operations API */ -compensated_function void Tupdate(int xid, pageid_t page, +compensated_function void Tupdate(int xid, pageid_t page, const void *dat, size_t datlen, int op); /** @deprecated Only exists to work around swig/python limitations. */ -compensated_function void TupdateStr(int xid, pageid_t page, +compensated_function void TupdateStr(int xid, pageid_t page, const char *dat, size_t datlen, int op); void TreorderableUpdate(int xid, void * h, pageid_t page, @@ -632,7 +632,7 @@ void TreorderableWritebackUpdate(int xid, void* h, /** * Read the value of a record. - * + * * @param xid transaction ID * @param rid reference to page/slot * @param dat buffer into which data goes @@ -650,9 +650,9 @@ compensated_function void TreadRaw(int xid, recordid rid, void *dat); compensated_function void TreadStr(int xid, recordid rid, char *dat); /** - * Commit an active transaction. Each transaction should be completed + * Commit an active transaction. Each transaction should be completed * with exactly one call to Tcommit() or Tabort(). - * + * * @param xid transaction ID * @return 0 on success */ @@ -661,8 +661,8 @@ int Tcommit(int xid); /** * Abort (rollback) an active transaction. Each transaction should be * completed with exactly one call to Tcommit() or Tabort(). - * - * @param xid transaction ID + * + * @param xid transaction ID * @return 0 on success, -1 on error. */ int Tabort(int xid); @@ -689,7 +689,7 @@ int TuncleanShutdown(); * Used by the recovery process. * Revives Tprepare'ed transactions. * - * @param xid The xid that is to be revived. + * @param xid The xid that is to be revived. * @param prevlsn The lsn of that xid's most recent PREPARE entry in the log. * @param reclsn The lsn of the transaction's BEGIN record. */ @@ -753,7 +753,7 @@ void stasis_transaction_table_max_transaction_id_set(int xid); /** * Used by test cases to mess with internal transaction table state. * - * @param xid The new active transaction count. + * @param xid The new active transaction count. */ void stasis_transaction_table_active_transaction_count_set(int xid); @@ -783,6 +783,8 @@ int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn, lsn_t prevLSN, lsn_t recLSN); int stasis_transaction_table_forget(int xid); + +int Tforget(int xid); /** This is used by log truncation. */ diff --git a/test/stasis/check_blobRecovery.c b/test/stasis/check_blobRecovery.c index a306694..17195c9 100644 --- a/test/stasis/check_blobRecovery.c +++ b/test/stasis/check_blobRecovery.c @@ -414,7 +414,7 @@ Suite * check_suite(void) { TCase *tc = tcase_create("recovery"); tcase_set_timeout(tc, 0); // disable timeouts - if(LOG_TO_MEMORY != loggerType) { + if(LOG_TO_MEMORY != stasis_log_type) { /* void * foobar; */ /* used to supress warnings. */ /* Sub tests are added, one per line, here */ tcase_add_test(tc, recoverBlob__idempotent); diff --git a/test/stasis/check_logWriter.c b/test/stasis/check_logWriter.c index 2e0d021..0b5518f 100644 --- a/test/stasis/check_logWriter.c +++ b/test/stasis/check_logWriter.c @@ -40,10 +40,10 @@ permission to use and distribute the software in accordance with the terms specified in this license. ---*/ -/** +/** @file check_logWriter - - Tests logWriter. + + Tests logWriter. @todo Get rid of include for logWriter.h (stop calling deleteLogWriter, syncLog_logWriter...) */ @@ -92,7 +92,7 @@ static void setup_log() { assert(test <= e->LSN); - if(first) { + if(first) { first = 0; firstLSN = prevLSN; } @@ -122,7 +122,7 @@ static void setup_log() { } } /** - @test + @test Quick test of log writer and log handler. Not very extensive. Just writes out 3000 log entries, checks that 1000 of them make @@ -139,8 +139,9 @@ static void setup_log() { @todo Test logHandle more thoroughly. (Still need to test the guard mechanism.) */ -START_TEST(loggerTest) -{ +static void loggerTest(int logType) { + + stasis_log_type = logType; const LogEntry * e; LogHandle* h; int i = 0; @@ -161,16 +162,21 @@ START_TEST(loggerTest) stasis_log_safe_writes_delete(stasis_log_file_name); Tdeinit(); } -END_TEST - -/** +START_TEST(loggerFileTest) { + loggerTest(LOG_TO_FILE); +} END_TEST +START_TEST(loggerMemTest) { + loggerTest(LOG_TO_MEMORY); +} END_TEST +/** @test Checks for a bug ecountered during devlopment. What happens when previousInTransaction is called immediately after the handle is allocated? */ -START_TEST(logHandleColdReverseIterator) { +static void logHandleColdReverseIterator(int logType) { const LogEntry * e; + stasis_log_type = logType; setup_log(); LogHandle* lh = getLogHandle(stasis_log_file); int i = 0; @@ -180,7 +186,7 @@ START_TEST(logHandleColdReverseIterator) { freeLogEntry(e); i++; } - + i = 0; lh = getLSNHandle(stasis_log_file, e->LSN); while((e = previousInTransaction(lh))) { @@ -191,14 +197,20 @@ START_TEST(logHandleColdReverseIterator) { assert(i <= 4); /* We should almost immediately hit a clr that goes to the beginning of the log... */ Tdeinit(); } -END_TEST +START_TEST(logHandleFileColdReverseIterator) { + logHandleColdReverseIterator(LOG_TO_FILE); +} END_TEST +START_TEST(logHandleMemColdReverseIterator) { + logHandleColdReverseIterator(LOG_TO_MEMORY); +} END_TEST -/** +/** @test Build a simple log, truncate it, and then test the logWriter routines against it. */ -START_TEST(loggerTruncate) { +static void loggerTruncate(int logType) { + stasis_log_type = logType; const LogEntry * le; const LogEntry * le2; const LogEntry * le3 = NULL; @@ -212,21 +224,21 @@ START_TEST(loggerTruncate) { i++; le = nextInLog(lh); } - + le2 = nextInLog(lh); i = 0; while(i < 23) { i++; le3 = nextInLog(lh); } - + stasis_log_file->truncate(stasis_log_file, le->LSN); - + tmp = stasis_log_file->read_entry(stasis_log_file, le->LSN); fail_unless(NULL != tmp, NULL); fail_unless(tmp->LSN == le->LSN, NULL); - + freeLogEntry(tmp); tmp = stasis_log_file->read_entry(stasis_log_file, le2->LSN); @@ -238,19 +250,19 @@ START_TEST(loggerTruncate) { fail_unless(NULL != tmp, NULL); fail_unless(tmp->LSN == le3->LSN, NULL); - + freeLogEntry(tmp); freeLogHandle(lh); lh = getLogHandle(stasis_log_file); - + i = 0; - + freeLogEntry(le); freeLogEntry(le2); freeLogEntry(le3); while((le = nextInLog(lh))) { - if(le->type != INTERNALLOG) { + if(le->type != INTERNALLOG) { i++; } freeLogEntry(le); @@ -258,7 +270,12 @@ START_TEST(loggerTruncate) { assert(i == (3000 - 234 + 1)); freeLogHandle(lh); Tdeinit(); - +} +START_TEST(loggerFileTruncate) { + loggerTruncate(LOG_TO_FILE); +} END_TEST +START_TEST(loggerMemTruncate) { + loggerTruncate(LOG_TO_MEMORY); } END_TEST #define ENTRIES_PER_THREAD 200 @@ -277,7 +294,7 @@ static void* worker_thread(void * arg) { lsn_t lsns[ENTRIES_PER_THREAD]; - for(i = 0; i < ENTRIES_PER_THREAD; i++) { + for(i = 0; i < ENTRIES_PER_THREAD; i++) { lsns[i] = 0; } i = 0; @@ -293,7 +310,7 @@ static void* worker_thread(void * arg) { threshold = (int) (2000.0*random()/(RAND_MAX+1.0)); entry = (long) (ENTRIES_PER_THREAD*random()/(RAND_MAX+1.0)); - if(threshold < 3) { + if(threshold < 3) { if(i > 10) { needToTruncate = 1; if(lsns[i - 10] > truncated_to) { @@ -305,32 +322,32 @@ static void* worker_thread(void * arg) { pthread_mutex_unlock(&random_mutex); - if(needToTruncate) { -#ifdef NO_CONCURRENCY + if(needToTruncate) { +#ifdef NO_CONCURRENCY pthread_mutex_lock(&big); #endif stasis_log_file->truncate(stasis_log_file, myTruncVal); -#ifdef NO_CONCURRENCY +#ifdef NO_CONCURRENCY pthread_mutex_unlock(&big); -#endif +#endif assert(stasis_log_file->truncation_point(stasis_log_file) >= myTruncVal); } if(threshold < 3) { } else { le->xid = i+key; -#ifdef NO_CONCURRENCY +#ifdef NO_CONCURRENCY pthread_mutex_lock(&big); #endif stasis_log_file->write_entry(stasis_log_file,le); -#ifdef NO_CONCURRENCY +#ifdef NO_CONCURRENCY pthread_mutex_unlock(&big); #endif lsns[i] = le->LSN; i++; } pthread_mutex_lock(&random_mutex); -#ifdef NO_CONCURRENCY +#ifdef NO_CONCURRENCY pthread_mutex_lock(&big); #endif if(lsns[entry] > truncated_to && entry < i) { @@ -341,13 +358,13 @@ static void* worker_thread(void * arg) { assert(e->xid == entry+key); freeLogEntry(e); - } else { + } else { pthread_mutex_unlock(&random_mutex); } -#ifdef NO_CONCURRENCY +#ifdef NO_CONCURRENCY pthread_mutex_unlock(&big); #endif - + /* Try to interleave requests as much as possible */ sched_yield(); freeLogEntry(le); @@ -357,8 +374,8 @@ static void* worker_thread(void * arg) { return 0; } - -START_TEST(loggerCheckWorker) { +static void loggerCheckWorker(int logType) { + stasis_log_type = logType; int four = 4; pthread_mutex_init(&random_mutex, NULL); @@ -367,9 +384,16 @@ START_TEST(loggerCheckWorker) { worker_thread(&four); Tdeinit(); +} +START_TEST(loggerFileCheckWorker) { + loggerCheckWorker(LOG_TO_FILE); +} END_TEST +START_TEST(loggerMemCheckWorker) { + loggerCheckWorker(LOG_TO_MEMORY); } END_TEST -START_TEST(loggerCheckThreaded) { +static void loggerCheckThreaded(int logType) { + stasis_log_type = logType; #define THREAD_COUNT 100 pthread_t workers[THREAD_COUNT]; @@ -386,9 +410,16 @@ START_TEST(loggerCheckThreaded) { } Tdeinit(); +} + +START_TEST(loggerFileCheckThreaded) { + loggerCheckThreaded(LOG_TO_FILE); +} END_TEST +START_TEST(loggerMemCheckThreaded) { + loggerCheckThreaded(LOG_TO_MEMORY); } END_TEST -void reopenLogWorkload(int truncating) { +void reopenLogWorkload(int truncating) { stasis_operation_table_init(); stasis_truncation_automatic = 0; @@ -397,12 +428,12 @@ void reopenLogWorkload(int truncating) { stasis_transaction_table_active_transaction_count_set(0); - if(LOG_TO_FILE == loggerType) { + if(LOG_TO_FILE == stasis_log_type) { stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file_mode, stasis_log_file_permissions); - } else if(LOG_TO_MEMORY == loggerType) { - stasis_log_file = open_InMemoryLog(); + } else if(LOG_TO_MEMORY == stasis_log_type) { + stasis_log_file = stasis_log_impl_in_memory_open(); } else { assert(stasis_log_file != NULL); } @@ -418,24 +449,24 @@ void reopenLogWorkload(int truncating) { for(int i = 0; i < ENTRY_COUNT; i++) { entries[i] = LogUpdate(stasis_log_file, - &l, NULL, OPERATION_NOOP, NULL, 0); + &l, NULL, OPERATION_NOOP, NULL, 0); if(i == SYNC_POINT) { - if(truncating) { + if(truncating) { stasis_log_file->truncate(stasis_log_file,entries[i]->LSN); startLSN = entries[i]->LSN; } } } - stasis_log_file->deinit(stasis_log_file); + stasis_log_file->close(stasis_log_file); - if(LOG_TO_FILE == loggerType) { + if(LOG_TO_FILE == stasis_log_type) { stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file_mode, stasis_log_file_permissions); - } else if(LOG_TO_MEMORY == loggerType) { - stasis_log_file = open_InMemoryLog(); + } else if(LOG_TO_MEMORY == stasis_log_type) { + stasis_log_file = stasis_log_impl_in_memory_open(); } else { assert(stasis_log_file != NULL); } @@ -443,51 +474,51 @@ void reopenLogWorkload(int truncating) { LogHandle * h; int i; - if(truncating) { + if(truncating) { h = getLogHandle(stasis_log_file); i = SYNC_POINT; - } else { + } else { h = getLogHandle(stasis_log_file); i = 0; - } + } const LogEntry * e; - while((e = nextInLog(h))) { - if(e->type != INTERNALLOG) { + while((e = nextInLog(h))) { + if(e->type != INTERNALLOG) { assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i])); assert(!memcmp(e, entries[i], sizeofLogEntry(entries[i]))); assert(i < ENTRY_COUNT); i++; } } - + assert(i == (ENTRY_COUNT)); LogEntry * entries2[ENTRY_COUNT]; for(int i = 0; i < ENTRY_COUNT; i++) { entries2[i] = LogUpdate(stasis_log_file, &l, NULL, OPERATION_NOOP, NULL, 0); - if(i == SYNC_POINT) { + if(i == SYNC_POINT) { stasis_log_file->force_tail(stasis_log_file, LOG_FORCE_COMMIT); } } freeLogHandle(h); - if(truncating) { + if(truncating) { h = getLSNHandle(stasis_log_file, startLSN); i = SYNC_POINT; - } else { + } else { h = getLogHandle(stasis_log_file); i = 0; - } + } - while((e = nextInLog(h))) { - if(e->type != INTERNALLOG) { - if( i < ENTRY_COUNT) { + while((e = nextInLog(h))) { + if(e->type != INTERNALLOG) { + if( i < ENTRY_COUNT) { assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i])); assert(!memcmp(e, entries[i], sizeofLogEntry(entries[i]))); - } else { + } else { assert(i < ENTRY_COUNT * 2); assert(sizeofLogEntry(e) == sizeofLogEntry(entries2[i-ENTRY_COUNT])); assert(!memcmp(e, entries2[i-ENTRY_COUNT], sizeofLogEntry(entries2[i-ENTRY_COUNT]))); @@ -497,19 +528,21 @@ void reopenLogWorkload(int truncating) { } freeLogHandle(h); - assert(i == (ENTRY_COUNT * 2)); + assert(i == (ENTRY_COUNT * 2)); stasis_truncation_automatic = 1; - stasis_log_file->deinit(stasis_log_file); + stasis_log_file->close(stasis_log_file); } START_TEST(loggerReopenTest) { - stasis_log_safe_writes_delete(stasis_log_file_name); + stasis_log_type = LOG_TO_FILE; + stasis_log_safe_writes_delete(stasis_log_file_name); reopenLogWorkload(0); } END_TEST -START_TEST(loggerTruncateReopenTest) { +START_TEST(loggerTruncateReopenTest) { + stasis_log_type = LOG_TO_FILE; stasis_log_safe_writes_delete(stasis_log_file_name); reopenLogWorkload(1); } END_TEST @@ -520,19 +553,23 @@ Suite * check_suite(void) { TCase *tc = tcase_create("writeNew"); tcase_set_timeout(tc, 0); /* Sub tests are added, one per line, here */ - - tcase_add_test(tc, loggerTest); - tcase_add_test(tc, logHandleColdReverseIterator); - tcase_add_test(tc, loggerTruncate); - tcase_add_test(tc, loggerCheckWorker); - tcase_add_test(tc, loggerCheckThreaded); - if(loggerType != LOG_TO_MEMORY) { + tcase_add_test(tc, loggerFileTest); + tcase_add_test(tc, loggerMemTest); + tcase_add_test(tc, logHandleFileColdReverseIterator); + tcase_add_test(tc, logHandleMemColdReverseIterator); + tcase_add_test(tc, loggerFileTruncate); + tcase_add_test(tc, loggerMemTruncate); + tcase_add_test(tc, loggerFileCheckWorker); + tcase_add_test(tc, loggerMemCheckWorker); + tcase_add_test(tc, loggerFileCheckThreaded); + tcase_add_test(tc, loggerMemCheckThreaded); + if(stasis_log_type != LOG_TO_MEMORY) { tcase_add_test(tc, loggerReopenTest); tcase_add_test(tc, loggerTruncateReopenTest); } /* --------------------------------------------- */ - + tcase_add_checked_fixture(tc, setup, teardown); diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index 96aa618..ef1f070 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -697,7 +697,7 @@ Suite * check_suite(void) { tcase_add_test(tc, operation_physical_do_undo); tcase_add_test(tc, operation_nestedTopAction); tcase_add_test(tc, operation_set_range); - if(loggerType != LOG_TO_MEMORY) { + if(stasis_log_type != LOG_TO_MEMORY) { tcase_add_test(tc, operation_prepare); } tcase_add_test(tc, operation_alloc_test); diff --git a/test/stasis/check_pageOperations.c b/test/stasis/check_pageOperations.c index e66edfd..991186d 100644 --- a/test/stasis/check_pageOperations.c +++ b/test/stasis/check_pageOperations.c @@ -212,7 +212,7 @@ Suite * check_suite(void) { /* Sub tests are added, one per line, here */ tcase_add_test(tc, pageOpCheckAllocDealloc); - if(LOG_TO_MEMORY != loggerType) { + if(LOG_TO_MEMORY != stasis_log_type) { tcase_add_test(tc, pageOpCheckRecovery); } /* --------------------------------------------- */ diff --git a/test/stasis/check_recovery.c b/test/stasis/check_recovery.c index 2a54d47..9186452 100644 --- a/test/stasis/check_recovery.c +++ b/test/stasis/check_recovery.c @@ -484,7 +484,7 @@ Suite * check_suite(void) { tcase_set_timeout(tc, 0); // disable timeouts - if(LOG_TO_MEMORY != loggerType) { + if(LOG_TO_MEMORY != stasis_log_type) { /* Sub tests are added, one per line, here */ tcase_add_test(tc, recovery_idempotent);