yet another refactoring; broke groupForce into its own module, removed more static variables and namespace violations

This commit is contained in:
Sears Russell 2009-05-13 18:04:53 +00:00
parent bc554efc85
commit 87218ad929
35 changed files with 386 additions and 265 deletions

View file

@ -23,7 +23,7 @@
<tool id="cdt.managedbuild.tool.gnu.archiver.base.788388724" name="GCC Archiver" superClass="cdt.managedbuild.tool.gnu.archiver.base"/>
<tool id="cdt.managedbuild.tool.gnu.cpp.compiler.base.761900506" name="GCC C++ Compiler" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.base"/>
<tool id="cdt.managedbuild.tool.gnu.c.compiler.base.1946158569" name="GCC C Compiler" superClass="cdt.managedbuild.tool.gnu.c.compiler.base">
<option id="gnu.c.compiler.option.include.paths.2049135479" superClass="gnu.c.compiler.option.include.paths" valueType="includePath">
<option id="gnu.c.compiler.option.include.paths.2049135479" name="Include paths (-I)" superClass="gnu.c.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/stasis/src/}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/stasis/}&quot;"/>
</option>
@ -211,6 +211,7 @@
</profile>
</scannerConfigBuildInfo>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.internal.ui.text.commentOwnerProjectMappings"/>
</cconfiguration>
</storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0">

View file

@ -15,10 +15,10 @@ int main(int argc, char ** argv) {
/*if(!(strcmp(mode, "writeback-pipeline"))) {
// pipelining likes big queues
// stasis_log_write_buffer_size = 50 * 1024 * 1024;
// stasis_log_file_write_buffer_size = 50 * 1024 * 1024;
} else {
}*/
stasis_log_write_buffer_size = 50 * 1024 * 1024;
stasis_log_file_write_buffer_size = 50 * 1024 * 1024;
Tinit();
alloc_rids(num_rids,&rids,&fast);
@ -77,7 +77,7 @@ int main(int argc, char ** argv) {
handles[i] = stasis_log_reordering_handle_open(
&stasis_transaction_table[xid%MAX_TRANSACTIONS],
stasis_log_file,
(0.9*stasis_log_write_buffer_size)/num_workers,
(0.9*stasis_log_file_write_buffer_size)/num_workers,
//512*1024/ua->divisor, // 0.5 mb in log tail at once
1000000/num_workers, // max num outstanding requests
(50 * 1024 * 1024)/num_workers // max backlog in bytes

View file

@ -55,7 +55,7 @@ typedef struct {
void build_cache(recordid * rids, cached_addr** cache, long long count) {
*cache = malloc (sizeof(**cache) * count);
lsn_t log_trunc = stasis_log_file->truncation_point(stasis_log_file);
lsn_t log_trunc = ((stasis_log_t*)stasis_log())->truncation_point(stasis_log());
for(long long i = 0; i < count; i++) {
(*cache)[i].pid = rids[i].page;
@ -91,8 +91,8 @@ int my_write_entry(struct stasis_log_t* log, LogEntry *e) {
}
void emulate_remote_log() {
original_write_entry = stasis_log_file->write_entry;
stasis_log_file->write_entry = my_write_entry;
original_write_entry = ((stasis_log_t*)stasis_log())->write_entry;
((stasis_log_t*)stasis_log())->write_entry = my_write_entry;
}
void emulate_remote_pages() {
origWrite = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite;
@ -157,8 +157,8 @@ void * writeback_unit_of_work(void * ap) {
stasis_log_reordering_handle_t * rh
= stasis_log_reordering_handle_open(
&stasis_transaction_table[ua->xid%MAX_TRANSACTIONS],
stasis_log_file,
(0.9*stasis_log_write_buffer_size)/ua->divisor,
stasis_log(),
(0.9*stasis_log_file_write_buffer_size)/ua->divisor,
//512*1024/ua->divisor, // 0.5 mb in log tail at once
1000000/ua->divisor, // max num outstanding requests
(50 * 1024 * 1024)/ua->divisor // max backlog in bytes
@ -166,7 +166,7 @@ void * writeback_unit_of_work(void * ap) {
/*
stasis_log_reordering_handle_open(&stasis_transaction_table[ua->xid%MAX_TRANSACTIONS],
stasis_log_file,
(stasis_log_write_buffer_size * 0.25)/ua->divisor,
(stasis_log_file_write_buffer_size * 0.25)/ua->divisor,
//512*1024/ua->divisor, // 0.5 mb in log tail at once
1000000/ua->divisor, // max num outstanding requests
(50 * 1024 * 1024)/ua->divisor // max backlog in bytes
@ -178,7 +178,7 @@ stasis_log_reordering_handle_open(&stasis_transaction_table[ua->xid%MAX_TRANSACT
TsetReorderableWriteBack(ua->xid, rh, ua->cache[idx].pid,
ua->cache[idx].off, ua->cache[idx].len,&ua->cache[idx].val,&old);
// TsetReorderable(ua->xid, rh, a->rids[(j*ua->divisor+ua->n)%a->num_rids], &val);
}
stasis_log_reordering_handle_close(rh);
return 0;
@ -199,8 +199,8 @@ void * bg_unit_of_work(void * ap) {
stasis_log_reordering_handle_t * rh
= stasis_log_reordering_handle_open(&stasis_transaction_table[ua->xid%MAX_TRANSACTIONS],
stasis_log_file,
(stasis_log_write_buffer_size * 0.25)/ua->divisor,
stasis_log(),
(stasis_log_file_write_buffer_size * 0.25)/ua->divisor,
//512*1024/ua->divisor, // 0.5 mb in log tail at once
1000000/ua->divisor, // max num outstanding requests
(50 * 1024 * 1024)/ua->divisor // max backlog in bytes
@ -224,7 +224,7 @@ void* bg_worker(void * ap) {
long long start = tv.tv_usec + tv.tv_sec * 1000000;
int xid = Tbegin();
if(stasis_log_file->write_entry == my_write_entry) {
if(((stasis_log_t*)stasis_log())->write_entry == my_write_entry) {
// based on tweaking; also, normal-net is ~ 100x slower than nromal
int num_worker = 100;
pthread_t workers[num_worker];

View file

@ -17,10 +17,10 @@ int main(int argc, char** argv) {
// XXX instead of overriding this, set tail of priority log to 80%
// stasis log buf or something...
// stasis_log_write_buffer_size = 50 * 1024 * 1024;
// stasis_log_file_write_buffer_size = 50 * 1024 * 1024;
printf("%s %s %s %s %lld\n", argv[0], argv[1], argv[2], argv[3],
stasis_log_write_buffer_size);
stasis_log_file_write_buffer_size);
Tinit();

View file

@ -8,12 +8,12 @@
#include <sys/time.h>
#include <time.h>
int main(int argc, char** argv) {
int main(int argc, char** argv) {
Tinit();
char * key;
char * value;
int ret;
int xid = Tbegin();
@ -31,8 +31,8 @@ int main(int argc, char** argv) {
// bleah; gcc would warn without the casts, since it doesn't understand that %as = Allocate String
char ** keyp = &key; // The extra garbage is to avoid type punning warnings...
char ** valuep = &value;
while(EOF != (ret=scanf("%as\t%as\n", (float*)keyp, (float*)valuep))) {
if(!ret) {
while(EOF != (ret=scanf("%as\t%as\n", (float*)keyp, (float*)valuep))) {
if(!ret) {
printf("Could not parse input!\n");
Tabort(xid);
Tdeinit();
@ -44,8 +44,8 @@ int main(int argc, char** argv) {
free(key);
free(value);
count ++;
if(!(count % 10000)) {
if(!(count % 10000)) {
gettimeofday(&now,0);
double rate = ((double)count)/((double)(now.tv_sec-start.tv_sec));
printf("%d tuples inserted (%f per sec)\n", count, rate);
@ -53,7 +53,7 @@ int main(int argc, char** argv) {
}
Tcommit(xid);
stasis_truncation_truncate(stasis_log_file, 1);
TtruncateLog();
Tdeinit();
return 0;
}

View file

@ -1,20 +1,21 @@
ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c
common.c flags.c stats.c bufferManager.c
linkedlist.c operations.c pageHandle.c
bufferManager/legacy/pageFile.c
bufferManager/legacy/pageFile.c
bufferManager/legacy/pageCache.c
bufferManager/legacy/legacyBufferManager.c
page.c bufferPool.c blobManager.c
page.c bufferPool.c blobManager.c
recovery2.c truncation.c transactional2.c
allocationPolicy.c lockManager.c iterator.c
consumer.c arrayCollection.c ringbuffer.c fifo.c
multiplexer.c graph.c logger/logEntry.c
logger/safeWrites.c logger/logWriterUtils.c
logger/filePool.c
logger/inMemoryLog.c
logger/filePool.c
logger/inMemoryLog.c
logger/logHandle.c logger/logger2.c
logger/logMemory.c page/raw.c page/slotted.c page/lsnFree.c
logger/reorderingHandle.c
logger/groupForce.c
page/fixed.c compensations.c
operations/pageOperations.c page/indirect.c
operations/decrement.c operations/increment.c

View file

@ -11,6 +11,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common
logger/inMemoryLog.c logger/logHandle.c logger/logger2.c \
logger/logMemory.c \
logger/reorderingHandle.c \
logger/groupForce.c \
page/raw.c page/slotted.c page/lsnFree.c page/fixed.c compensations.c \
operations/pageOperations.c page/indirect.c operations/decrement.c \
operations/increment.c operations/prepare.c operations/set.c \

View file

@ -94,7 +94,7 @@ static void pfPageWrite(stasis_page_handle_t * h, Page * ret) {
// If necessary, force the log to disk so that ret's LSN will be stable.
assert(ret->LSN == stasis_page_lsn_read(ret));
LogForce(stasis_log_file, ret->LSN, LOG_FORCE_WAL);
if(h->log) { stasis_log_force(h->log, ret->LSN, LOG_FORCE_WAL); }
pthread_mutex_lock(&stable_mutex);
@ -133,14 +133,14 @@ static void pfPageWrite(stasis_page_handle_t * h, Page * ret) {
//#define PAGE_FILE_O_DIRECT
/** @todo O_DIRECT is broken in older linuxes (eg 2.4). The build script should disable it on such platforms. */
stasis_page_handle_t* openPageFile() {
stasis_page_handle_t* openPageFile(stasis_log_t * log) {
stasis_page_handle_t * ret = malloc(sizeof(*ret));
ret->read = pfPageRead;
ret->write = pfPageWrite;
ret->force_file = pfForcePageFile;
ret->force_range = pfForceRangePageFile;
ret->close = pfClosePageFile;
ret->log = log;
DEBUG("Opening storefile.\n");
#ifdef PAGE_FILE_O_DIRECT

View file

@ -85,8 +85,8 @@ const char* stasis_log_dir_name = "stasis_log";
#endif //STASIS_LOG_DIR_LSN_CHARS
const int stasis_log_dir_name_lsn_chars = 20;
#ifdef STASIS_LOG_WRITE_BUFFER_SIZE
lsn_t stasis_log_write_buffer_size = STASIS_LOG_WRITE_BUFFER_SIZE;
#ifdef STASIS_LOG_FILE_WRITE_BUFFER_SIZE
lsn_t stasis_log_file_write_buffer_size = STASIS_LOG_FILE_WRITE_BUFFER_SIZE;
#else
lsn_t stasis_log_write_buffer_size = 1024 * 1024;
lsn_t stasis_log_file_write_buffer_size = 1024 * 1024;
#endif

View file

@ -218,8 +218,8 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f
pthread_mutex_init(&fp->read_mutex,0);
fp->state_latch = initlock();
fp->buffer = calloc(stasis_log_write_buffer_size, sizeof(char));
setbuffer(fp->fp, fp->buffer, stasis_log_write_buffer_size);
fp->buffer = calloc(stasis_log_file_write_buffer_size, sizeof(char));
setbuffer(fp->fp, fp->buffer, stasis_log_file_write_buffer_size);
}

View file

@ -0,0 +1,103 @@
/*
* groupForce.c
*
* Created on: May 12, 2009
* Author: sears
*/
#include <config.h>
#include <stasis/common.h>
#include <stasis/logger/logger2.h>
#include <stdio.h>
#include <assert.h>
struct stasis_log_group_force_t {
stasis_log_t * log;
pthread_mutex_t check_commit;
pthread_cond_t tooFewXacts;
int pendingCommits;
uint64_t wait_nsec;
};
stasis_log_group_force_t * stasis_log_group_force_init(stasis_log_t * log, uint64_t wait_nsec) {
static int warned = 0;
if(wait_nsec > (1000 * 1000 * 1000)) {
warned = 1;
fprintf(stderr, "TODO stasis_log_group_force: Efficiently support wait "
"times > 1 second. (%llu second wait time requested)\n",
(long long unsigned int) (wait_nsec / (1000 * 1000 * 1000)));
}
stasis_log_group_force_t * ret = malloc(sizeof(*ret));
ret->log = log;
pthread_mutex_init(&ret->check_commit,0);
pthread_cond_init(&ret->tooFewXacts,0);
ret->pendingCommits = 0;
ret->wait_nsec = wait_nsec;
return ret;
}
void stasis_log_group_force(stasis_log_group_force_t* lh, lsn_t lsn) {
// static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER;
// static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER;
// static int pendingCommits;
pthread_mutex_lock(&lh->check_commit);
if(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) > lsn) {
pthread_mutex_unlock(&lh->check_commit);
return;
}
if(lh->log->is_durable(lh->log)) {
struct timeval now;
struct timespec timeout;
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec;
timeout.tv_nsec = now.tv_usec * 1000;
// 0123456789 <- number of zeros on the next three lines...
// timeout.tv_nsec += 10000000; // wait ten msec.
timeout.tv_nsec += lh->wait_nsec;
while(timeout.tv_nsec > (1000 * 1000 * 1000)) {
timeout.tv_nsec -= (1000 * 1000 * 1000);
timeout.tv_sec++;
}
lh->pendingCommits++;
int xactcount = TactiveTransactionCount();
if((xactcount > 1 && lh->pendingCommits < xactcount) ||
(xactcount > 20 && lh->pendingCommits < (int)((double)xactcount * 0.95))) {
int retcode;
while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&lh->tooFewXacts, &lh->check_commit, &timeout))) {
if(retcode != 0) {
printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by "
"a signal in groupCommit(). Acting as though it timed out.\n",
__FILE__, __LINE__);
break;
}
if(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) > lsn) {
(lh->pendingCommits)--;
pthread_mutex_unlock(&lh->check_commit);
return;
}
}
}
} else {
(lh->pendingCommits)++;
}
if(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) <= lsn) {
lh->log->force_tail(lh->log, LOG_FORCE_COMMIT);
assert(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) > lsn);
pthread_cond_broadcast(&lh->tooFewXacts);
}
assert(lh->log->first_unstable_lsn(lh->log,LOG_FORCE_COMMIT) > lsn);
(lh->pendingCommits)--;
pthread_mutex_unlock(&lh->check_commit);
return;
}
void stasis_log_group_force_deinit(stasis_log_group_force_t * lh) {
pthread_mutex_destroy(&lh->check_commit);
pthread_cond_destroy(&lh->tooFewXacts);
free(lh);
}

View file

@ -41,8 +41,8 @@ static int stasis_log_impl_in_memory_write_entry(stasis_log_t * log, LogEntry *e
e->LSN = impl->nextAvailableLSN;
LogEntry * cpy = malloc(sizeofLogEntry(e));
memcpy(cpy, e, sizeofLogEntry(e));
LogEntry * cpy = malloc(sizeofLogEntry(log, e));
memcpy(cpy, e, sizeofLogEntry(log, e));
DEBUG("lsn: %ld\n", e->LSN);
impl->buffer[bufferOffset] = cpy;
@ -129,9 +129,9 @@ static const LogEntry * stasis_log_impl_in_memory_read_entry(stasis_log_t* log,
assert(ptr);
assert(ptr->LSN == lsn);
LogEntry * ret = malloc(sizeofLogEntry(ptr));
LogEntry * ret = malloc(sizeofLogEntry(log, ptr));
memcpy(ret, ptr, sizeofLogEntry(ptr));
memcpy(ret, ptr, sizeofLogEntry(log, ptr));
DEBUG("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN);
return ret;

View file

@ -111,14 +111,14 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
}
LogEntry * allocCLRLogEntry(const LogEntry * old_e) {
CLRLogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)+sizeofLogEntry(old_e));
CLRLogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)+sizeofLogEntry(0, old_e));
ret->LSN = -1;
ret->prevLSN = old_e->prevLSN;
ret->xid = old_e->xid;
ret->type = CLRLOG;
DEBUG("compensates: %lld\n", old_e->LSN);
memcpy((void*)getCLRCompensated(ret), old_e, sizeofLogEntry(old_e));
memcpy((void*)getCLRCompensated(ret), old_e, sizeofLogEntry(0, old_e));
return (LogEntry*)ret;
}
@ -127,13 +127,13 @@ void freeLogEntry(const LogEntry* e) {
}
lsn_t sizeofLogEntry(const LogEntry * e) {
lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e) {
switch (e->type) {
case CLRLOG:
{
const LogEntry * contents = getCLRCompensated((const CLRLogEntry*) e);
assert(contents->type != CLRLOG);
return sizeof(struct __raw_log_entry) + sizeofLogEntry(contents);
return sizeof(struct __raw_log_entry) + sizeofLogEntry(lh, contents);
}
case UPDATELOG:
{
@ -141,7 +141,8 @@ lsn_t sizeofLogEntry(const LogEntry * e) {
sizeof(UpdateLogEntry) + e->update.arg_size;
}
case INTERNALLOG:
return stasis_log_file->sizeof_internal_entry(stasis_log_file, e);
assert(lh);
return lh->sizeof_internal_entry(lh,e);
case XPREPARE:
return sizeof(struct __raw_log_entry)+sizeof(lsn_t);
default:

View file

@ -52,31 +52,13 @@ terms specified in this license.
#include <config.h>
#include <stasis/common.h>
#include <stdio.h>
#include <assert.h>
#include <stasis/logger/logger2.h>
#include <stasis/logger/safeWrites.h>
#include <stasis/logger/inMemoryLog.h>
#include <stasis/page.h>
/**
@todo stasis_log_file should be in transactional2.c, and not global
*/
stasis_log_t* stasis_log_file = 0;
static int pendingCommits;
void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* tl) {
tl->xid = xid;
DEBUG("Log Begin %d\n", xid);
tl->prevLSN = -1;
tl->recLSN = -1;
}
static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) {
static lsn_t stasis_log_write_common(stasis_log_t* log, TransactionLog * l, int type) {
LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type);
lsn_t ret;
@ -95,9 +77,9 @@ static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) {
freeLogEntry(e);
return ret;
}
static lsn_t LogTransCommonPrepare(stasis_log_t* log, TransactionLog * l) {
static lsn_t stasis_log_write_prepare(stasis_log_t* log, TransactionLog * l) {
LogEntry * e = allocPrepareLogEntry(l->prevLSN, l->xid, l->recLSN);
lsn_t ret;
@ -120,7 +102,7 @@ static lsn_t LogTransCommonPrepare(stasis_log_t* log, TransactionLog * l) {
}
LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l,
LogEntry * stasis_log_write_update(stasis_log_t* log, TransactionLog * l,
Page * p, unsigned int op,
const byte * arg, size_t arg_size) {
@ -138,7 +120,7 @@ LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l,
return e;
}
lsn_t LogCLR(stasis_log_t* log, const LogEntry * old_e) {
lsn_t stasis_log_write_clr(stasis_log_t* log, const LogEntry * old_e) {
LogEntry * e = allocCLRLogEntry(old_e);
log->write_entry(log, e);
@ -150,7 +132,7 @@ lsn_t LogCLR(stasis_log_t* log, const LogEntry * old_e) {
return ret;
}
lsn_t LogDummyCLR(stasis_log_t* log, int xid, lsn_t prevLSN,
lsn_t stasis_log_write_dummy_clr(stasis_log_t* log, int xid, lsn_t prevLSN,
lsn_t compensatedLSN) {
const LogEntry * const_e;
LogEntry * e;
@ -160,99 +142,49 @@ lsn_t LogDummyCLR(stasis_log_t* log, int xid, lsn_t prevLSN,
} else {
const_e = log->read_entry(log, compensatedLSN);
}
e = malloc(sizeofLogEntry(const_e));
memcpy(e, const_e, sizeofLogEntry(const_e));
e = malloc(sizeofLogEntry(log, const_e));
memcpy(e, const_e, sizeofLogEntry(log, const_e));
e->LSN = compensatedLSN;
lsn_t ret = LogCLR(log, e);
lsn_t ret = stasis_log_write_clr(log, e);
freeLogEntry(const_e);
free(e);
return ret;
}
lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l) {
lsn_t lsn = LogTransCommon(log, l, XCOMMIT);
LogForce(log, lsn, LOG_FORCE_COMMIT);
void stasis_log_begin_transaction(stasis_log_t* log, int xid, TransactionLog* tl) {
tl->xid = xid;
DEBUG("Log Begin %d\n", xid);
tl->prevLSN = -1;
tl->recLSN = -1;
}
lsn_t stasis_log_abort_transaction(stasis_log_t* log, TransactionLog * l) {
return stasis_log_write_common(log, l, XABORT);
}
lsn_t stasis_log_end_aborted_transaction(stasis_log_t* log, TransactionLog * l) {
return stasis_log_write_common(log, l, XEND);
}
lsn_t stasis_log_prepare_transaction(stasis_log_t* log, TransactionLog * l) {
lsn_t lsn = stasis_log_write_prepare(log, l);
stasis_log_force(log, lsn, LOG_FORCE_COMMIT);
return lsn;
}
lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l) {
return LogTransCommon(log, l, XABORT);
}
lsn_t LogTransEnd(stasis_log_t* log, TransactionLog * l) {
return LogTransCommon(log, l, XEND);
}
lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l) {
lsn_t lsn = LogTransCommonPrepare(log, l);
LogForce(log, lsn, LOG_FORCE_COMMIT);
lsn_t stasis_log_commit_transaction(stasis_log_t* log, TransactionLog * l) {
lsn_t lsn = stasis_log_write_common(log, l, XCOMMIT);
stasis_log_force(log, lsn, LOG_FORCE_COMMIT);
return lsn;
}
static void groupCommit(stasis_log_t* log, lsn_t lsn);
void LogForce(stasis_log_t* log, lsn_t lsn,
void stasis_log_force(stasis_log_t* log, lsn_t lsn,
stasis_log_force_mode_t mode) {
if(mode == LOG_FORCE_COMMIT) {
groupCommit(log, lsn);
if((mode == LOG_FORCE_COMMIT) && log->group_force) {
stasis_log_group_force(log->group_force, lsn);
} else {
if(log->first_unstable_lsn(log,mode) <= lsn) {
log->force_tail(log,mode);
}
}
}
static void groupCommit(stasis_log_t* log, lsn_t lsn) {
static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&check_commit);
if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn) {
pthread_mutex_unlock(&check_commit);
return;
}
if(log->is_durable(log)) {
struct timeval now;
struct timespec timeout;
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec;
timeout.tv_nsec = now.tv_usec * 1000;
// 0123456789 <- number of zeros on the next three lines...
timeout.tv_nsec += 10000000; // wait ten msec.
if(timeout.tv_nsec > 1000000000) {
timeout.tv_nsec -= 1000000000;
timeout.tv_sec++;
}
pendingCommits++;
int xactcount = TactiveTransactionCount();
if((xactcount > 1 && pendingCommits < xactcount) ||
(xactcount > 20 && pendingCommits < (int)((double)xactcount * 0.95))) {
int retcode;
while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) {
if(retcode != 0) {
printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by "
"a signal in groupCommit(). Acting as though it timed out.\n",
__FILE__, __LINE__);
break;
}
if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn) {
pendingCommits--;
pthread_mutex_unlock(&check_commit);
return;
}
}
}
} else {
pendingCommits++;
}
if(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) <= lsn) {
log->force_tail(log, LOG_FORCE_COMMIT);
assert(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn);
pthread_cond_broadcast(&tooFewXacts);
}
assert(log->first_unstable_lsn(log,LOG_FORCE_COMMIT) > lsn);
pendingCommits--;
pthread_mutex_unlock(&check_commit);
return;
}

View file

@ -11,14 +11,14 @@ static void* stasis_log_reordering_handle_worker(void * a) {
while(h->cur_len) {
size_t chunk_len = 0;
while(chunk_len < h->chunk_len && h->cur_len) {
LogEntry * e = LogUpdate(h->log,
LogEntry * e = stasis_log_write_update(h->log,
h->l,
h->queue[h->cur_off].p,
h->queue[h->cur_off].op,
h->queue[h->cur_off].arg,
h->queue[h->cur_off].arg_size);
assert(e->xid != INVALID_XID);
chunk_len += sizeofLogEntry(e);
chunk_len += sizeofLogEntry(h->log, e);
if(h->queue[h->cur_off].p) {
Page * p = h->queue[h->cur_off].p;
@ -31,14 +31,14 @@ static void* stasis_log_reordering_handle_worker(void * a) {
xaction table for prevLSN is their friend. */
h->cur_len--;
h->phys_size -= sizeofLogEntry(e);
h->phys_size -= sizeofLogEntry(h->log, e);
h->cur_off = (h->cur_off+1)%h->max_len;
}
if(chunk_len > 0) {
lsn_t to_force = h->l->prevLSN;
pthread_mutex_unlock(&h->mut);
LogForce(h->log, to_force, LOG_FORCE_COMMIT);
stasis_log_force(h->log, to_force, LOG_FORCE_COMMIT);
if(stasis_log_reordering_usleep_after_flush) {
usleep(stasis_log_reordering_usleep_after_flush);
}

View file

@ -225,7 +225,7 @@ static inline int isDurable_LogWriter(stasis_log_t* log) {
static inline lsn_t nextEntry_LogWriter(stasis_log_t* log,
const LogEntry* e) {
return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t);
return e->LSN + sizeofLogEntry(log, e) + sizeof(lsn_t);
}
// crc handling
@ -233,8 +233,8 @@ static inline lsn_t nextEntry_LogWriter(stasis_log_t* log,
static inline void log_crc_reset(stasis_log_safe_writes_state* sw) {
sw->crc = 0;
}
static inline void log_crc_update(const LogEntry * le, unsigned int * crc) {
*crc = stasis_crc32(le, sizeofLogEntry(le), *crc);
static inline void log_crc_update(stasis_log_t* log, const LogEntry * e, unsigned int * crc) {
*crc = stasis_crc32(e, sizeofLogEntry(log, e), *crc);
}
static LogEntry* log_crc_dummy_entry() {
LogEntry* ret = allocCommonLogEntry(0, -1, INTERNALLOG);
@ -275,7 +275,7 @@ static inline lsn_t log_crc_next_lsn(stasis_log_t* log, lsn_t ret) {
break;
}
} else {
log_crc_update(le, &crc);
log_crc_update(log, le, &crc);
}
freeLogEntry(le);
}
@ -303,14 +303,14 @@ static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e) {
stasis_log_safe_writes_state* sw = log->impl;
const lsn_t size = sizeofLogEntry(e);
const lsn_t size = sizeofLogEntry(log, e);
/* Set the log entry's LSN. */
pthread_mutex_lock(&sw->nextAvailableLSN_mutex);
e->LSN = sw->nextAvailableLSN;
pthread_mutex_unlock(&sw->nextAvailableLSN_mutex);
log_crc_update(e, &sw->crc);
log_crc_update(log, e, &sw->crc);
DEBUG("Writing Log entry type = %d lsn = %ld, size = %ld\n",
e->type, e->LSN, size);
@ -600,7 +600,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
int firstInternalEntry = 1;
lsn_t nextLSN = 0;
while((le = nextInLog(lh))) {
size = sizeofLogEntry(le);
size = sizeofLogEntry(log, le);
if(nextLSN) {
assert(nextLSN == le->LSN);
}
@ -636,7 +636,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
assert(sw->nextAvailableLSN == LSN + lengthOfCopiedLog);
size = sizeofLogEntry(crc_entry);
size = sizeofLogEntry(log, crc_entry);
sw->nextAvailableLSN = nextEntry_LogWriter(log, crc_entry);
@ -700,7 +700,7 @@ int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
return LLADD_IO_ERROR;
}
setbuffer(sw->fp, sw->buffer, stasis_log_write_buffer_size);
setbuffer(sw->fp, sw->buffer, stasis_log_file_write_buffer_size);
sw->global_offset = LSN - sizeof(lsn_t);
@ -776,10 +776,8 @@ stasis_log_t* stasis_log_safe_writes_open(const char * filename,
stasis_log_t* log = malloc(sizeof(*log));
memcpy(log,&proto, sizeof(proto));
log->impl = sw;
// XXX hack; we call things that call into this object during init!
stasis_log_file = log;
sw->buffer = calloc(stasis_log_write_buffer_size, sizeof(char));
sw->buffer = calloc(stasis_log_file_write_buffer_size, sizeof(char));
if(!sw->buffer) { return 0; /*LLADD_NO_MEM;*/ }
@ -804,7 +802,7 @@ stasis_log_t* stasis_log_safe_writes_open(const char * filename,
}
/* Increase the length of log's buffer, since it's in O_SYNC mode. */
setbuffer(sw->fp, sw->buffer, stasis_log_write_buffer_size);
setbuffer(sw->fp, sw->buffer, stasis_log_file_write_buffer_size);
/* fread() doesn't notice when another handle writes to its file,
even if fflush() is used to push the changes out to disk.

View file

@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
@ -42,7 +42,7 @@ terms specified in this license.
/**********************************************
* $Id$
*
*
* sets the given reference to dat
**********************************************/
@ -55,16 +55,16 @@ terms specified in this license.
recordid prepare_bogus_rec = { 0, 0, 0};
static int op_prepare(const LogEntry * e, Page * p) {
LogForce(stasis_log_file, e->LSN, LOG_FORCE_COMMIT);
//NO-OP.
return 0;
}
stasis_operation_impl stasis_op_impl_prepare() {
stasis_operation_impl o = {
OPERATION_PREPARE,
OPERATION_PREPARE, /* id */
OPERATION_PREPARE,
OPERATION_NOOP,
&op_prepare /* Function */
&op_prepare/* Function */
};
return o;
}
@ -78,7 +78,7 @@ typedef struct{
int aborted;
} PrepareGuardState;
void * getPrepareGuardState() {
void * getPrepareGuardState() {
PrepareGuardState * s = malloc (sizeof(PrepareGuardState));
s->continueIterating = 1;
s->prevLSN = -1;
@ -89,10 +89,10 @@ void * getPrepareGuardState() {
int prepareGuard(const LogEntry * e, void * state) {
PrepareGuardState * pgs = state;
PrepareGuardState * pgs = state;
int ret = pgs->continueIterating;
if(e->type == UPDATELOG && !pgs->aborted) {
if(e->update.funcID == OPERATION_PREPARE) {
if(e->update.funcID == OPERATION_PREPARE) {
pgs->continueIterating = 0;
pgs->prevLSN = e->prevLSN;
}
@ -112,7 +112,7 @@ int prepareGuard(const LogEntry * e, void * state) {
/** @todo When fleshing out the logHandle's prepareAction interface,
figure out what the return value should mean... */
int prepareAction(void * state) {
PrepareGuardState * pgs = state;
PrepareGuardState * pgs = state;
int ret;
if(!(pgs->continueIterating || pgs->aborted)) {
//assert(pgs->prevLSN != -1);
@ -121,6 +121,6 @@ int prepareAction(void * state) {
ret = 1;
} else {
ret = 0;
}
}
return ret;
}

View file

@ -8,8 +8,9 @@ void stasis_slotted_lsn_free_initialize_page(Page * p) {
*stasis_page_type_ptr(p) = SLOTTED_LSN_FREE_PAGE;
*stasis_page_lsn_ptr(p) = -1;
}
// XXX still not correct; need to have an "LSN_FREE" constant.
static void lsnFreeLoaded(Page * p) {
p->LSN = stasis_log_file->next_available_lsn(stasis_log_file);
p->LSN = 0; //stasis_log_file->next_available_lsn(stasis_log_file);
}
static void lsnFreeFlushed(Page * p) { }

View file

@ -7,9 +7,11 @@
XXX rawPageInferMetadata is wrong; setting lsn to LogFlushedLSN() breaks
recovery.
XXX still not correct; need an "LSN_FREE" constant.
*/
void rawPageInferMetadata(Page * p) {
p->LSN = stasis_log_file->first_unstable_lsn(stasis_log_file, LOG_FORCE_WAL);
p->LSN = 0; //stasis_log_file->first_unstable_lsn(stasis_log_file, LOG_FORCE_WAL);
}
byte* rawPageGetData(int xid, Page * p) {
@ -17,7 +19,7 @@ byte* rawPageGetData(int xid, Page * p) {
return units_from_start_raw(byte, p, 0);
}
void rawPageSetData(int xid, lsn_t lsn, Page * p) {
void rawPageSetData(int xid, lsn_t lsn, Page * p) {
assertlocked(p->rwlatch);
// writelock(p->rwlatch, 255);
rawPageWriteLSN(xid, p, lsn);
@ -26,23 +28,23 @@ void rawPageSetData(int xid, lsn_t lsn, Page * p) {
return;
}
lsn_t rawPageReadLSN(const Page * p) {
lsn_t rawPageReadLSN(const Page * p) {
assertlocked(p->rwlatch);
// There are some potential optimizations here since the page
// doesn't "really" have an LSN at all, but we need to be careful
// about log truncation...
return p->LSN;
return p->LSN;
}
void rawPageWriteLSN(int xid, Page * p, lsn_t lsn) {
void rawPageWriteLSN(int xid, Page * p, lsn_t lsn) {
assertlocked(p->rwlatch);
if(p->LSN < lsn) { p->LSN = lsn; }
}
void rawPageCommit(int xid) {
void rawPageCommit(int xid) {
// no-op
}
void rawPageAbort(int xid) {
void rawPageAbort(int xid) {
// no-op
}

View file

@ -21,7 +21,7 @@ static void phWrite(stasis_page_handle_t * ph, Page * ret) {
// or we'll deadlock.
writelock(ret->rwlatch,0);
stasis_page_flushed(ret);
LogForce(stasis_log_file, ret->LSN, LOG_FORCE_WAL);
if(ph->log) { stasis_log_force(ph->log, ret->LSN, LOG_FORCE_WAL); }
int err = ((stasis_handle_t*)ph->impl)->write(ph->impl, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE);
if(err) {
printf("Couldn't write to page file: %s\n", strerror(err));
@ -66,7 +66,8 @@ static void phClose(stasis_page_handle_t * ph) {
}
free(ph);
}
stasis_page_handle_t * stasis_page_handle_open(stasis_handle_t * handle) {
stasis_page_handle_t * stasis_page_handle_open(stasis_handle_t * handle,
stasis_log_t * log) {
DEBUG("Using pageHandle implementation\n");
stasis_page_handle_t * ret = malloc(sizeof(*ret));
ret->write = phWrite;
@ -74,6 +75,7 @@ stasis_page_handle_t * stasis_page_handle_open(stasis_handle_t * handle) {
ret->force_file = phForce;
ret->force_range = phForceRange;
ret->close = phClose;
ret->log = log;
ret->impl = handle;
return ret;
}

View file

@ -290,7 +290,7 @@ static void stasis_recovery_undo(stasis_log_t* log, int recovery) {
}
// Log a CLR for this entry
lsn_t clr_lsn = LogCLR(log, e);
lsn_t clr_lsn = stasis_log_write_clr(log, e);
DEBUG("logged clr\n");
stasis_transaction_table_roll_forward(e->xid, e->LSN, e->prevLSN);
@ -316,7 +316,7 @@ static void stasis_recovery_undo(stasis_log_t* log, int recovery) {
stasis_operation_undo(ce, 0, 0);
// compensated_lsn = -1 -> that the logical undo is a NOOP.
// that way, we don't undo this operation twice.
LogDummyCLR(log, ce->xid, ce->prevLSN, -1);
stasis_log_write_dummy_clr(log, ce->xid, ce->prevLSN, -1);
} else {
DEBUG("physical clr: op %d lsn %lld\n",
ce->update.funcID, ce->LSN);

View file

@ -31,6 +31,8 @@ TransactionLog stasis_transaction_table[MAX_TRANSACTIONS];
static int stasis_transaction_table_num_active = 0;
static int stasis_transaction_table_xid_count = 0;
static stasis_log_t* stasis_log_file = 0;
/**
This mutex protects stasis_transaction_table, numActiveXactions and
xidCount.
@ -61,12 +63,17 @@ int Tinit() {
stasis_operation_table_init();
dirtyPagesInit();
stasis_log_file = 0;
if(LOG_TO_FILE == stasis_log_type) {
stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name,
stasis_log_file_mode,
stasis_log_file_permissions);
stasis_log_file->group_force =
stasis_log_group_force_init(stasis_log_file, 10 * 1000 * 1000); // timeout in nsec; want 10msec.
} else if(LOG_TO_MEMORY == stasis_log_type) {
stasis_log_file = stasis_log_impl_in_memory_open();
stasis_log_file->group_force = 0;
} else {
assert(stasis_log_file != NULL);
}
@ -75,12 +82,13 @@ int Tinit() {
stasis_page_handle_t * page_handle;
if(bufferManagerFileHandleType == BUFFER_MANAGER_FILE_HANDLE_DEPRECATED) {
printf("\nWarning: Using old I/O routines (with known bugs).\n");
page_handle = openPageFile();
page_handle = openPageFile(stasis_log_file);
} else {
stasis_handle_t * h = stasis_handle_open(stasis_store_file_name);
// XXX should not be global.
page_handle = stasis_page_handle_open(h);
page_handle = stasis_page_handle_open(h, stasis_log_file);
}
stasis_buffer_manager_open(bufferManagerType, page_handle);
DEBUG("Buffer manager type = %d\n", bufferManagerType);
pageOperationsInit();
@ -135,7 +143,7 @@ int Tbegin() {
pthread_mutex_unlock(&stasis_transaction_table_mutex);
LogTransBegin(stasis_log_file, xidCount_tmp, &stasis_transaction_table[index]);
stasis_log_begin_transaction(stasis_log_file, xidCount_tmp, &stasis_transaction_table[index]);
if(globalLockManager.begin) { globalLockManager.begin(stasis_transaction_table[index].xid); }
@ -155,7 +163,7 @@ static compensated_function void TactionHelper(int xid,
writelock(p->rwlatch,0);
e = LogUpdate(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS],
e = stasis_log_write_update(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS],
p, op, dat, datlen);
assert(stasis_transaction_table[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
@ -183,7 +191,7 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page,
p ? p->id : INVALID_PAGE,
dat, datlen);
stasis_log_reordering_handle_append(h, p, op, dat, datlen, sizeofLogEntry(e));
stasis_log_reordering_handle_append(h, p, op, dat, datlen, sizeofLogEntry(0, e));
e->LSN = 0;
writelock(p->rwlatch,0);
@ -216,7 +224,7 @@ void TreorderableWritebackUpdate(int xid, void* hp,
assert(xid >= 0 && stasis_transaction_table[xid % MAX_TRANSACTIONS].xid == xid);
pthread_mutex_lock(&h->mut);
LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, dat, datlen);
stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(e));
stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(0, e));
pthread_mutex_unlock(&h->mut);
}
compensated_function void TupdateStr(int xid, pageid_t page,
@ -283,7 +291,7 @@ int Tcommit(int xid) {
pthread_mutex_unlock(&stasis_transaction_table_mutex);
#endif
lsn = LogTransCommit(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS]);
lsn = stasis_log_commit_transaction(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS]);
if(globalLockManager.commit) { globalLockManager.commit(xid); }
stasis_alloc_committed(xid);
@ -303,7 +311,7 @@ int Tprepare(int xid) {
assert(xid >= 0);
off_t i = xid % MAX_TRANSACTIONS;
assert(stasis_transaction_table[i].xid == xid);
LogTransPrepare(stasis_log_file, &stasis_transaction_table[i]);
stasis_log_prepare_transaction(stasis_log_file, &stasis_transaction_table[i]);
return 0;
}
@ -314,7 +322,7 @@ int Tabort(int xid) {
TransactionLog * t =&stasis_transaction_table[xid%MAX_TRANSACTIONS];
assert(t->xid == xid);
lsn = LogTransAbort(stasis_log_file, t);
lsn = stasis_log_abort_transaction(stasis_log_file, t);
/** @todo is the order of the next two calls important? */
undoTrans(stasis_log_file, *t);
@ -327,7 +335,7 @@ int Tabort(int xid) {
int Tforget(int xid) {
TransactionLog * t = &stasis_transaction_table[xid%MAX_TRANSACTIONS];
assert(t->xid == xid);
LogTransEnd(stasis_log_file, t);
stasis_log_end_aborted_transaction(stasis_log_file, t);
stasis_transaction_table_forget(t->xid);
return 0;
}
@ -350,7 +358,9 @@ int Tdeinit() {
stasis_buffer_manager_close();
DEBUG("Closing page file tdeinit\n");
stasis_page_deinit();
stasis_log_group_force_t * group_force = stasis_log_file->group_force;
stasis_log_file->close(stasis_log_file);
if(group_force) { stasis_log_group_force_deinit(group_force); }
dirtyPagesDeinit();
stasis_initted = 0;
@ -480,6 +490,9 @@ int TdurabilityLevel() {
}
}
void TtruncateLog() {
stasis_truncation_truncate(stasis_log_file, 1);
}
typedef struct {
lsn_t prev_lsn;
lsn_t compensated_lsn;
@ -487,7 +500,7 @@ typedef struct {
int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) {
assert(xid >= 0);
LogEntry * e = LogUpdate(stasis_log_file,
LogEntry * e = stasis_log_write_update(stasis_log_file,
&stasis_transaction_table[xid % MAX_TRANSACTIONS],
NULL, op, dat, datSize);
lsn_t prev_lsn = e->prevLSN;
@ -497,7 +510,7 @@ int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) {
freeLogEntry(e);
lsn_t clrLSN = LogDummyCLR(stasis_log_file, xid, prev_lsn, compensated_lsn);
lsn_t clrLSN = stasis_log_write_dummy_clr(stasis_log_file, xid, prev_lsn, compensated_lsn);
stasis_transaction_table[xid % MAX_TRANSACTIONS].prevLSN = clrLSN;
@ -506,7 +519,7 @@ int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) {
void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) {
assert(xid >= 0);
LogEntry * e = LogUpdate(stasis_log_file,
LogEntry * e = stasis_log_write_update(stasis_log_file,
&stasis_transaction_table[xid % MAX_TRANSACTIONS],
NULL, op, dat, datSize);
DEBUG("Begin Nested Top Action e->LSN: %ld\n", e->LSN);
@ -528,7 +541,7 @@ lsn_t TendNestedTopAction(int xid, void * handle) {
assert(xid >= 0);
// Write a CLR.
lsn_t clrLSN = LogDummyCLR(stasis_log_file, xid,
lsn_t clrLSN = stasis_log_write_dummy_clr(stasis_log_file, xid,
h->prev_lsn, h->compensated_lsn);
// Ensure that the next action in this transaction points to the CLR.
@ -541,3 +554,6 @@ lsn_t TendNestedTopAction(int xid, void * handle) {
return clrLSN;
}
void * stasis_log() {
return stasis_log_file;
}

View file

@ -145,7 +145,7 @@ extern void (*forcePages)();
extern void (*forcePageRange)(pageid_t start, pageid_t stop);
extern void (*stasis_buffer_manager_simulate_crash)();
int stasis_buffer_manager_open(int type, stasis_page_handle_t* ph);
int stasis_buffer_manager_open(int type, stasis_page_handle_t * ph);
/**
* will write out any dirty pages, assumes that there are no running
* transactions

View file

@ -3,7 +3,8 @@
#define __PAGE_FILE_H
#include <stasis/pageHandle.h>
#include <stasis/logger/logger2.h>
stasis_page_handle_t* openPageFile();
stasis_page_handle_t* openPageFile(stasis_log_t * log);
#endif /* __PAGE_FILE_H */

View file

@ -98,5 +98,5 @@ extern const int stasis_log_dir_name_lsn_chars;
/**
Number of bytes that stasis' log may buffer before writeback.
*/
extern lsn_t stasis_log_write_buffer_size;
extern lsn_t stasis_log_file_write_buffer_size;
#endif

View file

@ -0,0 +1,18 @@
/*
* groupForce.h
*
* Created on: May 12, 2009
* Author: sears
*/
#ifndef GROUPFORCE_H_
#define GROUPFORCE_H_
#include <stasis/common.h>
#include <stasis/logger/logger2.h>
stasis_log_group_force_t * stasis_log_group_force_init(stasis_log_t * log, uint64_t wait_nsec);
void stasis_log_group_force_deinit(stasis_log_group_force_t * lh);
void stasis_log_group_force(stasis_log_group_force_t* lh, lsn_t lsn);
#endif /* GROUPFORCE_H_ */

View file

@ -43,14 +43,10 @@ terms specified in this license.
#ifndef __LLADD_LOGGING_LOGENTRY_H
#define __LLADD_LOGGING_LOGENTRY_H
#include <stasis/common.h>
BEGIN_C_DECLS
/**
@file
Structs and memory managment routines for log entries
Structs and memory management routines for log entries
@todo Other than some typedefs, is there anything in logEntry that belongs in the API?
@ -59,14 +55,23 @@ BEGIN_C_DECLS
$Id$
*/
typedef struct {
#include <stasis/common.h>
BEGIN_C_DECLS
typedef struct UpdateLogEntry UpdateLogEntry;
typedef struct LogEntry LogEntry;
typedef struct __raw_log_entry CLRLogEntry;
#include <stasis/logger/logger2.h>
struct UpdateLogEntry {
unsigned int funcID : 8;
pageid_t page;
int64_t arg_size;
/* Implicit members:
args; @ ((byte*)ule) + sizeof(UpdateLogEntry)
*/
} UpdateLogEntry;
};
struct __raw_log_entry {
lsn_t LSN;
@ -75,15 +80,14 @@ struct __raw_log_entry {
unsigned int type;
};
typedef struct {
struct LogEntry {
lsn_t LSN;
lsn_t prevLSN;
int xid;
unsigned int type;
UpdateLogEntry update;
} LogEntry;
};
typedef struct __raw_log_entry CLRLogEntry;
/**
Allocate a log entry that does not contain any extra payload
information. (Eg: Tbegin, Tcommit, etc.)
@ -118,9 +122,10 @@ LogEntry * allocCLRLogEntry(const LogEntry * e);
*/
void freeLogEntry(const LogEntry * e);
/**
@param lh The log handle the entry will be stored in. (Needed because some log entries are of type INTERNALLOG) May be NULL if e is not of type INTERNALLOG.
@return the length, in bytes, of e.
*/
lsn_t sizeofLogEntry(const LogEntry * e);
lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e);
/**
@todo Remove explicit casts from getUpdateArgs calls (so we don't accidentally strip the const).
@return the operation's arguments.

View file

@ -57,6 +57,15 @@ terms specified in this license.
#include <stasis/common.h>
typedef struct stasis_log_t stasis_log_t;
typedef struct stasis_log_group_force_t stasis_log_group_force_t;
typedef enum {
LOG_FORCE_COMMIT, LOG_FORCE_WAL
} stasis_log_force_mode_t;
#include <stasis/logger/groupForce.h>
#include <stasis/constants.h>
/**
Contains the state needed by the logging layer to perform
operations on a transaction.
@ -68,7 +77,6 @@ typedef struct TransactionLog {
pthread_mutex_t mut;
} TransactionLog;
typedef struct stasis_log_t stasis_log_t;
#include <stasis/operations.h>
@ -80,9 +88,6 @@ typedef struct stasis_log_t stasis_log_t;
typedef int (guard_fcn_t)(const LogEntry *, void *);
typedef enum {
LOG_FORCE_COMMIT, LOG_FORCE_WAL
} stasis_log_force_mode_t;
/**
XXX TransactionTable should be private to transactional2.c!
@ -158,14 +163,11 @@ struct stasis_log_t {
int (*is_durable)(struct stasis_log_t* log);
stasis_log_group_force_t * group_force;
void* impl;
};
/**
@todo get rid of this!
*/
extern stasis_log_t* stasis_log_file;
/**
Synchronously make a prefix of the log durable.
@ -182,13 +184,13 @@ extern stasis_log_t* stasis_log_file;
@see stasis_log_force_mode_t
@see logger2.h for information about force_tail().
*/
void LogForce(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode);
void stasis_log_force(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode);
/**
Inform the logging layer that a new transaction has begun, and
obtain a handle.
*/
void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* l);
void stasis_log_begin_transaction(stasis_log_t* log, int xid, TransactionLog* l);
/**
Write a transaction PREPARE to the log tail. Blocks until the
@ -196,35 +198,35 @@ void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* l);
@return the lsn of the prepare log entry
*/
lsn_t LogTransPrepare(stasis_log_t* log, TransactionLog * l);
lsn_t stasis_log_prepare_transaction(stasis_log_t* log, TransactionLog * l);
/**
Write a transaction COMMIT to the log tail. Blocks until the commit
record is stable.
@return the lsn of the commit log entry.
*/
lsn_t LogTransCommit(stasis_log_t* log, TransactionLog * l);
lsn_t stasis_log_commit_transaction(stasis_log_t* log, TransactionLog * l);
/**
Write a transaction ABORT to the log tail. Does not force the log.
@return the lsn of the abort log entry.
*/
lsn_t LogTransAbort(stasis_log_t* log, TransactionLog * l);
lsn_t stasis_log_abort_transaction(stasis_log_t* log, TransactionLog * l);
/**
Write a end transaction record. This entry tells recovery's undo
phase that it may safely ignore the transaction.
*/
lsn_t LogTransEnd (stasis_log_t* log, TransactionLog * l);
lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, TransactionLog * l);
/**
LogUpdate writes an UPDATELOG log record to the log tail. It
stasis_log_write_update writes an UPDATELOG log record to the log tail. It
also interprets its operation argument to the extent necessary for
allocating and laying out the log entry. Finally, it updates the
state of the parameter l.
*/
LogEntry * LogUpdate(stasis_log_t* log,
LogEntry * stasis_log_write_update(stasis_log_t* log,
TransactionLog * l, Page * p, unsigned int operation,
const byte * arg, size_t arg_size);
@ -237,11 +239,9 @@ LogEntry * LogUpdate(stasis_log_t* log,
(Needed so that the lsn slot of the page in question can be
updated.)
*/
lsn_t LogCLR(stasis_log_t* log, const LogEntry * e);
lsn_t stasis_log_write_clr(stasis_log_t* log, const LogEntry * e);
lsn_t LogDummyCLR(stasis_log_t* log, int xid,
lsn_t stasis_log_write_dummy_clr(stasis_log_t* log, int xid,
lsn_t prev_lsn, lsn_t compensated_lsn);
#endif

View file

@ -1,9 +1,20 @@
#ifndef STASIS_PAGEHANDLE_H
#define STASIS_PAGEHANDLE_H
typedef struct stasis_page_handle_t stasis_page_handle_t;
#include <stasis/page.h>
#include <stasis/io/handle.h>
#include <stasis/logger/logger2.h>
typedef struct stasis_page_handle_t {
/**
* Provides page-based, write-ahead access to the page file.
*
* The operations provided by page handles maintain the write-ahead invariant,
* and callers to write pages to file-handle buffers, and to force write the
* buffers to disk.
*/
struct stasis_page_handle_t {
/**
* Write page to disk, including correct LSN. Doing so may require a
* call to logSync(). There is not much that can be done to avoid
@ -55,12 +66,26 @@ typedef struct stasis_page_handle_t {
Force the page file to disk, then close it.
*/
void (*close)(struct stasis_page_handle_t* ph);
/**
The write ahead log associated with this page handle.
If this is non-null, stasis_page_handle implementations will call
stasis_log_force on this to maintain the write-ahead invariant.
*/
stasis_log_t * log;
/**
* Pointer to implementation-specific state.
*/
void * impl;
} stasis_page_handle_t;
};
/**
Open a stasis page handle.
stasis_page_handle_t * stasis_page_handle_open(struct stasis_handle_t * handle);
@param handle A stasis_handle_t that will perform I/O to the page file.
@param log A stasis_log_t that will be used to maintain the write ahead invariant.
If null, then write ahead will not be maintained.
@return a handle that performs high-level (page based, write-ahead) page file I/O.
*/
stasis_page_handle_t * stasis_page_handle_open(struct stasis_handle_t * handle, stasis_log_t * log);
#endif //STASIS_PAGEHANDLE_H

View file

@ -539,7 +539,6 @@ terms specified in this license.
#include "common.h"
#include "flags.h"
BEGIN_C_DECLS
/**
* Initialize Stasis. This opens the pagefile and log, initializes
* subcomponents, and runs recovery.
@ -769,6 +768,18 @@ int Tforget(int xid);
*/
int TdurabilityLevel();
/**
* Force any dirty pages to disk, and truncate the log. After this
* function returns, the log will be as short as possible (outstanding
* transactions can prevent it from completely emptying the log).
*/
void TtruncateLog();
/**
* XXX hack: return a pointer to stasis' log handle. This works around the fact
* that stasis_log_file is no longer global.
*/
void * stasis_log(void);
#include "operations.h"
END_C_DECLS

View file

@ -54,7 +54,7 @@ START_TEST(rawLogEntryAlloc)
assert(log->prevLSN == 200);
assert(log->xid == 1);
assert(log->type == XABORT);
assert(sizeofLogEntry(log) == sizeof(struct __raw_log_entry));
assert(sizeofLogEntry(0, log) == sizeof(struct __raw_log_entry));
free(log);
}
END_TEST
@ -94,7 +94,7 @@ START_TEST(updateLogEntryAlloc)
// printf("sizes %d %d\n",sizeofLogEntry(log),(sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + (sizeof(char))));
assert(sizeofLogEntry(log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 3 * (sizeof(char))));
assert(sizeofLogEntry(0, log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 3 * (sizeof(char))));
free(log);
Tdeinit();
}
@ -120,7 +120,7 @@ START_TEST(updateLogEntryAllocNoExtras)
assert(getUpdateArgs(log) == NULL);
assert(sizeofLogEntry(log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 0 * (sizeof(int)+sizeof(char))));
assert(sizeofLogEntry(0, log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 0 * (sizeof(int)+sizeof(char))));
free(log);
}
END_TEST

View file

@ -62,7 +62,7 @@ terms specified in this license.
#define LOG_NAME "check_logWriter.log"
static void setup_log() {
static stasis_log_t * setup_log() {
int i;
lsn_t prevLSN = -1;
int xid = 42;
@ -71,7 +71,7 @@ static void setup_log() {
Tinit();
lsn_t firstLSN = -1;
int first = 1;
stasis_log_t * stasis_log_file = stasis_log();
for(i = 0 ; i < 1000; i++) {
LogEntry * e = allocCommonLogEntry(prevLSN, xid, XBEGIN);
const LogEntry * f;
@ -97,8 +97,8 @@ static void setup_log() {
f = stasis_log_file->read_entry(stasis_log_file, prevLSN);
fail_unless(sizeofLogEntry(e) == sizeofLogEntry(f), "Log entry changed size!!");
fail_unless(0 == memcmp(e,f,sizeofLogEntry(e)), "Log entries did not agree!!");
fail_unless(sizeofLogEntry(0, e) == sizeofLogEntry(0, f), "Log entry changed size!!");
fail_unless(0 == memcmp(e,f,sizeofLogEntry(0, e)), "Log entries did not agree!!");
freeLogEntry(e);
freeLogEntry(f);
@ -118,6 +118,7 @@ static void setup_log() {
freeLogEntry (e);
freeLogEntry (g);
}
return stasis_log_file;
}
/**
@test
@ -143,8 +144,7 @@ static void loggerTest(int logType) {
const LogEntry * e;
LogHandle* h;
int i = 0;
setup_log();
stasis_log_t * stasis_log_file = setup_log();
h = getLogHandle(stasis_log_file);
while((e = nextInLog(h))) {
@ -175,7 +175,7 @@ START_TEST(loggerMemTest) {
static void logHandleColdReverseIterator(int logType) {
const LogEntry * e;
stasis_log_type = logType;
setup_log();
stasis_log_t * stasis_log_file = setup_log();
LogHandle* lh = getLogHandle(stasis_log_file);
int i = 0;
@ -213,7 +213,8 @@ static void loggerTruncate(int logType) {
const LogEntry * le2;
const LogEntry * le3 = NULL;
const LogEntry * tmp;
setup_log();
stasis_log_t * stasis_log_file = setup_log();
LogHandle* lh = getLogHandle(stasis_log_file);
int i = 0;
@ -296,6 +297,7 @@ static void* worker_thread(void * arg) {
lsns[i] = 0;
}
i = 0;
stasis_log_t * stasis_log_file = stasis_log();
while(i < ENTRIES_PER_THREAD) {
LogEntry * le = allocCommonLogEntry(-1, -1, XBEGIN);
@ -423,6 +425,7 @@ void reopenLogWorkload(int truncating) {
const int ENTRY_COUNT = 1000;
const int SYNC_POINT = 900;
stasis_log_t * stasis_log_file = 0;
stasis_transaction_table_active_transaction_count_set(0);
@ -439,14 +442,14 @@ void reopenLogWorkload(int truncating) {
int xid = 1;
TransactionLog l;
pthread_mutex_init(&l.mut,0);
LogTransBegin(stasis_log_file, xid, &l);
stasis_log_begin_transaction(stasis_log_file, xid, &l);
lsn_t startLSN = 0;
LogEntry * entries[ENTRY_COUNT];
for(int i = 0; i < ENTRY_COUNT; i++) {
entries[i] = LogUpdate(stasis_log_file,
entries[i] = stasis_log_write_update(stasis_log_file,
&l, NULL, OPERATION_NOOP, NULL, 0);
if(i == SYNC_POINT) {
@ -483,8 +486,8 @@ void reopenLogWorkload(int truncating) {
const LogEntry * e;
while((e = nextInLog(h))) {
if(e->type != INTERNALLOG) {
assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i]));
assert(!memcmp(e, entries[i], sizeofLogEntry(entries[i])));
assert(sizeofLogEntry(0, e) == sizeofLogEntry(0, entries[i]));
assert(!memcmp(e, entries[i], sizeofLogEntry(0, entries[i])));
assert(i < ENTRY_COUNT);
i++;
}
@ -494,7 +497,7 @@ void reopenLogWorkload(int truncating) {
LogEntry * entries2[ENTRY_COUNT];
for(int i = 0; i < ENTRY_COUNT; i++) {
entries2[i] = LogUpdate(stasis_log_file, &l, NULL, OPERATION_NOOP,
entries2[i] = stasis_log_write_update(stasis_log_file, &l, NULL, OPERATION_NOOP,
NULL, 0);
if(i == SYNC_POINT) {
stasis_log_file->force_tail(stasis_log_file, LOG_FORCE_COMMIT);
@ -514,12 +517,12 @@ void reopenLogWorkload(int truncating) {
while((e = nextInLog(h))) {
if(e->type != INTERNALLOG) {
if( i < ENTRY_COUNT) {
assert(sizeofLogEntry(e) == sizeofLogEntry(entries[i]));
assert(!memcmp(e, entries[i], sizeofLogEntry(entries[i])));
assert(sizeofLogEntry(0, e) == sizeofLogEntry(0, entries[i]));
assert(!memcmp(e, entries[i], sizeofLogEntry(0, entries[i])));
} else {
assert(i < ENTRY_COUNT * 2);
assert(sizeofLogEntry(e) == sizeofLogEntry(entries2[i-ENTRY_COUNT]));
assert(!memcmp(e, entries2[i-ENTRY_COUNT], sizeofLogEntry(entries2[i-ENTRY_COUNT])));
assert(i < ENTRY_COUNT * 2);
assert(sizeofLogEntry(0, e) == sizeofLogEntry(0, entries2[i-ENTRY_COUNT]));
assert(!memcmp(e, entries2[i-ENTRY_COUNT], sizeofLogEntry(0, entries2[i-ENTRY_COUNT])));
}
i++;
}

View file

@ -186,7 +186,7 @@ START_TEST(multiplexTest) {
LogEntry * e = allocUpdateLogEntry(-1, -1, OPERATION_LINEAR_HASH_INSERT, INVALID_PAGE, (byte*)arg,
sizeof(linearHash_remove_arg) + sizeof(lsn_t) + sizeof(char));
ThashInsert(xid, hash, (byte*)&i, sizeof(lsn_t), (byte*)e, sizeofLogEntry(e));
ThashInsert(xid, hash, (byte*)&i, sizeof(lsn_t), (byte*)e, sizeofLogEntry(0, e));
free(e);

View file

@ -179,7 +179,7 @@ START_TEST(operation_physical_do_undo) {
// XXX This is a hack to put some stuff in the log. Otherwise, Tdeinit() fails.
for(int i = 0; i < 10; i++)
stasis_log_file->write_entry(stasis_log_file,
((stasis_log_t*)stasis_log())->write_entry(stasis_log(),
allocCommonLogEntry(-1, -1, -1));
/** @todo need to re-think check_operations. The test is pretty broken. */
@ -639,7 +639,7 @@ START_TEST(operation_reorderable) {
stasis_log_reordering_handle_t * rh
= stasis_log_reordering_handle_open(
&stasis_transaction_table[xid[0]% MAX_TRANSACTIONS],
stasis_log_file,
stasis_log(),
100, // bytes (far too low!)
10, // log entries
500 // max byte size

View file

@ -2,7 +2,7 @@
#include <stasis/truncation.h>
int main(void) {
Tinit();
stasis_truncation_truncate(stasis_log_file, 1);
TtruncateLog();
Tdeinit();
return compensation_error();