From 1409a9eb27a148779aacdb8423850693465cef57 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 31 Dec 2009 23:01:37 +0000 Subject: [PATCH] more refactoring; log handles now manage their log entries memory --- benchmarks/sequentialThroughput.c | 7 +-- src/stasis/logger/logEntry.c | 18 ++++---- src/stasis/logger/logHandle.c | 8 ++++ src/stasis/logger/logger2.c | 22 +++++----- src/stasis/logger/safeWrites.c | 28 ++++++------ src/stasis/operations/pageOperations.c | 4 +- src/stasis/operations/regions.c | 6 +-- src/stasis/recovery2.c | 3 -- src/stasis/transactional2.c | 14 +++--- stasis/logger/logEntry.h | 14 +++--- stasis/logger/logHandle.h | 3 ++ stasis/operations/pageOperations.h | 2 +- stasis/operations/regions.h | 2 +- test/stasis/check_logEntry.c | 17 ++++++-- test/stasis/check_logWriter.c | 60 +++++++++++++++++--------- test/stasis/check_multiplexer.c | 4 +- test/stasis/check_operations.c | 4 +- utilities/logfile_dump.c | 1 - 18 files changed, 123 insertions(+), 94 deletions(-) diff --git a/benchmarks/sequentialThroughput.c b/benchmarks/sequentialThroughput.c index b4f169c..d1258e3 100644 --- a/benchmarks/sequentialThroughput.c +++ b/benchmarks/sequentialThroughput.c @@ -95,10 +95,11 @@ int main(int argc, char ** argv) { if(log_mode) { lsn_t prevLSN = -1; byte * arg = calloc(PAGE_SIZE, 1); - LogEntry * e = allocUpdateLogEntry(prevLSN, -1, OPERATION_NOOP, + stasis_log_t * l = stasis_log(); + + LogEntry * e = allocUpdateLogEntry(l, prevLSN, -1, OPERATION_NOOP, 0, PAGE_SIZE); memcpy(stasis_log_entry_update_args_ptr(e), arg, PAGE_SIZE); - stasis_log_t * l = stasis_log(); for(long i = 0; i < page_count; i++) { void * h; LogEntry * e2 = l->reserve_entry(l, sizeofLogEntry(l, e), &h); @@ -107,7 +108,7 @@ int main(int argc, char ** argv) { memcpy(e2, e, sizeofLogEntry(l, e)); l->entry_done(l, e2, h); } - freeLogEntry(e); + freeLogEntry(l, e); free(arg); } else { if(stake) { diff --git a/src/stasis/logger/logEntry.c b/src/stasis/logger/logEntry.c index ffac83a..555818b 100644 --- a/src/stasis/logger/logEntry.c +++ b/src/stasis/logger/logEntry.c @@ -46,7 +46,7 @@ terms specified in this license. #include -LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) { +LogEntry * allocCommonLogEntry(stasis_log_t* log, lsn_t prevLSN, int xid, unsigned int type) { LogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)); ret->LSN = -1; ret->prevLSN = prevLSN; @@ -54,7 +54,7 @@ LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) { ret->type = type; return ret; } -LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN) { +LogEntry * allocPrepareLogEntry(stasis_log_t* log, lsn_t prevLSN, int xid, lsn_t recLSN) { LogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)+sizeof(lsn_t)); ret->LSN = -1; ret->prevLSN = prevLSN; @@ -86,7 +86,7 @@ lsn_t getPrepareRecLSN(const LogEntry *e) { return ret; } -LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, +LogEntry * allocUpdateLogEntry(stasis_log_t* log, lsn_t prevLSN, int xid, unsigned int op, pageid_t page, unsigned int arg_size) { /** Use calloc since the struct might not be packed in memory; @@ -107,7 +107,7 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, return ret; } -LogEntry * allocCLRLogEntry(const LogEntry * old_e) { +LogEntry * allocCLRLogEntry(stasis_log_t* log, const LogEntry * old_e) { CLRLogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)+sizeofLogEntry(0, old_e)); ret->LSN = -1; @@ -119,18 +119,18 @@ LogEntry * allocCLRLogEntry(const LogEntry * old_e) { return (LogEntry*)ret; } -void freeLogEntry(const LogEntry* e) { +void freeLogEntry(stasis_log_t* log, const LogEntry* e) { free((void*)e); } -lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e) { +lsn_t sizeofLogEntry(stasis_log_t * log, const LogEntry * e) { switch (e->type) { case CLRLOG: { const LogEntry * contents = getCLRCompensated((const CLRLogEntry*) e); assert(contents->type != CLRLOG); - return sizeof(struct __raw_log_entry) + sizeofLogEntry(lh, contents); + return sizeof(struct __raw_log_entry) + sizeofLogEntry(log, contents); } case UPDATELOG: { @@ -138,8 +138,8 @@ lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e) { sizeof(UpdateLogEntry) + e->update.arg_size; } case INTERNALLOG: - assert(lh); - return lh->sizeof_internal_entry(lh,e); + assert(log); + return log->sizeof_internal_entry(log,e); case XPREPARE: return sizeof(struct __raw_log_entry)+sizeof(lsn_t); default: diff --git a/src/stasis/logger/logHandle.c b/src/stasis/logger/logHandle.c index 4db6d74..fe5a83d 100644 --- a/src/stasis/logger/logHandle.c +++ b/src/stasis/logger/logHandle.c @@ -46,6 +46,8 @@ struct LogHandle { lsn_t next_offset; /** The LSN of the log entry that we would return if previous is called. */ lsn_t prev_offset; + /** The last LogEntry this iterator returned */ + const LogEntry * last; /** The log this iterator traverses. */ stasis_log_t* log; }; @@ -69,27 +71,33 @@ LogHandle* getLSNHandle(stasis_log_t * log, lsn_t lsn) { LogHandle* ret = malloc(sizeof(*ret)); ret->next_offset = lsn; ret->prev_offset = lsn; + ret->last = 0; ret->log = log; return ret; } void freeLogHandle(LogHandle* lh) { + if(lh->last) { freeLogEntry(lh->log, lh->last); } free(lh); } const LogEntry * nextInLog(LogHandle * h) { + if(h->last) { freeLogEntry(h->log, h->last); } const LogEntry * ret = h->log->read_entry(h->log,h->next_offset); if(ret != NULL) { set_offsets(h, ret); } + h->last = ret; return ret; } const LogEntry * previousInTransaction(LogHandle * h) { const LogEntry * ret = NULL; + if(h->last) { freeLogEntry(h->log, h->last); } if(h->prev_offset > 0) { ret = h->log->read_entry(h->log, h->prev_offset); set_offsets(h, ret); } + h->last = ret; return ret; } diff --git a/src/stasis/logger/logger2.c b/src/stasis/logger/logger2.c index bb8fbb1..189f085 100644 --- a/src/stasis/logger/logger2.c +++ b/src/stasis/logger/logger2.c @@ -53,7 +53,7 @@ terms specified in this license. #include static lsn_t stasis_log_write_common(stasis_log_t* log, stasis_transaction_table_entry_t * l, int type) { - LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type); + LogEntry * e = allocCommonLogEntry(log, l->prevLSN, l->xid, type); lsn_t ret; log->write_entry(log, e); @@ -68,13 +68,13 @@ static lsn_t stasis_log_write_common(stasis_log_t* log, stasis_transaction_table ret = e->LSN; - freeLogEntry(e); + freeLogEntry(log, e); return ret; } static lsn_t stasis_log_write_prepare(stasis_log_t* log, stasis_transaction_table_entry_t * l) { - LogEntry * e = allocPrepareLogEntry(l->prevLSN, l->xid, l->recLSN); + LogEntry * e = allocPrepareLogEntry(log, l->prevLSN, l->xid, l->recLSN); lsn_t ret; DEBUG("Log prepare xid = %d prevlsn = %lld reclsn = %lld, %lld\n", @@ -90,7 +90,7 @@ static lsn_t stasis_log_write_prepare(stasis_log_t* log, stasis_transaction_tabl ret = e->LSN; - freeLogEntry(e); + freeLogEntry(log, e); return ret; @@ -100,7 +100,7 @@ LogEntry * stasis_log_write_update(stasis_log_t* log, stasis_transaction_table_e pageid_t page, unsigned int op, const byte * arg, size_t arg_size) { - LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op, + LogEntry * e = allocUpdateLogEntry(log, l->prevLSN, l->xid, op, page, arg_size); memcpy(stasis_log_entry_update_args_ptr(e), arg, arg_size); log->write_entry(log, e); @@ -115,7 +115,7 @@ LogEntry * stasis_log_write_update(stasis_log_t* log, stasis_transaction_table_e LogEntry * stasis_log_begin_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, unsigned int op, const byte * arg, size_t arg_size) { - LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op, INVALID_PAGE, arg_size); + LogEntry * e = allocUpdateLogEntry(log, l->prevLSN, l->xid, op, INVALID_PAGE, arg_size); memcpy(stasis_log_entry_update_args_ptr(e), arg, arg_size); return e; } @@ -125,28 +125,28 @@ lsn_t stasis_log_end_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; } lsn_t ret = l->prevLSN = e->LSN; // pthread_mutex_unlock(&l->mut); - freeLogEntry(e); + freeLogEntry(log, e); return ret; } lsn_t stasis_log_write_clr(stasis_log_t* log, const LogEntry * old_e) { - LogEntry * e = allocCLRLogEntry(old_e); + LogEntry * e = allocCLRLogEntry(log, old_e); log->write_entry(log, e); DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid, e->LSN, LSN, prevLSN); lsn_t ret = e->LSN; - freeLogEntry(e); + freeLogEntry(log, e); return ret; } lsn_t stasis_log_write_dummy_clr(stasis_log_t* log, int xid, lsn_t prevLSN) { // XXX waste of log bandwidth. - const LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP, + const LogEntry * e = allocUpdateLogEntry(log, prevLSN, xid, OPERATION_NOOP, INVALID_PAGE, 0); lsn_t ret = stasis_log_write_clr(log, e); - freeLogEntry(e); + freeLogEntry(log, e); return ret; } diff --git a/src/stasis/logger/safeWrites.c b/src/stasis/logger/safeWrites.c index 7494917..b00994e 100644 --- a/src/stasis/logger/safeWrites.c +++ b/src/stasis/logger/safeWrites.c @@ -244,13 +244,13 @@ static inline void log_crc_reset(stasis_log_safe_writes_state* sw) { static inline void log_crc_update(stasis_log_t* log, const LogEntry * e, unsigned int * crc) { *crc = stasis_crc32(e, sizeofLogEntry(log, e), *crc); } -static LogEntry* log_crc_dummy_entry() { - LogEntry* ret = allocCommonLogEntry(0, -1, INTERNALLOG); +static LogEntry* log_crc_dummy_entry(stasis_log_t *log) { + LogEntry* ret = allocCommonLogEntry(log, 0, -1, INTERNALLOG); assert(ret->prevLSN == 0); return ret; } -static LogEntry* log_crc_entry(unsigned int crc) { - LogEntry* ret = allocCommonLogEntry(crc, -1, INTERNALLOG); +static LogEntry* log_crc_entry(stasis_log_t *log, unsigned int crc) { + LogEntry* ret = allocCommonLogEntry(log, crc, -1, INTERNALLOG); return ret; } @@ -279,13 +279,13 @@ static inline lsn_t log_crc_next_lsn(stasis_log_t* log, lsn_t ret) { (unsigned int) le->prevLSN, crc, le->LSN); // The log wasn't successfully forced to this point; discard // everything after the last CRC. - freeLogEntry(le); + freeLogEntry(log, le); break; } } else { log_crc_update(log, le, &crc); } - freeLogEntry(le); + freeLogEntry(log, le); } return ret; } @@ -411,7 +411,7 @@ static void syncLog_LogWriter(stasis_log_t * log, newFlushedLSN = sw->nextAvailableLSN; pthread_mutex_unlock(&sw->nextAvailableLSN_mutex); - LogEntry* crc_entry = log_crc_entry(sw->crc); + LogEntry* crc_entry = log_crc_entry(log, sw->crc); writeLogEntryUnlocked(log, crc_entry); free(crc_entry); // Reset log_crc to zero each time a crc entry is written. @@ -631,28 +631,24 @@ static int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) { } nextLSN = nextEntry_LogWriter(log, le); + LogEntry *firstCRC = 0; // zero out crc of first entry during copy if(firstInternalEntry && le->type == INTERNALLOG) { - LogEntry * firstCRC = malloc(size); + firstCRC = malloc(size); memcpy(firstCRC, le, size); - freeLogEntry(le); firstCRC->prevLSN = 0; le = firstCRC; + firstInternalEntry = 0; } lengthOfCopiedLog += (size + sizeof(lsn_t)); myFwrite(&size, sizeof(lsn_t), tmpLog); myFwrite(le, size, tmpLog); - if(firstInternalEntry && le->type == INTERNALLOG) { - free((void*)le); // remove const qualifier + free - firstInternalEntry = 0; - } else { - freeLogEntry(le); - } + if(firstCRC) { free(firstCRC); } } freeLogHandle(lh); - LogEntry * crc_entry = log_crc_dummy_entry(); + LogEntry * crc_entry = log_crc_dummy_entry(log); pthread_mutex_lock(&sw->nextAvailableLSN_mutex); crc_entry->LSN = sw->nextAvailableLSN; diff --git a/src/stasis/operations/pageOperations.c b/src/stasis/operations/pageOperations.c index 432b6a3..e9d6c13 100644 --- a/src/stasis/operations/pageOperations.c +++ b/src/stasis/operations/pageOperations.c @@ -76,9 +76,9 @@ int TpageSetRange(int xid, pageid_t page, int offset, const void * memAddr, int This calls loadPage and releasePage directly, and bypasses the logger. */ -compensated_function void pageOperationsInit() { +compensated_function void pageOperationsInit(stasis_log_t *log) { - regionsInit(); + regionsInit(log); boundary_tag t; recordid rid = {0, 0, sizeof(boundary_tag)}; diff --git a/src/stasis/operations/regions.c b/src/stasis/operations/regions.c index 67836e0..00c3119 100644 --- a/src/stasis/operations/regions.c +++ b/src/stasis/operations/regions.c @@ -134,7 +134,7 @@ static void TdeallocBoundaryTag(int xid, pageid_t page) { } -void regionsInit() { +void regionsInit(stasis_log_t *log) { Page * p = loadPage(-1, 0); holding_mutex = pthread_self(); @@ -155,13 +155,13 @@ void regionsInit() { // recordid rid = {0,0,sizeof(boundary_tag)}; // hack; allocate a fake log entry; pass it into ourselves. - LogEntry * e = allocUpdateLogEntry(0,0,OPERATION_ALLOC_BOUNDARY_TAG, + LogEntry * e = allocUpdateLogEntry(log, 0,0,OPERATION_ALLOC_BOUNDARY_TAG, p->id, sizeof(boundary_tag)); memcpy(stasis_log_entry_update_args_ptr(e), &t, sizeof(boundary_tag)); writelock(p->rwlatch,0); op_alloc_boundary_tag(e,p); unlock(p->rwlatch); - freeLogEntry(e); + freeLogEntry(log, e); } holding_mutex = 0; releasePage(p); diff --git a/src/stasis/recovery2.c b/src/stasis/recovery2.c index df48803..9a0909b 100644 --- a/src/stasis/recovery2.c +++ b/src/stasis/recovery2.c @@ -133,7 +133,6 @@ static void stasis_recovery_analysis(stasis_log_t* log, stasis_transaction_table default: abort(); } - freeLogEntry(e); } freeLogHandle(lh); } @@ -228,7 +227,6 @@ static void stasis_recovery_redo(stasis_log_t* log, stasis_transaction_table_t * abort(); } } // end switch - freeLogEntry(e); } // end loop freeLogHandle(lh); } @@ -329,7 +327,6 @@ static void stasis_recovery_undo(stasis_log_t* log, stasis_transaction_table_t * abort(); } } - freeLogEntry(e); } if(!prepared) { // Log an XEND, remove transaction from stasis_transaction_table. diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 1ced2fc..b6f7a2f 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -94,7 +94,7 @@ int Tinit() { stasis_buffer_manager = stasis_buffer_manager_factory(stasis_log_file, stasis_dirty_page_table); stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table, stasis_buffer_manager); // xxx circular dependency. - pageOperationsInit(); + pageOperationsInit(stasis_log_file); stasis_allocation_policy = stasis_allocation_policy_init(); stasis_alloc = stasis_alloc_init(stasis_transaction_table, stasis_allocation_policy); @@ -160,7 +160,7 @@ compensated_function void Tupdate(int xid, pageid_t page, assert(xact->prevLSN == e->LSN); DEBUG("Tupdate() e->LSN: %ld\n", e->LSN); stasis_operation_do(e, p); - freeLogEntry(e); + freeLogEntry(stasis_log_file, e); if(p) unlock(p->rwlatch); if(p) releasePage(p); @@ -180,7 +180,7 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page, pthread_mutex_lock(&h->mut); - LogEntry * e = allocUpdateLogEntry(-1, h->l->xid, op, + LogEntry * e = allocUpdateLogEntry(h->log, -1, h->l->xid, op, p->id, datlen); memcpy(stasis_log_entry_update_args_ptr(e), dat, datlen); @@ -193,11 +193,11 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page, unlock(p->rwlatch); pthread_mutex_unlock(&h->mut); // page will be released by the log handle... - freeLogEntry(e); + freeLogEntry(stasis_log_file, e); } lsn_t TwritebackUpdate(int xid, pageid_t page, const void *dat, size_t datlen, int op) { - LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, datlen); + LogEntry * e = allocUpdateLogEntry(stasis_log_file, -1, xid, op, page, datlen); memcpy(stasis_log_entry_update_args_ptr(e), dat, datlen); stasis_transaction_table_entry_t* l = stasis_transaction_table_get(stasis_transaction_table, xid); @@ -206,7 +206,7 @@ lsn_t TwritebackUpdate(int xid, pageid_t page, if(l->prevLSN == -1) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; - freeLogEntry(e); + freeLogEntry(stasis_log_file, e); return l->prevLSN; } /** DANGER: you need to set the LSN's on the pages that you want to write back, @@ -218,7 +218,7 @@ void TreorderableWritebackUpdate(int xid, void* hp, stasis_log_reordering_handle_t* h = hp; assert(stasis_transaction_table_is_active(stasis_transaction_table, xid)); pthread_mutex_lock(&h->mut); - LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, datlen); + LogEntry * e = allocUpdateLogEntry(stasis_log_file, -1, xid, op, page, datlen); memcpy(stasis_log_entry_update_args_ptr(e), dat, datlen); stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(0, e)); pthread_mutex_unlock(&h->mut); diff --git a/stasis/logger/logEntry.h b/stasis/logger/logEntry.h index 6c33805..0719c4f 100644 --- a/stasis/logger/logEntry.h +++ b/stasis/logger/logEntry.h @@ -94,9 +94,9 @@ struct LogEntry { @return a LogEntry that should be freed with free(). */ -LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type); +LogEntry * allocCommonLogEntry(stasis_log_t *log, lsn_t prevLSN, int xid, unsigned int type); -LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN); +LogEntry * allocPrepareLogEntry(stasis_log_t *log, lsn_t prevLSN, int xid, lsn_t recLSN); /** Allocate a log entry associated with an operation implemention. This is usually called inside of Tupdate(). @@ -104,7 +104,7 @@ LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN); @return a LogEntry that should be freed with free(). */ -LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, +LogEntry * allocUpdateLogEntry(stasis_log_t *log, lsn_t prevLSN, int xid, unsigned int op, pageid_t page, unsigned int arg_size); /** @@ -112,20 +112,18 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, entries are undone. This moves undo operations into the redo phase, by recording the inverse of the original operation, and sets prevLSN to the prevLSN of old_e. - - @return a LogEntry that should be freed with free(). */ -LogEntry * allocCLRLogEntry(const LogEntry * e); +LogEntry * allocCLRLogEntry(stasis_log_t *log, const LogEntry * e); /** @param e a log entry returned from one of the alloc???LogEntry functions. */ -void freeLogEntry(const LogEntry * e); +void freeLogEntry(stasis_log_t *log, const LogEntry * e); /** @param lh The log handle the entry will be stored in. (Needed because some log entries are of type INTERNALLOG) May be NULL if e is not of type INTERNALLOG. @param e A log entry of any type. @return the length, in bytes, of e. */ -lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e); +lsn_t sizeofLogEntry(stasis_log_t * log, const LogEntry * e); /** * @return the operation's arguments, or NULL if there are no arguments. */ diff --git a/stasis/logger/logHandle.h b/stasis/logger/logHandle.h index a7abd15..8478295 100644 --- a/stasis/logger/logHandle.h +++ b/stasis/logger/logHandle.h @@ -58,6 +58,9 @@ typedef struct LogHandle LogHandle; in a particular transaction. They follow the prevLSN field, skipping any undo entries that have been marked complete. + The memory that contains the logEntry objects is managed by the LogHandle. + Callers should not free() or otherwise dispose of memory returned by a LogHandle. + @see logWriter.h For write access to the log. @see logger.h For the api provided by log implementations. */ diff --git a/stasis/operations/pageOperations.h b/stasis/operations/pageOperations.h index 737b246..4208748 100644 --- a/stasis/operations/pageOperations.h +++ b/stasis/operations/pageOperations.h @@ -79,6 +79,6 @@ stasis_operation_impl stasis_op_impl_page_initialize(); stasis_operation_impl stasis_op_impl_fixed_page_alloc(); -compensated_function void pageOperationsInit(); +compensated_function void pageOperationsInit(stasis_log_t *log); #endif diff --git a/stasis/operations/regions.h b/stasis/operations/regions.h index 76b02e9..79c0133 100644 --- a/stasis/operations/regions.h +++ b/stasis/operations/regions.h @@ -26,7 +26,7 @@ typedef struct boundary_tag { #define REGION_OCCUPIED (REGION_BASE + 2) #define REGION_CONDEMNED (REGION_BASE + 3) -void regionsInit(); +void regionsInit(stasis_log_t *log); pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocaionManager); void TregionDealloc(int xid, pageid_t firstPage); diff --git a/test/stasis/check_logEntry.c b/test/stasis/check_logEntry.c index f9ddeb2..e163149 100644 --- a/test/stasis/check_logEntry.c +++ b/test/stasis/check_logEntry.c @@ -49,13 +49,16 @@ terms specified in this license. START_TEST(rawLogEntryAlloc) { - LogEntry * log = allocCommonLogEntry(200, 1, XABORT); + Tinit(); + stasis_log_t *l = stasis_log(); + LogEntry * log = allocCommonLogEntry(log, 200, 1, XABORT); assert(log->LSN == -1); assert(log->prevLSN == 200); assert(log->xid == 1); assert(log->type == XABORT); assert(sizeofLogEntry(0, log) == sizeof(struct __raw_log_entry)); - free(log); + freeLogEntry(l, log); + Tdeinit(); } END_TEST @@ -74,8 +77,9 @@ START_TEST(updateLogEntryAlloc) LogEntry * log; Tinit(); /* Needed because it sets up the operations table. */ + stasis_log_t *l = stasis_log(); - log = allocUpdateLogEntry(200, 1, OPERATION_SET, + log = allocUpdateLogEntry(l, 200, 1, OPERATION_SET, rid.page, 3*sizeof(char)); memcpy(stasis_log_entry_update_args_ptr(log), args, 3*sizeof(char)); assert(log->LSN == -1); @@ -103,9 +107,12 @@ END_TEST START_TEST(updateLogEntryAllocNoExtras) { + Tinit(); + recordid rid = { 3 , 4, sizeof(int)*3 }; - LogEntry * log = allocUpdateLogEntry(200, 1, OPERATION_SET, + stasis_log_t *l = stasis_log(); + LogEntry * log = allocUpdateLogEntry(l, 200, 1, OPERATION_SET, rid.page, 0); assert(log->LSN == -1); assert(log->prevLSN == 200); @@ -120,6 +127,8 @@ START_TEST(updateLogEntryAllocNoExtras) assert(sizeofLogEntry(0, log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 0 * (sizeof(int)+sizeof(char)))); free(log); + + Tdeinit(); } END_TEST diff --git a/test/stasis/check_logWriter.c b/test/stasis/check_logWriter.c index ac77d9b..342e041 100644 --- a/test/stasis/check_logWriter.c +++ b/test/stasis/check_logWriter.c @@ -73,7 +73,7 @@ static stasis_log_t * setup_log() { int first = 1; stasis_log_t * stasis_log_file = stasis_log(); for(i = 0 ; i < 1000; i++) { - LogEntry * e = allocCommonLogEntry(prevLSN, xid, XBEGIN); + LogEntry * e = allocCommonLogEntry(stasis_log_file, prevLSN, xid, XBEGIN); const LogEntry * f; recordid rid; byte * args = (byte*)"Test 123."; @@ -100,23 +100,23 @@ static stasis_log_t * setup_log() { fail_unless(sizeofLogEntry(0, e) == sizeofLogEntry(0, f), "Log entry changed size!!"); fail_unless(0 == memcmp(e,f,sizeofLogEntry(0, e)), "Log entries did not agree!!"); - freeLogEntry(e); - freeLogEntry(f); + freeLogEntry(stasis_log_file, e); + freeLogEntry(stasis_log_file, f); - e = allocUpdateLogEntry(prevLSN, xid, 1, rid.page, args_size); + e = allocUpdateLogEntry(stasis_log_file, prevLSN, xid, 1, rid.page, args_size); memcpy(stasis_log_entry_update_args_ptr(e), args, args_size); stasis_log_file->write_entry(stasis_log_file,e); prevLSN = e->prevLSN; // LogEntry * g = allocCLRLogEntry(100, 1, 200, rid, 0); //prevLSN); - LogEntry * g = allocCLRLogEntry(e); // XXX will probably break + LogEntry * g = allocCLRLogEntry(stasis_log_file, e); // XXX will probably break g->prevLSN = firstLSN; stasis_log_file->write_entry(stasis_log_file,g); assert (g->type == CLRLOG); prevLSN = g->LSN; - freeLogEntry (e); - freeLogEntry (g); + freeLogEntry (stasis_log_file, e); + freeLogEntry (stasis_log_file, g); } return stasis_log_file; } @@ -148,7 +148,6 @@ static void loggerTest(int logType) { h = getLogHandle(stasis_log_file); while((e = nextInLog(h))) { - freeLogEntry(e); i++; assert(i < 4000); } @@ -181,15 +180,17 @@ static void logHandleColdReverseIterator(int logType) { while(((e = nextInLog(lh)) && (i < 100)) ) { - freeLogEntry(e); i++; } + lsn_t lsn = e->LSN; + + freeLogHandle(lh); + i = 0; - lh = getLSNHandle(stasis_log_file, e->LSN); + lh = getLSNHandle(stasis_log_file, lsn); while((e = previousInTransaction(lh))) { i++; - freeLogEntry(e); } freeLogHandle(lh); assert(i <= 4); /* We should almost immediately hit a clr that goes to the beginning of the log... */ @@ -224,13 +225,26 @@ static void loggerTruncate(int logType) { le = nextInLog(lh); } + LogEntry * copy = malloc(sizeofLogEntry(stasis_log_file, le)); + memcpy(copy, le, sizeofLogEntry(stasis_log_file, le)); + le = copy; + le2 = nextInLog(lh); + + copy = malloc(sizeofLogEntry(stasis_log_file, le2)); + memcpy(copy, le2, sizeofLogEntry(stasis_log_file, le2)); + le2 = copy; + i = 0; while(i < 23) { i++; le3 = nextInLog(lh); } + copy = malloc(sizeofLogEntry(stasis_log_file, le3)); + memcpy(copy, le3, sizeofLogEntry(stasis_log_file, le3)); + le3 = copy; + stasis_log_file->truncate(stasis_log_file, le->LSN); tmp = stasis_log_file->read_entry(stasis_log_file, le->LSN); @@ -238,33 +252,32 @@ static void loggerTruncate(int logType) { fail_unless(NULL != tmp, NULL); fail_unless(tmp->LSN == le->LSN, NULL); - freeLogEntry(tmp); + freeLogEntry(stasis_log_file, tmp); tmp = stasis_log_file->read_entry(stasis_log_file, le2->LSN); fail_unless(NULL != tmp, NULL); fail_unless(tmp->LSN == le2->LSN, NULL); - freeLogEntry(tmp); + freeLogEntry(stasis_log_file, tmp); tmp = stasis_log_file->read_entry(stasis_log_file, le3->LSN); fail_unless(NULL != tmp, NULL); fail_unless(tmp->LSN == le3->LSN, NULL); - freeLogEntry(tmp); + freeLogEntry(stasis_log_file, tmp); freeLogHandle(lh); lh = getLogHandle(stasis_log_file); i = 0; - freeLogEntry(le); - freeLogEntry(le2); - freeLogEntry(le3); + free((void*)le); + free((void*)le2); + free((void*)le3); while((le = nextInLog(lh))) { if(le->type != INTERNALLOG) { i++; } - freeLogEntry(le); } assert(i == (3000 - 234 + 1)); freeLogHandle(lh); @@ -300,7 +313,7 @@ static void* worker_thread(void * arg) { stasis_log_t * stasis_log_file = stasis_log(); while(i < ENTRIES_PER_THREAD) { - LogEntry * le = allocCommonLogEntry(-1, -1, XBEGIN); + LogEntry * le = allocCommonLogEntry(stasis_log_file, -1, -1, XBEGIN); int threshold; long entry; int needToTruncate = 0; @@ -361,7 +374,7 @@ static void* worker_thread(void * arg) { pthread_mutex_unlock(&random_mutex); } else { assert(e->xid == entry+key); - freeLogEntry(e); + freeLogEntry(stasis_log_file, e); } } else { pthread_mutex_unlock(&random_mutex); @@ -372,7 +385,7 @@ static void* worker_thread(void * arg) { /* Try to interleave requests as much as possible */ sched_yield(); - freeLogEntry(le); + freeLogEntry(stasis_log_file, le); } @@ -536,6 +549,11 @@ void reopenLogWorkload(int truncating) { freeLogHandle(h); assert(i == (ENTRY_COUNT * 2)); + for(int i = 0; i < ENTRY_COUNT; i++) { + freeLogEntry(stasis_log_file, entries[i]); + freeLogEntry(stasis_log_file, entries2[i]); + } + stasis_truncation_automatic = 1; stasis_log_file->close(stasis_log_file); } diff --git a/test/stasis/check_multiplexer.c b/test/stasis/check_multiplexer.c index fe7fe8b..408060a 100644 --- a/test/stasis/check_multiplexer.c +++ b/test/stasis/check_multiplexer.c @@ -180,14 +180,14 @@ START_TEST(multiplexTest) { for(i = 0; i < NUM_INSERTS; i++) { (*(lsn_t*)(arg+1)) = i; - LogEntry * e = allocUpdateLogEntry(-1, -1, OPERATION_LINEAR_HASH_INSERT, INVALID_PAGE, + LogEntry * e = allocUpdateLogEntry(stasis_log(), -1, -1, OPERATION_LINEAR_HASH_INSERT, INVALID_PAGE, sizeof(linearHash_remove_arg) + sizeof(lsn_t) + sizeof(char)); memcpy(stasis_log_entry_update_args_ptr(e), arg, sizeof(linearHash_remove_arg) + sizeof(lsn_t) + sizeof(char)); ThashInsert(xid, hash, (byte*)&i, sizeof(lsn_t), (byte*)e, sizeofLogEntry(0, e)); - free(e); + freeLogEntry(stasis_log(), e); } diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index beee1d8..bd888c1 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -88,7 +88,7 @@ START_TEST(operation_physical_do_undo) { // XXX fails; set log format has changed - setToTwo = allocUpdateLogEntry(-1, xid, OPERATION_SET, rid.page, + setToTwo = allocUpdateLogEntry(stasis_log(), -1, xid, OPERATION_SET, rid.page, sizeof(slotid_t) + sizeof(int64_t) + 2 * sizeof(int)); memcpy(stasis_log_entry_update_args_ptr(setToTwo), arg, sizeof(slotid_t) + sizeof(int64_t) + 2 * sizeof(int)); @@ -181,7 +181,7 @@ START_TEST(operation_physical_do_undo) { // XXX This is a hack to put some stuff in the log. Otherwise, Tdeinit() fails. for(int i = 0; i < 10; i++) ((stasis_log_t*)stasis_log())->write_entry(stasis_log(), - allocCommonLogEntry(-1, -1, -1)); + allocCommonLogEntry(stasis_log(), -1, -1, -1)); /** @todo need to re-think check_operations. The test is pretty broken. */ Tcommit(xid); diff --git a/utilities/logfile_dump.c b/utilities/logfile_dump.c index 45a4b79..f1aaf51 100644 --- a/utilities/logfile_dump.c +++ b/utilities/logfile_dump.c @@ -82,7 +82,6 @@ int main() { free(s); } - freeLogEntry(le); } freeLogHandle(lh);