From 87218ad929970d60980859daee429dc955326f50 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Wed, 13 May 2009 18:04:53 +0000 Subject: [PATCH] yet another refactoring; broke groupForce into its own module, removed more static variables and namespace violations --- .cproject | 3 +- benchmarks/distributedLsnFree.c | 6 +- benchmarks/lsn_bench_common.h | 20 +-- benchmarks/qos.c | 4 +- src/apps/readOnlyHash/buildTable.c | 14 +-- src/stasis/CMakeLists.txt | 9 +- src/stasis/Makefile.am | 1 + src/stasis/bufferManager/legacy/pageFile.c | 6 +- src/stasis/flags.c | 6 +- src/stasis/logger/filePool.c | 4 +- src/stasis/logger/groupForce.c | 103 ++++++++++++++++ src/stasis/logger/inMemoryLog.c | 8 +- src/stasis/logger/logEntry.c | 11 +- src/stasis/logger/logger2.c | 134 +++++---------------- src/stasis/logger/reorderingHandle.c | 8 +- src/stasis/logger/safeWrites.c | 24 ++-- src/stasis/operations/prepare.c | 26 ++-- src/stasis/page/lsnFree.c | 3 +- src/stasis/page/raw.c | 16 +-- src/stasis/pageHandle.c | 6 +- src/stasis/recovery2.c | 4 +- src/stasis/transactional2.c | 44 ++++--- stasis/bufferManager.h | 2 +- stasis/bufferManager/legacy/pageFile.h | 3 +- stasis/flags.h | 2 +- stasis/logger/groupForce.h | 18 +++ stasis/logger/logEntry.h | 27 +++-- stasis/logger/logger2.h | 42 +++---- stasis/pageHandle.h | 31 ++++- stasis/transactional.h | 13 +- test/stasis/check_logEntry.c | 6 +- test/stasis/check_logWriter.c | 39 +++--- test/stasis/check_multiplexer.c | 2 +- test/stasis/check_operations.c | 4 +- utilities/truncate_log.c | 2 +- 35 files changed, 386 insertions(+), 265 deletions(-) create mode 100644 src/stasis/logger/groupForce.c create mode 100644 stasis/logger/groupForce.h diff --git a/.cproject b/.cproject index 5ad646c..5817980 100644 --- a/.cproject +++ b/.cproject @@ -23,7 +23,7 @@ - @@ -211,6 +211,7 @@ + diff --git a/benchmarks/distributedLsnFree.c b/benchmarks/distributedLsnFree.c index 359ef0b..4340c4f 100644 --- a/benchmarks/distributedLsnFree.c +++ b/benchmarks/distributedLsnFree.c @@ -15,10 +15,10 @@ int main(int argc, char ** argv) { /*if(!(strcmp(mode, "writeback-pipeline"))) { // pipelining likes big queues - // stasis_log_write_buffer_size = 50 * 1024 * 1024; + // stasis_log_file_write_buffer_size = 50 * 1024 * 1024; } else { }*/ - stasis_log_write_buffer_size = 50 * 1024 * 1024; + stasis_log_file_write_buffer_size = 50 * 1024 * 1024; Tinit(); alloc_rids(num_rids,&rids,&fast); @@ -77,7 +77,7 @@ int main(int argc, char ** argv) { handles[i] = stasis_log_reordering_handle_open( &stasis_transaction_table[xid%MAX_TRANSACTIONS], stasis_log_file, - (0.9*stasis_log_write_buffer_size)/num_workers, + (0.9*stasis_log_file_write_buffer_size)/num_workers, //512*1024/ua->divisor, // 0.5 mb in log tail at once 1000000/num_workers, // max num outstanding requests (50 * 1024 * 1024)/num_workers // max backlog in bytes diff --git a/benchmarks/lsn_bench_common.h b/benchmarks/lsn_bench_common.h index ca231c7..44d71dc 100644 --- a/benchmarks/lsn_bench_common.h +++ b/benchmarks/lsn_bench_common.h @@ -55,7 +55,7 @@ typedef struct { void build_cache(recordid * rids, cached_addr** cache, long long count) { *cache = malloc (sizeof(**cache) * count); - lsn_t log_trunc = stasis_log_file->truncation_point(stasis_log_file); + lsn_t log_trunc = ((stasis_log_t*)stasis_log())->truncation_point(stasis_log()); for(long long i = 0; i < count; i++) { (*cache)[i].pid = rids[i].page; @@ -91,8 +91,8 @@ int my_write_entry(struct stasis_log_t* log, LogEntry *e) { } void emulate_remote_log() { - original_write_entry = stasis_log_file->write_entry; - stasis_log_file->write_entry = my_write_entry; + original_write_entry = ((stasis_log_t*)stasis_log())->write_entry; + ((stasis_log_t*)stasis_log())->write_entry = my_write_entry; } void emulate_remote_pages() { origWrite = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite; @@ -157,8 +157,8 @@ void * writeback_unit_of_work(void * ap) { stasis_log_reordering_handle_t * rh = stasis_log_reordering_handle_open( &stasis_transaction_table[ua->xid%MAX_TRANSACTIONS], - stasis_log_file, - (0.9*stasis_log_write_buffer_size)/ua->divisor, + stasis_log(), + (0.9*stasis_log_file_write_buffer_size)/ua->divisor, //512*1024/ua->divisor, // 0.5 mb in log tail at once 1000000/ua->divisor, // max num outstanding requests (50 * 1024 * 1024)/ua->divisor // max backlog in bytes @@ -166,7 +166,7 @@ void * writeback_unit_of_work(void * ap) { /* stasis_log_reordering_handle_open(&stasis_transaction_table[ua->xid%MAX_TRANSACTIONS], stasis_log_file, - (stasis_log_write_buffer_size * 0.25)/ua->divisor, + (stasis_log_file_write_buffer_size * 0.25)/ua->divisor, //512*1024/ua->divisor, // 0.5 mb in log tail at once 1000000/ua->divisor, // max num outstanding requests (50 * 1024 * 1024)/ua->divisor // max backlog in bytes @@ -178,7 +178,7 @@ stasis_log_reordering_handle_open(&stasis_transaction_table[ua->xid%MAX_TRANSACT TsetReorderableWriteBack(ua->xid, rh, ua->cache[idx].pid, ua->cache[idx].off, ua->cache[idx].len,&ua->cache[idx].val,&old); // TsetReorderable(ua->xid, rh, a->rids[(j*ua->divisor+ua->n)%a->num_rids], &val); - + } stasis_log_reordering_handle_close(rh); return 0; @@ -199,8 +199,8 @@ void * bg_unit_of_work(void * ap) { stasis_log_reordering_handle_t * rh = stasis_log_reordering_handle_open(&stasis_transaction_table[ua->xid%MAX_TRANSACTIONS], - stasis_log_file, - (stasis_log_write_buffer_size * 0.25)/ua->divisor, + stasis_log(), + (stasis_log_file_write_buffer_size * 0.25)/ua->divisor, //512*1024/ua->divisor, // 0.5 mb in log tail at once 1000000/ua->divisor, // max num outstanding requests (50 * 1024 * 1024)/ua->divisor // max backlog in bytes @@ -224,7 +224,7 @@ void* bg_worker(void * ap) { long long start = tv.tv_usec + tv.tv_sec * 1000000; int xid = Tbegin(); - if(stasis_log_file->write_entry == my_write_entry) { + if(((stasis_log_t*)stasis_log())->write_entry == my_write_entry) { // based on tweaking; also, normal-net is ~ 100x slower than nromal int num_worker = 100; pthread_t workers[num_worker]; diff --git a/benchmarks/qos.c b/benchmarks/qos.c index 3f87322..fd67612 100644 --- a/benchmarks/qos.c +++ b/benchmarks/qos.c @@ -17,10 +17,10 @@ int main(int argc, char** argv) { // XXX instead of overriding this, set tail of priority log to 80% // stasis log buf or something... - // stasis_log_write_buffer_size = 50 * 1024 * 1024; + // stasis_log_file_write_buffer_size = 50 * 1024 * 1024; printf("%s %s %s %s %lld\n", argv[0], argv[1], argv[2], argv[3], - stasis_log_write_buffer_size); + stasis_log_file_write_buffer_size); Tinit(); diff --git a/src/apps/readOnlyHash/buildTable.c b/src/apps/readOnlyHash/buildTable.c index 637341e..a9e9206 100644 --- a/src/apps/readOnlyHash/buildTable.c +++ b/src/apps/readOnlyHash/buildTable.c @@ -8,12 +8,12 @@ #include #include -int main(int argc, char** argv) { +int main(int argc, char** argv) { Tinit(); char * key; char * value; - + int ret; int xid = Tbegin(); @@ -31,8 +31,8 @@ int main(int argc, char** argv) { // bleah; gcc would warn without the casts, since it doesn't understand that %as = Allocate String char ** keyp = &key; // The extra garbage is to avoid type punning warnings... char ** valuep = &value; - while(EOF != (ret=scanf("%as\t%as\n", (float*)keyp, (float*)valuep))) { - if(!ret) { + while(EOF != (ret=scanf("%as\t%as\n", (float*)keyp, (float*)valuep))) { + if(!ret) { printf("Could not parse input!\n"); Tabort(xid); Tdeinit(); @@ -44,8 +44,8 @@ int main(int argc, char** argv) { free(key); free(value); count ++; - - if(!(count % 10000)) { + + if(!(count % 10000)) { gettimeofday(&now,0); double rate = ((double)count)/((double)(now.tv_sec-start.tv_sec)); printf("%d tuples inserted (%f per sec)\n", count, rate); @@ -53,7 +53,7 @@ int main(int argc, char** argv) { } Tcommit(xid); - stasis_truncation_truncate(stasis_log_file, 1); + TtruncateLog(); Tdeinit(); return 0; } diff --git a/src/stasis/CMakeLists.txt b/src/stasis/CMakeLists.txt index 4ac17b3..8344c94 100644 --- a/src/stasis/CMakeLists.txt +++ b/src/stasis/CMakeLists.txt @@ -1,20 +1,21 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common.c flags.c stats.c bufferManager.c linkedlist.c operations.c pageHandle.c - bufferManager/legacy/pageFile.c + bufferManager/legacy/pageFile.c bufferManager/legacy/pageCache.c bufferManager/legacy/legacyBufferManager.c - page.c bufferPool.c blobManager.c + page.c bufferPool.c blobManager.c recovery2.c truncation.c transactional2.c allocationPolicy.c lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c logger/logEntry.c logger/safeWrites.c logger/logWriterUtils.c - logger/filePool.c - logger/inMemoryLog.c + logger/filePool.c + logger/inMemoryLog.c logger/logHandle.c logger/logger2.c logger/logMemory.c page/raw.c page/slotted.c page/lsnFree.c logger/reorderingHandle.c + logger/groupForce.c page/fixed.c compensations.c operations/pageOperations.c page/indirect.c operations/decrement.c operations/increment.c diff --git a/src/stasis/Makefile.am b/src/stasis/Makefile.am index a0e8eee..8126175 100644 --- a/src/stasis/Makefile.am +++ b/src/stasis/Makefile.am @@ -11,6 +11,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common logger/inMemoryLog.c logger/logHandle.c logger/logger2.c \ logger/logMemory.c \ logger/reorderingHandle.c \ + logger/groupForce.c \ page/raw.c page/slotted.c page/lsnFree.c page/fixed.c compensations.c \ operations/pageOperations.c page/indirect.c operations/decrement.c \ operations/increment.c operations/prepare.c operations/set.c \ diff --git a/src/stasis/bufferManager/legacy/pageFile.c b/src/stasis/bufferManager/legacy/pageFile.c index 66cb44d..8ab2a19 100644 --- a/src/stasis/bufferManager/legacy/pageFile.c +++ b/src/stasis/bufferManager/legacy/pageFile.c @@ -94,7 +94,7 @@ static void pfPageWrite(stasis_page_handle_t * h, Page * ret) { // If necessary, force the log to disk so that ret's LSN will be stable. assert(ret->LSN == stasis_page_lsn_read(ret)); - LogForce(stasis_log_file, ret->LSN, LOG_FORCE_WAL); + if(h->log) { stasis_log_force(h->log, ret->LSN, LOG_FORCE_WAL); } pthread_mutex_lock(&stable_mutex); @@ -133,14 +133,14 @@ static void pfPageWrite(stasis_page_handle_t * h, Page * ret) { //#define PAGE_FILE_O_DIRECT /** @todo O_DIRECT is broken in older linuxes (eg 2.4). The build script should disable it on such platforms. */ -stasis_page_handle_t* openPageFile() { +stasis_page_handle_t* openPageFile(stasis_log_t * log) { stasis_page_handle_t * ret = malloc(sizeof(*ret)); ret->read = pfPageRead; ret->write = pfPageWrite; ret->force_file = pfForcePageFile; ret->force_range = pfForceRangePageFile; ret->close = pfClosePageFile; - + ret->log = log; DEBUG("Opening storefile.\n"); #ifdef PAGE_FILE_O_DIRECT diff --git a/src/stasis/flags.c b/src/stasis/flags.c index d7832e2..6a06640 100644 --- a/src/stasis/flags.c +++ b/src/stasis/flags.c @@ -85,8 +85,8 @@ const char* stasis_log_dir_name = "stasis_log"; #endif //STASIS_LOG_DIR_LSN_CHARS const int stasis_log_dir_name_lsn_chars = 20; -#ifdef STASIS_LOG_WRITE_BUFFER_SIZE -lsn_t stasis_log_write_buffer_size = STASIS_LOG_WRITE_BUFFER_SIZE; +#ifdef STASIS_LOG_FILE_WRITE_BUFFER_SIZE +lsn_t stasis_log_file_write_buffer_size = STASIS_LOG_FILE_WRITE_BUFFER_SIZE; #else -lsn_t stasis_log_write_buffer_size = 1024 * 1024; +lsn_t stasis_log_file_write_buffer_size = 1024 * 1024; #endif diff --git a/src/stasis/logger/filePool.c b/src/stasis/logger/filePool.c index 738783a..419fa67 100644 --- a/src/stasis/logger/filePool.c +++ b/src/stasis/logger/filePool.c @@ -218,8 +218,8 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f pthread_mutex_init(&fp->read_mutex,0); fp->state_latch = initlock(); - fp->buffer = calloc(stasis_log_write_buffer_size, sizeof(char)); - setbuffer(fp->fp, fp->buffer, stasis_log_write_buffer_size); + fp->buffer = calloc(stasis_log_file_write_buffer_size, sizeof(char)); + setbuffer(fp->fp, fp->buffer, stasis_log_file_write_buffer_size); } diff --git a/src/stasis/logger/groupForce.c b/src/stasis/logger/groupForce.c new file mode 100644 index 0000000..ed61352 --- /dev/null +++ b/src/stasis/logger/groupForce.c @@ -0,0 +1,103 @@ +/* + * groupForce.c + * + * Created on: May 12, 2009 + * Author: sears + */ + +#include +#include +#include + +#include +#include + +struct stasis_log_group_force_t { + stasis_log_t * log; + pthread_mutex_t check_commit; + pthread_cond_t tooFewXacts; + int pendingCommits; + uint64_t wait_nsec; +}; + +stasis_log_group_force_t * stasis_log_group_force_init(stasis_log_t * log, uint64_t wait_nsec) { + static int warned = 0; + if(wait_nsec > (1000 * 1000 * 1000)) { + warned = 1; + fprintf(stderr, "TODO stasis_log_group_force: Efficiently support wait " + "times > 1 second. (%llu second wait time requested)\n", + (long long unsigned int) (wait_nsec / (1000 * 1000 * 1000))); + } + stasis_log_group_force_t * ret = malloc(sizeof(*ret)); + ret->log = log; + pthread_mutex_init(&ret->check_commit,0); + pthread_cond_init(&ret->tooFewXacts,0); + ret->pendingCommits = 0; + ret->wait_nsec = wait_nsec; + return ret; +} + +void stasis_log_group_force(stasis_log_group_force_t* lh, lsn_t lsn) { +// static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER; +// static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER; +// static int pendingCommits; + + pthread_mutex_lock(&lh->check_commit); + if(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) > lsn) { + pthread_mutex_unlock(&lh->check_commit); + return; + } + if(lh->log->is_durable(lh->log)) { + struct timeval now; + struct timespec timeout; + + gettimeofday(&now, NULL); + timeout.tv_sec = now.tv_sec; + timeout.tv_nsec = now.tv_usec * 1000; + // 0123456789 <- number of zeros on the next three lines... +// timeout.tv_nsec += 10000000; // wait ten msec. + timeout.tv_nsec += lh->wait_nsec; + while(timeout.tv_nsec > (1000 * 1000 * 1000)) { + timeout.tv_nsec -= (1000 * 1000 * 1000); + timeout.tv_sec++; + } + + lh->pendingCommits++; + int xactcount = TactiveTransactionCount(); + if((xactcount > 1 && lh->pendingCommits < xactcount) || + (xactcount > 20 && lh->pendingCommits < (int)((double)xactcount * 0.95))) { + int retcode; + while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&lh->tooFewXacts, &lh->check_commit, &timeout))) { + if(retcode != 0) { + printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by " + "a signal in groupCommit(). Acting as though it timed out.\n", + __FILE__, __LINE__); + break; + } + if(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) > lsn) { + (lh->pendingCommits)--; + pthread_mutex_unlock(&lh->check_commit); + return; + } + } + } + } else { + (lh->pendingCommits)++; + } + if(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) <= lsn) { + lh->log->force_tail(lh->log, LOG_FORCE_COMMIT); + assert(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) > lsn); + pthread_cond_broadcast(&lh->tooFewXacts); + } + assert(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) > lsn); + (lh->pendingCommits)--; + pthread_mutex_unlock(&lh->check_commit); + return; +} + +void stasis_log_group_force_deinit(stasis_log_group_force_t * lh) { + pthread_mutex_destroy(&lh->check_commit); + pthread_cond_destroy(&lh->tooFewXacts); + free(lh); +} + diff --git a/src/stasis/logger/inMemoryLog.c b/src/stasis/logger/inMemoryLog.c index 34649b1..01b025d 100644 --- a/src/stasis/logger/inMemoryLog.c +++ b/src/stasis/logger/inMemoryLog.c @@ -41,8 +41,8 @@ static int stasis_log_impl_in_memory_write_entry(stasis_log_t * log, LogEntry *e e->LSN = impl->nextAvailableLSN; - LogEntry * cpy = malloc(sizeofLogEntry(e)); - memcpy(cpy, e, sizeofLogEntry(e)); + LogEntry * cpy = malloc(sizeofLogEntry(log, e)); + memcpy(cpy, e, sizeofLogEntry(log, e)); DEBUG("lsn: %ld\n", e->LSN); impl->buffer[bufferOffset] = cpy; @@ -129,9 +129,9 @@ static const LogEntry * stasis_log_impl_in_memory_read_entry(stasis_log_t* log, assert(ptr); assert(ptr->LSN == lsn); - LogEntry * ret = malloc(sizeofLogEntry(ptr)); + LogEntry * ret = malloc(sizeofLogEntry(log, ptr)); - memcpy(ret, ptr, sizeofLogEntry(ptr)); + memcpy(ret, ptr, sizeofLogEntry(log, ptr)); DEBUG("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN); return ret; diff --git a/src/stasis/logger/logEntry.c b/src/stasis/logger/logEntry.c index 316e5ba..f90122b 100644 --- a/src/stasis/logger/logEntry.c +++ b/src/stasis/logger/logEntry.c @@ -111,14 +111,14 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, } LogEntry * allocCLRLogEntry(const LogEntry * old_e) { - CLRLogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)+sizeofLogEntry(old_e)); + CLRLogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)+sizeofLogEntry(0, old_e)); ret->LSN = -1; ret->prevLSN = old_e->prevLSN; ret->xid = old_e->xid; ret->type = CLRLOG; DEBUG("compensates: %lld\n", old_e->LSN); - memcpy((void*)getCLRCompensated(ret), old_e, sizeofLogEntry(old_e)); + memcpy((void*)getCLRCompensated(ret), old_e, sizeofLogEntry(0, old_e)); return (LogEntry*)ret; } @@ -127,13 +127,13 @@ void freeLogEntry(const LogEntry* e) { } -lsn_t sizeofLogEntry(const LogEntry * e) { +lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e) { switch (e->type) { case CLRLOG: { const LogEntry * contents = getCLRCompensated((const CLRLogEntry*) e); assert(contents->type != CLRLOG); - return sizeof(struct __raw_log_entry) + sizeofLogEntry(contents); + return sizeof(struct __raw_log_entry) + sizeofLogEntry(lh, contents); } case UPDATELOG: { @@ -141,7 +141,8 @@ lsn_t sizeofLogEntry(const LogEntry * e) { sizeof(UpdateLogEntry) + e->update.arg_size; } case INTERNALLOG: - return stasis_log_file->sizeof_internal_entry(stasis_log_file, e); + assert(lh); + return lh->sizeof_internal_entry(lh,e); case XPREPARE: return sizeof(struct __raw_log_entry)+sizeof(lsn_t); default: diff --git a/src/stasis/logger/logger2.c b/src/stasis/logger/logger2.c index 4b52ed9..839535a 100644 --- a/src/stasis/logger/logger2.c +++ b/src/stasis/logger/logger2.c @@ -52,31 +52,13 @@ terms specified in this license. #include #include -#include -#include - #include #include #include #include -/** - @todo stasis_log_file should be in transactional2.c, and not global - */ -stasis_log_t* stasis_log_file = 0; - -static int pendingCommits; - -void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* tl) { - tl->xid = xid; - - DEBUG("Log Begin %d\n", xid); - tl->prevLSN = -1; - tl->recLSN = -1; -} - -static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) { +static lsn_t stasis_log_write_common(stasis_log_t* log, TransactionLog * l, int type) { LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type); lsn_t ret; @@ -95,9 +77,9 @@ static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) { freeLogEntry(e); return ret; - } -static lsn_t LogTransCommonPrepare(stasis_log_t* log, TransactionLog * l) { + +static lsn_t stasis_log_write_prepare(stasis_log_t* log, TransactionLog * l) { LogEntry * e = allocPrepareLogEntry(l->prevLSN, l->xid, l->recLSN); lsn_t ret; @@ -120,7 +102,7 @@ static lsn_t LogTransCommonPrepare(stasis_log_t* log, TransactionLog * l) { } -LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l, +LogEntry * stasis_log_write_update(stasis_log_t* log, TransactionLog * l, Page * p, unsigned int op, const byte * arg, size_t arg_size) { @@ -138,7 +120,7 @@ LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l, return e; } -lsn_t LogCLR(stasis_log_t* log, const LogEntry * old_e) { +lsn_t stasis_log_write_clr(stasis_log_t* log, const LogEntry * old_e) { LogEntry * e = allocCLRLogEntry(old_e); log->write_entry(log, e); @@ -150,7 +132,7 @@ lsn_t LogCLR(stasis_log_t* log, const LogEntry * old_e) { return ret; } -lsn_t LogDummyCLR(stasis_log_t* log, int xid, lsn_t prevLSN, +lsn_t stasis_log_write_dummy_clr(stasis_log_t* log, int xid, lsn_t prevLSN, lsn_t compensatedLSN) { const LogEntry * const_e; LogEntry * e; @@ -160,99 +142,49 @@ lsn_t LogDummyCLR(stasis_log_t* log, int xid, lsn_t prevLSN, } else { const_e = log->read_entry(log, compensatedLSN); } - e = malloc(sizeofLogEntry(const_e)); - memcpy(e, const_e, sizeofLogEntry(const_e)); + e = malloc(sizeofLogEntry(log, const_e)); + memcpy(e, const_e, sizeofLogEntry(log, const_e)); e->LSN = compensatedLSN; - lsn_t ret = LogCLR(log, e); + lsn_t ret = stasis_log_write_clr(log, e); freeLogEntry(const_e); free(e); return ret; } -lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l) { - lsn_t lsn = LogTransCommon(log, l, XCOMMIT); - LogForce(log, lsn, LOG_FORCE_COMMIT); +void stasis_log_begin_transaction(stasis_log_t* log, int xid, TransactionLog* tl) { + tl->xid = xid; + + DEBUG("Log Begin %d\n", xid); + tl->prevLSN = -1; + tl->recLSN = -1; +} + +lsn_t stasis_log_abort_transaction(stasis_log_t* log, TransactionLog * l) { + return stasis_log_write_common(log, l, XABORT); +} +lsn_t stasis_log_end_aborted_transaction(stasis_log_t* log, TransactionLog * l) { + return stasis_log_write_common(log, l, XEND); +} +lsn_t stasis_log_prepare_transaction(stasis_log_t* log, TransactionLog * l) { + lsn_t lsn = stasis_log_write_prepare(log, l); + stasis_log_force(log, lsn, LOG_FORCE_COMMIT); return lsn; } -lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l) { - return LogTransCommon(log, l, XABORT); -} -lsn_t LogTransEnd(stasis_log_t* log, TransactionLog * l) { - return LogTransCommon(log, l, XEND); -} -lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l) { - lsn_t lsn = LogTransCommonPrepare(log, l); - LogForce(log, lsn, LOG_FORCE_COMMIT); + +lsn_t stasis_log_commit_transaction(stasis_log_t* log, TransactionLog * l) { + lsn_t lsn = stasis_log_write_common(log, l, XCOMMIT); + stasis_log_force(log, lsn, LOG_FORCE_COMMIT); return lsn; } -static void groupCommit(stasis_log_t* log, lsn_t lsn); - -void LogForce(stasis_log_t* log, lsn_t lsn, +void stasis_log_force(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode) { - if(mode == LOG_FORCE_COMMIT) { - groupCommit(log, lsn); + if((mode == LOG_FORCE_COMMIT) && log->group_force) { + stasis_log_group_force(log->group_force, lsn); } else { if(log->first_unstable_lsn(log,mode) <= lsn) { log->force_tail(log,mode); } } } - -static void groupCommit(stasis_log_t* log, lsn_t lsn) { - static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER; - static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER; - - - pthread_mutex_lock(&check_commit); - if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn) { - pthread_mutex_unlock(&check_commit); - return; - } - if(log->is_durable(log)) { - struct timeval now; - struct timespec timeout; - - gettimeofday(&now, NULL); - timeout.tv_sec = now.tv_sec; - timeout.tv_nsec = now.tv_usec * 1000; - // 0123456789 <- number of zeros on the next three lines... - timeout.tv_nsec += 10000000; // wait ten msec. - if(timeout.tv_nsec > 1000000000) { - timeout.tv_nsec -= 1000000000; - timeout.tv_sec++; - } - - pendingCommits++; - int xactcount = TactiveTransactionCount(); - if((xactcount > 1 && pendingCommits < xactcount) || - (xactcount > 20 && pendingCommits < (int)((double)xactcount * 0.95))) { - int retcode; - while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) { - if(retcode != 0) { - printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by " - "a signal in groupCommit(). Acting as though it timed out.\n", - __FILE__, __LINE__); - break; - } - if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn) { - pendingCommits--; - pthread_mutex_unlock(&check_commit); - return; - } - } - } - } else { - pendingCommits++; - } - if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) <= lsn) { - log->force_tail(log, LOG_FORCE_COMMIT); - assert(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn); - pthread_cond_broadcast(&tooFewXacts); - } - assert(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn); - pendingCommits--; - pthread_mutex_unlock(&check_commit); - return; -} diff --git a/src/stasis/logger/reorderingHandle.c b/src/stasis/logger/reorderingHandle.c index def2443..78c57e6 100644 --- a/src/stasis/logger/reorderingHandle.c +++ b/src/stasis/logger/reorderingHandle.c @@ -11,14 +11,14 @@ static void* stasis_log_reordering_handle_worker(void * a) { while(h->cur_len) { size_t chunk_len = 0; while(chunk_len < h->chunk_len && h->cur_len) { - LogEntry * e = LogUpdate(h->log, + LogEntry * e = stasis_log_write_update(h->log, h->l, h->queue[h->cur_off].p, h->queue[h->cur_off].op, h->queue[h->cur_off].arg, h->queue[h->cur_off].arg_size); assert(e->xid != INVALID_XID); - chunk_len += sizeofLogEntry(e); + chunk_len += sizeofLogEntry(h->log, e); if(h->queue[h->cur_off].p) { Page * p = h->queue[h->cur_off].p; @@ -31,14 +31,14 @@ static void* stasis_log_reordering_handle_worker(void * a) { xaction table for prevLSN is their friend. */ h->cur_len--; - h->phys_size -= sizeofLogEntry(e); + h->phys_size -= sizeofLogEntry(h->log, e); h->cur_off = (h->cur_off+1)%h->max_len; } if(chunk_len > 0) { lsn_t to_force = h->l->prevLSN; pthread_mutex_unlock(&h->mut); - LogForce(h->log, to_force, LOG_FORCE_COMMIT); + stasis_log_force(h->log, to_force, LOG_FORCE_COMMIT); if(stasis_log_reordering_usleep_after_flush) { usleep(stasis_log_reordering_usleep_after_flush); } diff --git a/src/stasis/logger/safeWrites.c b/src/stasis/logger/safeWrites.c index 9511b4e..15380eb 100644 --- a/src/stasis/logger/safeWrites.c +++ b/src/stasis/logger/safeWrites.c @@ -225,7 +225,7 @@ static inline int isDurable_LogWriter(stasis_log_t* log) { static inline lsn_t nextEntry_LogWriter(stasis_log_t* log, const LogEntry* e) { - return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t); + return e->LSN + sizeofLogEntry(log, e) + sizeof(lsn_t); } // crc handling @@ -233,8 +233,8 @@ static inline lsn_t nextEntry_LogWriter(stasis_log_t* log, static inline void log_crc_reset(stasis_log_safe_writes_state* sw) { sw->crc = 0; } -static inline void log_crc_update(const LogEntry * le, unsigned int * crc) { - *crc = stasis_crc32(le, sizeofLogEntry(le), *crc); +static inline void log_crc_update(stasis_log_t* log, const LogEntry * e, unsigned int * crc) { + *crc = stasis_crc32(e, sizeofLogEntry(log, e), *crc); } static LogEntry* log_crc_dummy_entry() { LogEntry* ret = allocCommonLogEntry(0, -1, INTERNALLOG); @@ -275,7 +275,7 @@ static inline lsn_t log_crc_next_lsn(stasis_log_t* log, lsn_t ret) { break; } } else { - log_crc_update(le, &crc); + log_crc_update(log, le, &crc); } freeLogEntry(le); } @@ -303,14 +303,14 @@ static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e) { stasis_log_safe_writes_state* sw = log->impl; - const lsn_t size = sizeofLogEntry(e); + const lsn_t size = sizeofLogEntry(log, e); /* Set the log entry's LSN. */ pthread_mutex_lock(&sw->nextAvailableLSN_mutex); e->LSN = sw->nextAvailableLSN; pthread_mutex_unlock(&sw->nextAvailableLSN_mutex); - log_crc_update(e, &sw->crc); + log_crc_update(log, e, &sw->crc); DEBUG("Writing Log entry type = %d lsn = %ld, size = %ld\n", e->type, e->LSN, size); @@ -600,7 +600,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) { int firstInternalEntry = 1; lsn_t nextLSN = 0; while((le = nextInLog(lh))) { - size = sizeofLogEntry(le); + size = sizeofLogEntry(log, le); if(nextLSN) { assert(nextLSN == le->LSN); } @@ -636,7 +636,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) { assert(sw->nextAvailableLSN == LSN + lengthOfCopiedLog); - size = sizeofLogEntry(crc_entry); + size = sizeofLogEntry(log, crc_entry); sw->nextAvailableLSN = nextEntry_LogWriter(log, crc_entry); @@ -700,7 +700,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) { return LLADD_IO_ERROR; } - setbuffer(sw->fp, sw->buffer, stasis_log_write_buffer_size); + setbuffer(sw->fp, sw->buffer, stasis_log_file_write_buffer_size); sw->global_offset = LSN - sizeof(lsn_t); @@ -776,10 +776,8 @@ stasis_log_t* stasis_log_safe_writes_open(const char * filename, stasis_log_t* log = malloc(sizeof(*log)); memcpy(log,&proto, sizeof(proto)); log->impl = sw; - // XXX hack; we call things that call into this object during init! - stasis_log_file = log; - sw->buffer = calloc(stasis_log_write_buffer_size, sizeof(char)); + sw->buffer = calloc(stasis_log_file_write_buffer_size, sizeof(char)); if(!sw->buffer) { return 0; /*LLADD_NO_MEM;*/ } @@ -804,7 +802,7 @@ stasis_log_t* stasis_log_safe_writes_open(const char * filename, } /* Increase the length of log's buffer, since it's in O_SYNC mode. */ - setbuffer(sw->fp, sw->buffer, stasis_log_write_buffer_size); + setbuffer(sw->fp, sw->buffer, stasis_log_file_write_buffer_size); /* fread() doesn't notice when another handle writes to its file, even if fflush() is used to push the changes out to disk. diff --git a/src/stasis/operations/prepare.c b/src/stasis/operations/prepare.c index 9d00a33..b02de88 100644 --- a/src/stasis/operations/prepare.c +++ b/src/stasis/operations/prepare.c @@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of California, and other parties. The following terms apply to all files associated with the software unless explicitly disclaimed in individual files. - + The authors hereby grant permission to use, copy, modify, distribute, and license this software and its documentation for any purpose, provided that existing copyright notices are retained in all copies @@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by their authors and need not follow the licensing terms described here, provided that the new terms are clearly indicated on the first page of each file where they apply. - + IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - + THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. - + GOVERNMENT USE: If you are acquiring this software on behalf of the U.S. government, the Government shall have only "Restricted Rights" in the software and related documentation as defined in the Federal @@ -42,7 +42,7 @@ terms specified in this license. /********************************************** * $Id$ - * + * * sets the given reference to dat **********************************************/ @@ -55,16 +55,16 @@ terms specified in this license. recordid prepare_bogus_rec = { 0, 0, 0}; static int op_prepare(const LogEntry * e, Page * p) { - LogForce(stasis_log_file, e->LSN, LOG_FORCE_COMMIT); + //NO-OP. return 0; } stasis_operation_impl stasis_op_impl_prepare() { stasis_operation_impl o = { - OPERATION_PREPARE, OPERATION_PREPARE, /* id */ + OPERATION_PREPARE, OPERATION_NOOP, - &op_prepare /* Function */ + &op_prepare/* Function */ }; return o; } @@ -78,7 +78,7 @@ typedef struct{ int aborted; } PrepareGuardState; -void * getPrepareGuardState() { +void * getPrepareGuardState() { PrepareGuardState * s = malloc (sizeof(PrepareGuardState)); s->continueIterating = 1; s->prevLSN = -1; @@ -89,10 +89,10 @@ void * getPrepareGuardState() { int prepareGuard(const LogEntry * e, void * state) { - PrepareGuardState * pgs = state; + PrepareGuardState * pgs = state; int ret = pgs->continueIterating; if(e->type == UPDATELOG && !pgs->aborted) { - if(e->update.funcID == OPERATION_PREPARE) { + if(e->update.funcID == OPERATION_PREPARE) { pgs->continueIterating = 0; pgs->prevLSN = e->prevLSN; } @@ -112,7 +112,7 @@ int prepareGuard(const LogEntry * e, void * state) { /** @todo When fleshing out the logHandle's prepareAction interface, figure out what the return value should mean... */ int prepareAction(void * state) { - PrepareGuardState * pgs = state; + PrepareGuardState * pgs = state; int ret; if(!(pgs->continueIterating || pgs->aborted)) { //assert(pgs->prevLSN != -1); @@ -121,6 +121,6 @@ int prepareAction(void * state) { ret = 1; } else { ret = 0; - } + } return ret; } diff --git a/src/stasis/page/lsnFree.c b/src/stasis/page/lsnFree.c index 2b69652..0163191 100644 --- a/src/stasis/page/lsnFree.c +++ b/src/stasis/page/lsnFree.c @@ -8,8 +8,9 @@ void stasis_slotted_lsn_free_initialize_page(Page * p) { *stasis_page_type_ptr(p) = SLOTTED_LSN_FREE_PAGE; *stasis_page_lsn_ptr(p) = -1; } +// XXX still not correct; need to have an "LSN_FREE" constant. static void lsnFreeLoaded(Page * p) { - p->LSN = stasis_log_file->next_available_lsn(stasis_log_file); + p->LSN = 0; //stasis_log_file->next_available_lsn(stasis_log_file); } static void lsnFreeFlushed(Page * p) { } diff --git a/src/stasis/page/raw.c b/src/stasis/page/raw.c index 40be361..6e6fbcd 100644 --- a/src/stasis/page/raw.c +++ b/src/stasis/page/raw.c @@ -7,9 +7,11 @@ XXX rawPageInferMetadata is wrong; setting lsn to LogFlushedLSN() breaks recovery. + + XXX still not correct; need an "LSN_FREE" constant. */ void rawPageInferMetadata(Page * p) { - p->LSN = stasis_log_file->first_unstable_lsn(stasis_log_file, LOG_FORCE_WAL); + p->LSN = 0; //stasis_log_file->first_unstable_lsn(stasis_log_file, LOG_FORCE_WAL); } byte* rawPageGetData(int xid, Page * p) { @@ -17,7 +19,7 @@ byte* rawPageGetData(int xid, Page * p) { return units_from_start_raw(byte, p, 0); } -void rawPageSetData(int xid, lsn_t lsn, Page * p) { +void rawPageSetData(int xid, lsn_t lsn, Page * p) { assertlocked(p->rwlatch); // writelock(p->rwlatch, 255); rawPageWriteLSN(xid, p, lsn); @@ -26,23 +28,23 @@ void rawPageSetData(int xid, lsn_t lsn, Page * p) { return; } -lsn_t rawPageReadLSN(const Page * p) { +lsn_t rawPageReadLSN(const Page * p) { assertlocked(p->rwlatch); // There are some potential optimizations here since the page // doesn't "really" have an LSN at all, but we need to be careful // about log truncation... - return p->LSN; + return p->LSN; } -void rawPageWriteLSN(int xid, Page * p, lsn_t lsn) { +void rawPageWriteLSN(int xid, Page * p, lsn_t lsn) { assertlocked(p->rwlatch); if(p->LSN < lsn) { p->LSN = lsn; } } -void rawPageCommit(int xid) { +void rawPageCommit(int xid) { // no-op } -void rawPageAbort(int xid) { +void rawPageAbort(int xid) { // no-op } diff --git a/src/stasis/pageHandle.c b/src/stasis/pageHandle.c index a3df4fc..f4298f7 100644 --- a/src/stasis/pageHandle.c +++ b/src/stasis/pageHandle.c @@ -21,7 +21,7 @@ static void phWrite(stasis_page_handle_t * ph, Page * ret) { // or we'll deadlock. writelock(ret->rwlatch,0); stasis_page_flushed(ret); - LogForce(stasis_log_file, ret->LSN, LOG_FORCE_WAL); + if(ph->log) { stasis_log_force(ph->log, ret->LSN, LOG_FORCE_WAL); } int err = ((stasis_handle_t*)ph->impl)->write(ph->impl, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); if(err) { printf("Couldn't write to page file: %s\n", strerror(err)); @@ -66,7 +66,8 @@ static void phClose(stasis_page_handle_t * ph) { } free(ph); } -stasis_page_handle_t * stasis_page_handle_open(stasis_handle_t * handle) { +stasis_page_handle_t * stasis_page_handle_open(stasis_handle_t * handle, + stasis_log_t * log) { DEBUG("Using pageHandle implementation\n"); stasis_page_handle_t * ret = malloc(sizeof(*ret)); ret->write = phWrite; @@ -74,6 +75,7 @@ stasis_page_handle_t * stasis_page_handle_open(stasis_handle_t * handle) { ret->force_file = phForce; ret->force_range = phForceRange; ret->close = phClose; + ret->log = log; ret->impl = handle; return ret; } diff --git a/src/stasis/recovery2.c b/src/stasis/recovery2.c index 4577d44..92c41ee 100644 --- a/src/stasis/recovery2.c +++ b/src/stasis/recovery2.c @@ -290,7 +290,7 @@ static void stasis_recovery_undo(stasis_log_t* log, int recovery) { } // Log a CLR for this entry - lsn_t clr_lsn = LogCLR(log, e); + lsn_t clr_lsn = stasis_log_write_clr(log, e); DEBUG("logged clr\n"); stasis_transaction_table_roll_forward(e->xid, e->LSN, e->prevLSN); @@ -316,7 +316,7 @@ static void stasis_recovery_undo(stasis_log_t* log, int recovery) { stasis_operation_undo(ce, 0, 0); // compensated_lsn = -1 -> that the logical undo is a NOOP. // that way, we don't undo this operation twice. - LogDummyCLR(log, ce->xid, ce->prevLSN, -1); + stasis_log_write_dummy_clr(log, ce->xid, ce->prevLSN, -1); } else { DEBUG("physical clr: op %d lsn %lld\n", ce->update.funcID, ce->LSN); diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 38eb834..0a38d15 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -31,6 +31,8 @@ TransactionLog stasis_transaction_table[MAX_TRANSACTIONS]; static int stasis_transaction_table_num_active = 0; static int stasis_transaction_table_xid_count = 0; +static stasis_log_t* stasis_log_file = 0; + /** This mutex protects stasis_transaction_table, numActiveXactions and xidCount. @@ -61,12 +63,17 @@ int Tinit() { stasis_operation_table_init(); dirtyPagesInit(); + stasis_log_file = 0; + if(LOG_TO_FILE == stasis_log_type) { stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file_mode, stasis_log_file_permissions); + stasis_log_file->group_force = + stasis_log_group_force_init(stasis_log_file, 10 * 1000 * 1000); // timeout in nsec; want 10msec. } else if(LOG_TO_MEMORY == stasis_log_type) { stasis_log_file = stasis_log_impl_in_memory_open(); + stasis_log_file->group_force = 0; } else { assert(stasis_log_file != NULL); } @@ -75,12 +82,13 @@ int Tinit() { stasis_page_handle_t * page_handle; if(bufferManagerFileHandleType == BUFFER_MANAGER_FILE_HANDLE_DEPRECATED) { printf("\nWarning: Using old I/O routines (with known bugs).\n"); - page_handle = openPageFile(); + page_handle = openPageFile(stasis_log_file); } else { stasis_handle_t * h = stasis_handle_open(stasis_store_file_name); // XXX should not be global. - page_handle = stasis_page_handle_open(h); + page_handle = stasis_page_handle_open(h, stasis_log_file); } + stasis_buffer_manager_open(bufferManagerType, page_handle); DEBUG("Buffer manager type = %d\n", bufferManagerType); pageOperationsInit(); @@ -135,7 +143,7 @@ int Tbegin() { pthread_mutex_unlock(&stasis_transaction_table_mutex); - LogTransBegin(stasis_log_file, xidCount_tmp, &stasis_transaction_table[index]); + stasis_log_begin_transaction(stasis_log_file, xidCount_tmp, &stasis_transaction_table[index]); if(globalLockManager.begin) { globalLockManager.begin(stasis_transaction_table[index].xid); } @@ -155,7 +163,7 @@ static compensated_function void TactionHelper(int xid, writelock(p->rwlatch,0); - e = LogUpdate(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS], + e = stasis_log_write_update(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS], p, op, dat, datlen); assert(stasis_transaction_table[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); DEBUG("Tupdate() e->LSN: %ld\n", e->LSN); @@ -183,7 +191,7 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page, p ? p->id : INVALID_PAGE, dat, datlen); - stasis_log_reordering_handle_append(h, p, op, dat, datlen, sizeofLogEntry(e)); + stasis_log_reordering_handle_append(h, p, op, dat, datlen, sizeofLogEntry(0, e)); e->LSN = 0; writelock(p->rwlatch,0); @@ -216,7 +224,7 @@ void TreorderableWritebackUpdate(int xid, void* hp, assert(xid >= 0 && stasis_transaction_table[xid % MAX_TRANSACTIONS].xid == xid); pthread_mutex_lock(&h->mut); LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, dat, datlen); - stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(e)); + stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(0, e)); pthread_mutex_unlock(&h->mut); } compensated_function void TupdateStr(int xid, pageid_t page, @@ -283,7 +291,7 @@ int Tcommit(int xid) { pthread_mutex_unlock(&stasis_transaction_table_mutex); #endif - lsn = LogTransCommit(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS]); + lsn = stasis_log_commit_transaction(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS]); if(globalLockManager.commit) { globalLockManager.commit(xid); } stasis_alloc_committed(xid); @@ -303,7 +311,7 @@ int Tprepare(int xid) { assert(xid >= 0); off_t i = xid % MAX_TRANSACTIONS; assert(stasis_transaction_table[i].xid == xid); - LogTransPrepare(stasis_log_file, &stasis_transaction_table[i]); + stasis_log_prepare_transaction(stasis_log_file, &stasis_transaction_table[i]); return 0; } @@ -314,7 +322,7 @@ int Tabort(int xid) { TransactionLog * t =&stasis_transaction_table[xid%MAX_TRANSACTIONS]; assert(t->xid == xid); - lsn = LogTransAbort(stasis_log_file, t); + lsn = stasis_log_abort_transaction(stasis_log_file, t); /** @todo is the order of the next two calls important? */ undoTrans(stasis_log_file, *t); @@ -327,7 +335,7 @@ int Tabort(int xid) { int Tforget(int xid) { TransactionLog * t = &stasis_transaction_table[xid%MAX_TRANSACTIONS]; assert(t->xid == xid); - LogTransEnd(stasis_log_file, t); + stasis_log_end_aborted_transaction(stasis_log_file, t); stasis_transaction_table_forget(t->xid); return 0; } @@ -350,7 +358,9 @@ int Tdeinit() { stasis_buffer_manager_close(); DEBUG("Closing page file tdeinit\n"); stasis_page_deinit(); + stasis_log_group_force_t * group_force = stasis_log_file->group_force; stasis_log_file->close(stasis_log_file); + if(group_force) { stasis_log_group_force_deinit(group_force); } dirtyPagesDeinit(); stasis_initted = 0; @@ -480,6 +490,9 @@ int TdurabilityLevel() { } } +void TtruncateLog() { + stasis_truncation_truncate(stasis_log_file, 1); +} typedef struct { lsn_t prev_lsn; lsn_t compensated_lsn; @@ -487,7 +500,7 @@ typedef struct { int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) { assert(xid >= 0); - LogEntry * e = LogUpdate(stasis_log_file, + LogEntry * e = stasis_log_write_update(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS], NULL, op, dat, datSize); lsn_t prev_lsn = e->prevLSN; @@ -497,7 +510,7 @@ int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) { freeLogEntry(e); - lsn_t clrLSN = LogDummyCLR(stasis_log_file, xid, prev_lsn, compensated_lsn); + lsn_t clrLSN = stasis_log_write_dummy_clr(stasis_log_file, xid, prev_lsn, compensated_lsn); stasis_transaction_table[xid % MAX_TRANSACTIONS].prevLSN = clrLSN; @@ -506,7 +519,7 @@ int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) { void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) { assert(xid >= 0); - LogEntry * e = LogUpdate(stasis_log_file, + LogEntry * e = stasis_log_write_update(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS], NULL, op, dat, datSize); DEBUG("Begin Nested Top Action e->LSN: %ld\n", e->LSN); @@ -528,7 +541,7 @@ lsn_t TendNestedTopAction(int xid, void * handle) { assert(xid >= 0); // Write a CLR. - lsn_t clrLSN = LogDummyCLR(stasis_log_file, xid, + lsn_t clrLSN = stasis_log_write_dummy_clr(stasis_log_file, xid, h->prev_lsn, h->compensated_lsn); // Ensure that the next action in this transaction points to the CLR. @@ -541,3 +554,6 @@ lsn_t TendNestedTopAction(int xid, void * handle) { return clrLSN; } +void * stasis_log() { + return stasis_log_file; +} diff --git a/stasis/bufferManager.h b/stasis/bufferManager.h index c3fe97a..9a36498 100644 --- a/stasis/bufferManager.h +++ b/stasis/bufferManager.h @@ -145,7 +145,7 @@ extern void (*forcePages)(); extern void (*forcePageRange)(pageid_t start, pageid_t stop); extern void (*stasis_buffer_manager_simulate_crash)(); -int stasis_buffer_manager_open(int type, stasis_page_handle_t* ph); +int stasis_buffer_manager_open(int type, stasis_page_handle_t * ph); /** * will write out any dirty pages, assumes that there are no running * transactions diff --git a/stasis/bufferManager/legacy/pageFile.h b/stasis/bufferManager/legacy/pageFile.h index c9bcb31..e4c050b 100644 --- a/stasis/bufferManager/legacy/pageFile.h +++ b/stasis/bufferManager/legacy/pageFile.h @@ -3,7 +3,8 @@ #define __PAGE_FILE_H #include +#include -stasis_page_handle_t* openPageFile(); +stasis_page_handle_t* openPageFile(stasis_log_t * log); #endif /* __PAGE_FILE_H */ diff --git a/stasis/flags.h b/stasis/flags.h index 4c6128e..9cd1db1 100644 --- a/stasis/flags.h +++ b/stasis/flags.h @@ -98,5 +98,5 @@ extern const int stasis_log_dir_name_lsn_chars; /** Number of bytes that stasis' log may buffer before writeback. */ -extern lsn_t stasis_log_write_buffer_size; +extern lsn_t stasis_log_file_write_buffer_size; #endif diff --git a/stasis/logger/groupForce.h b/stasis/logger/groupForce.h new file mode 100644 index 0000000..887374c --- /dev/null +++ b/stasis/logger/groupForce.h @@ -0,0 +1,18 @@ +/* + * groupForce.h + * + * Created on: May 12, 2009 + * Author: sears + */ + +#ifndef GROUPFORCE_H_ +#define GROUPFORCE_H_ + +#include +#include + +stasis_log_group_force_t * stasis_log_group_force_init(stasis_log_t * log, uint64_t wait_nsec); +void stasis_log_group_force_deinit(stasis_log_group_force_t * lh); +void stasis_log_group_force(stasis_log_group_force_t* lh, lsn_t lsn); + +#endif /* GROUPFORCE_H_ */ diff --git a/stasis/logger/logEntry.h b/stasis/logger/logEntry.h index a65b85b..b0d1802 100644 --- a/stasis/logger/logEntry.h +++ b/stasis/logger/logEntry.h @@ -43,14 +43,10 @@ terms specified in this license. #ifndef __LLADD_LOGGING_LOGENTRY_H #define __LLADD_LOGGING_LOGENTRY_H -#include - -BEGIN_C_DECLS - /** @file - Structs and memory managment routines for log entries + Structs and memory management routines for log entries @todo Other than some typedefs, is there anything in logEntry that belongs in the API? @@ -59,14 +55,23 @@ BEGIN_C_DECLS $Id$ */ -typedef struct { +#include + +BEGIN_C_DECLS + +typedef struct UpdateLogEntry UpdateLogEntry; +typedef struct LogEntry LogEntry; +typedef struct __raw_log_entry CLRLogEntry; + +#include +struct UpdateLogEntry { unsigned int funcID : 8; pageid_t page; int64_t arg_size; /* Implicit members: args; @ ((byte*)ule) + sizeof(UpdateLogEntry) */ -} UpdateLogEntry; +}; struct __raw_log_entry { lsn_t LSN; @@ -75,15 +80,14 @@ struct __raw_log_entry { unsigned int type; }; -typedef struct { +struct LogEntry { lsn_t LSN; lsn_t prevLSN; int xid; unsigned int type; UpdateLogEntry update; -} LogEntry; +}; -typedef struct __raw_log_entry CLRLogEntry; /** Allocate a log entry that does not contain any extra payload information. (Eg: Tbegin, Tcommit, etc.) @@ -118,9 +122,10 @@ LogEntry * allocCLRLogEntry(const LogEntry * e); */ void freeLogEntry(const LogEntry * e); /** + @param lh The log handle the entry will be stored in. (Needed because some log entries are of type INTERNALLOG) May be NULL if e is not of type INTERNALLOG. @return the length, in bytes, of e. */ -lsn_t sizeofLogEntry(const LogEntry * e); +lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e); /** @todo Remove explicit casts from getUpdateArgs calls (so we don't accidentally strip the const). @return the operation's arguments. diff --git a/stasis/logger/logger2.h b/stasis/logger/logger2.h index 813b3d7..e80edf6 100644 --- a/stasis/logger/logger2.h +++ b/stasis/logger/logger2.h @@ -57,6 +57,15 @@ terms specified in this license. #include +typedef struct stasis_log_t stasis_log_t; +typedef struct stasis_log_group_force_t stasis_log_group_force_t; + +typedef enum { + LOG_FORCE_COMMIT, LOG_FORCE_WAL +} stasis_log_force_mode_t; + +#include +#include /** Contains the state needed by the logging layer to perform operations on a transaction. @@ -68,7 +77,6 @@ typedef struct TransactionLog { pthread_mutex_t mut; } TransactionLog; -typedef struct stasis_log_t stasis_log_t; #include @@ -80,9 +88,6 @@ typedef struct stasis_log_t stasis_log_t; typedef int (guard_fcn_t)(const LogEntry *, void *); -typedef enum { - LOG_FORCE_COMMIT, LOG_FORCE_WAL -} stasis_log_force_mode_t; /** XXX TransactionTable should be private to transactional2.c! @@ -158,14 +163,11 @@ struct stasis_log_t { int (*is_durable)(struct stasis_log_t* log); + stasis_log_group_force_t * group_force; + void* impl; }; -/** - @todo get rid of this! - */ -extern stasis_log_t* stasis_log_file; - /** Synchronously make a prefix of the log durable. @@ -182,13 +184,13 @@ extern stasis_log_t* stasis_log_file; @see stasis_log_force_mode_t @see logger2.h for information about force_tail(). */ -void LogForce(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode); +void stasis_log_force(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode); /** Inform the logging layer that a new transaction has begun, and obtain a handle. */ -void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* l); +void stasis_log_begin_transaction(stasis_log_t* log, int xid, TransactionLog* l); /** Write a transaction PREPARE to the log tail. Blocks until the @@ -196,35 +198,35 @@ void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* l); @return the lsn of the prepare log entry */ -lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l); +lsn_t stasis_log_prepare_transaction(stasis_log_t* log, TransactionLog * l); /** Write a transaction COMMIT to the log tail. Blocks until the commit record is stable. @return the lsn of the commit log entry. */ -lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l); +lsn_t stasis_log_commit_transaction(stasis_log_t* log, TransactionLog * l); /** Write a transaction ABORT to the log tail. Does not force the log. @return the lsn of the abort log entry. */ -lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l); +lsn_t stasis_log_abort_transaction(stasis_log_t* log, TransactionLog * l); /** Write a end transaction record. This entry tells recovery's undo phase that it may safely ignore the transaction. */ -lsn_t LogTransEnd (stasis_log_t* log, TransactionLog * l); +lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, TransactionLog * l); /** - LogUpdate writes an UPDATELOG log record to the log tail. It + stasis_log_write_update writes an UPDATELOG log record to the log tail. It also interprets its operation argument to the extent necessary for allocating and laying out the log entry. Finally, it updates the state of the parameter l. */ -LogEntry * LogUpdate(stasis_log_t* log, +LogEntry * stasis_log_write_update(stasis_log_t* log, TransactionLog * l, Page * p, unsigned int operation, const byte * arg, size_t arg_size); @@ -237,11 +239,9 @@ LogEntry * LogUpdate(stasis_log_t* log, (Needed so that the lsn slot of the page in question can be updated.) */ -lsn_t LogCLR(stasis_log_t* log, const LogEntry * e); +lsn_t stasis_log_write_clr(stasis_log_t* log, const LogEntry * e); -lsn_t LogDummyCLR(stasis_log_t* log, int xid, +lsn_t stasis_log_write_dummy_clr(stasis_log_t* log, int xid, lsn_t prev_lsn, lsn_t compensated_lsn); - - #endif diff --git a/stasis/pageHandle.h b/stasis/pageHandle.h index 3c67855..99d9236 100644 --- a/stasis/pageHandle.h +++ b/stasis/pageHandle.h @@ -1,9 +1,20 @@ #ifndef STASIS_PAGEHANDLE_H #define STASIS_PAGEHANDLE_H + +typedef struct stasis_page_handle_t stasis_page_handle_t; + #include #include +#include -typedef struct stasis_page_handle_t { +/** + * Provides page-based, write-ahead access to the page file. + * + * The operations provided by page handles maintain the write-ahead invariant, + * and callers to write pages to file-handle buffers, and to force write the + * buffers to disk. + */ +struct stasis_page_handle_t { /** * Write page to disk, including correct LSN. Doing so may require a * call to logSync(). There is not much that can be done to avoid @@ -55,12 +66,26 @@ typedef struct stasis_page_handle_t { Force the page file to disk, then close it. */ void (*close)(struct stasis_page_handle_t* ph); + /** + The write ahead log associated with this page handle. + + If this is non-null, stasis_page_handle implementations will call + stasis_log_force on this to maintain the write-ahead invariant. + */ + stasis_log_t * log; /** * Pointer to implementation-specific state. */ void * impl; -} stasis_page_handle_t; +}; +/** + Open a stasis page handle. -stasis_page_handle_t * stasis_page_handle_open(struct stasis_handle_t * handle); + @param handle A stasis_handle_t that will perform I/O to the page file. + @param log A stasis_log_t that will be used to maintain the write ahead invariant. + If null, then write ahead will not be maintained. + @return a handle that performs high-level (page based, write-ahead) page file I/O. + */ +stasis_page_handle_t * stasis_page_handle_open(struct stasis_handle_t * handle, stasis_log_t * log); #endif //STASIS_PAGEHANDLE_H diff --git a/stasis/transactional.h b/stasis/transactional.h index 3a0a040..caf6333 100644 --- a/stasis/transactional.h +++ b/stasis/transactional.h @@ -539,7 +539,6 @@ terms specified in this license. #include "common.h" #include "flags.h" BEGIN_C_DECLS - /** * Initialize Stasis. This opens the pagefile and log, initializes * subcomponents, and runs recovery. @@ -769,6 +768,18 @@ int Tforget(int xid); */ int TdurabilityLevel(); +/** + * Force any dirty pages to disk, and truncate the log. After this + * function returns, the log will be as short as possible (outstanding + * transactions can prevent it from completely emptying the log). + */ +void TtruncateLog(); +/** + * XXX hack: return a pointer to stasis' log handle. This works around the fact + * that stasis_log_file is no longer global. + */ +void * stasis_log(void); + #include "operations.h" END_C_DECLS diff --git a/test/stasis/check_logEntry.c b/test/stasis/check_logEntry.c index ba2cde2..05c039c 100644 --- a/test/stasis/check_logEntry.c +++ b/test/stasis/check_logEntry.c @@ -54,7 +54,7 @@ START_TEST(rawLogEntryAlloc) assert(log->prevLSN == 200); assert(log->xid == 1); assert(log->type == XABORT); - assert(sizeofLogEntry(log) == sizeof(struct __raw_log_entry)); + assert(sizeofLogEntry(0, log) == sizeof(struct __raw_log_entry)); free(log); } END_TEST @@ -94,7 +94,7 @@ START_TEST(updateLogEntryAlloc) // printf("sizes %d %d\n",sizeofLogEntry(log),(sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + (sizeof(char)))); - assert(sizeofLogEntry(log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 3 * (sizeof(char)))); + assert(sizeofLogEntry(0, log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 3 * (sizeof(char)))); free(log); Tdeinit(); } @@ -120,7 +120,7 @@ START_TEST(updateLogEntryAllocNoExtras) assert(getUpdateArgs(log) == NULL); - assert(sizeofLogEntry(log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 0 * (sizeof(int)+sizeof(char)))); + assert(sizeofLogEntry(0, log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 0 * (sizeof(int)+sizeof(char)))); free(log); } END_TEST diff --git a/test/stasis/check_logWriter.c b/test/stasis/check_logWriter.c index 335180b..82fc55f 100644 --- a/test/stasis/check_logWriter.c +++ b/test/stasis/check_logWriter.c @@ -62,7 +62,7 @@ terms specified in this license. #define LOG_NAME "check_logWriter.log" -static void setup_log() { +static stasis_log_t * setup_log() { int i; lsn_t prevLSN = -1; int xid = 42; @@ -71,7 +71,7 @@ static void setup_log() { Tinit(); lsn_t firstLSN = -1; int first = 1; - + stasis_log_t * stasis_log_file = stasis_log(); for(i = 0 ; i < 1000; i++) { LogEntry * e = allocCommonLogEntry(prevLSN, xid, XBEGIN); const LogEntry * f; @@ -97,8 +97,8 @@ static void setup_log() { f = stasis_log_file->read_entry(stasis_log_file, prevLSN); - fail_unless(sizeofLogEntry(e) == sizeofLogEntry(f), "Log entry changed size!!"); - fail_unless(0 == memcmp(e,f,sizeofLogEntry(e)), "Log entries did not agree!!"); + fail_unless(sizeofLogEntry(0, e) == sizeofLogEntry(0, f), "Log entry changed size!!"); + fail_unless(0 == memcmp(e,f,sizeofLogEntry(0, e)), "Log entries did not agree!!"); freeLogEntry(e); freeLogEntry(f); @@ -118,6 +118,7 @@ static void setup_log() { freeLogEntry (e); freeLogEntry (g); } + return stasis_log_file; } /** @test @@ -143,8 +144,7 @@ static void loggerTest(int logType) { const LogEntry * e; LogHandle* h; int i = 0; - - setup_log(); + stasis_log_t * stasis_log_file = setup_log(); h = getLogHandle(stasis_log_file); while((e = nextInLog(h))) { @@ -175,7 +175,7 @@ START_TEST(loggerMemTest) { static void logHandleColdReverseIterator(int logType) { const LogEntry * e; stasis_log_type = logType; - setup_log(); + stasis_log_t * stasis_log_file = setup_log(); LogHandle* lh = getLogHandle(stasis_log_file); int i = 0; @@ -213,7 +213,8 @@ static void loggerTruncate(int logType) { const LogEntry * le2; const LogEntry * le3 = NULL; const LogEntry * tmp; - setup_log(); + + stasis_log_t * stasis_log_file = setup_log(); LogHandle* lh = getLogHandle(stasis_log_file); int i = 0; @@ -296,6 +297,7 @@ static void* worker_thread(void * arg) { lsns[i] = 0; } i = 0; + stasis_log_t * stasis_log_file = stasis_log(); while(i < ENTRIES_PER_THREAD) { LogEntry * le = allocCommonLogEntry(-1, -1, XBEGIN); @@ -423,6 +425,7 @@ void reopenLogWorkload(int truncating) { const int ENTRY_COUNT = 1000; const int SYNC_POINT = 900; + stasis_log_t * stasis_log_file = 0; stasis_transaction_table_active_transaction_count_set(0); @@ -439,14 +442,14 @@ void reopenLogWorkload(int truncating) { int xid = 1; TransactionLog l; pthread_mutex_init(&l.mut,0); - LogTransBegin(stasis_log_file, xid, &l); + stasis_log_begin_transaction(stasis_log_file, xid, &l); lsn_t startLSN = 0; LogEntry * entries[ENTRY_COUNT]; for(int i = 0; i < ENTRY_COUNT; i++) { - entries[i] = LogUpdate(stasis_log_file, + entries[i] = stasis_log_write_update(stasis_log_file, &l, NULL, OPERATION_NOOP, NULL, 0); if(i == SYNC_POINT) { @@ -483,8 +486,8 @@ void reopenLogWorkload(int truncating) { const LogEntry * e; while((e = nextInLog(h))) { if(e->type != INTERNALLOG) { - assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i])); - assert(!memcmp(e, entries[i], sizeofLogEntry(entries[i]))); + assert(sizeofLogEntry(0, e) == sizeofLogEntry(0, entries[i])); + assert(!memcmp(e, entries[i], sizeofLogEntry(0, entries[i]))); assert(i < ENTRY_COUNT); i++; } @@ -494,7 +497,7 @@ void reopenLogWorkload(int truncating) { LogEntry * entries2[ENTRY_COUNT]; for(int i = 0; i < ENTRY_COUNT; i++) { - entries2[i] = LogUpdate(stasis_log_file, &l, NULL, OPERATION_NOOP, + entries2[i] = stasis_log_write_update(stasis_log_file, &l, NULL, OPERATION_NOOP, NULL, 0); if(i == SYNC_POINT) { stasis_log_file->force_tail(stasis_log_file, LOG_FORCE_COMMIT); @@ -514,12 +517,12 @@ void reopenLogWorkload(int truncating) { while((e = nextInLog(h))) { if(e->type != INTERNALLOG) { if( i < ENTRY_COUNT) { - assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i])); - assert(!memcmp(e, entries[i], sizeofLogEntry(entries[i]))); + assert(sizeofLogEntry(0, e) == sizeofLogEntry(0, entries[i])); + assert(!memcmp(e, entries[i], sizeofLogEntry(0, entries[i]))); } else { - assert(i < ENTRY_COUNT * 2); - assert(sizeofLogEntry(e) == sizeofLogEntry(entries2[i-ENTRY_COUNT])); - assert(!memcmp(e, entries2[i-ENTRY_COUNT], sizeofLogEntry(entries2[i-ENTRY_COUNT]))); + assert(i < ENTRY_COUNT * 2); + assert(sizeofLogEntry(0, e) == sizeofLogEntry(0, entries2[i-ENTRY_COUNT])); + assert(!memcmp(e, entries2[i-ENTRY_COUNT], sizeofLogEntry(0, entries2[i-ENTRY_COUNT]))); } i++; } diff --git a/test/stasis/check_multiplexer.c b/test/stasis/check_multiplexer.c index 5ace442..8b1d188 100644 --- a/test/stasis/check_multiplexer.c +++ b/test/stasis/check_multiplexer.c @@ -186,7 +186,7 @@ START_TEST(multiplexTest) { LogEntry * e = allocUpdateLogEntry(-1, -1, OPERATION_LINEAR_HASH_INSERT, INVALID_PAGE, (byte*)arg, sizeof(linearHash_remove_arg) + sizeof(lsn_t) + sizeof(char)); - ThashInsert(xid, hash, (byte*)&i, sizeof(lsn_t), (byte*)e, sizeofLogEntry(e)); + ThashInsert(xid, hash, (byte*)&i, sizeof(lsn_t), (byte*)e, sizeofLogEntry(0, e)); free(e); diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index 1eae237..e207642 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -179,7 +179,7 @@ START_TEST(operation_physical_do_undo) { // XXX This is a hack to put some stuff in the log. Otherwise, Tdeinit() fails. for(int i = 0; i < 10; i++) - stasis_log_file->write_entry(stasis_log_file, + ((stasis_log_t*)stasis_log())->write_entry(stasis_log(), allocCommonLogEntry(-1, -1, -1)); /** @todo need to re-think check_operations. The test is pretty broken. */ @@ -639,7 +639,7 @@ START_TEST(operation_reorderable) { stasis_log_reordering_handle_t * rh = stasis_log_reordering_handle_open( &stasis_transaction_table[xid[0]% MAX_TRANSACTIONS], - stasis_log_file, + stasis_log(), 100, // bytes (far too low!) 10, // log entries 500 // max byte size diff --git a/utilities/truncate_log.c b/utilities/truncate_log.c index 231e06b..760a4e1 100644 --- a/utilities/truncate_log.c +++ b/utilities/truncate_log.c @@ -2,7 +2,7 @@ #include int main(void) { Tinit(); - stasis_truncation_truncate(stasis_log_file, 1); + TtruncateLog(); Tdeinit(); return compensation_error();