Major benchmark update; log force merging.

This commit is contained in:
Sears Russell 2005-03-17 04:38:55 +00:00
parent 3812f36e6b
commit d8eb6c9aa7
11 changed files with 338 additions and 81 deletions

View file

@ -38,6 +38,8 @@ void env_dir_create(void);
void env_open(DB_ENV **);
void usage(void);
int alwaysCommit;
DB_ENV *dbenv;
DB *db_cats; /*, *db_color, *db_fruit; */
@ -61,8 +63,10 @@ main(int argc, char *argv[])
argc -= optind;
argv += optind; */
assert(argc == 3);
assert(argc == 3 || argc == 4);
/* threads have static thread sizes. Ughh. */
alwaysCommit = (argc == 4);
pthread_attr_t attr;
pthread_attr_init(&attr);
@ -122,8 +126,15 @@ main(int argc, char *argv[])
NULL);*/
int r;
int num_threads = atoi(argv[1]);
/*int */num_xact = 1; //atoi(argv[2]); //100;
/*int */insert_per_xact = atoi(argv[2]); //1000;
if(alwaysCommit) {
num_xact = atoi(argv[2]); //100;
insert_per_xact = 1;
} else {
num_xact = 1;
insert_per_xact = atoi(argv[2]);
}
/*int *///insert_per_xact = //atoi(argv[2]); //1000;
// insert_per_xact = 1;
// int num_threads = 100;
@ -225,10 +236,10 @@ env_open(DB_ENV **dbenvp)
dbenv->set_errfile(dbenv, stderr);
/* Do deadlock detection internally. */
if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) {
/* if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) {
dbenv->err(dbenv, ret, "set_lk_detect: DB_LOCK_DEFAULT");
exit (1);
}
} */
/*
* Open a transactional environment:
@ -238,7 +249,7 @@ env_open(DB_ENV **dbenvp)
* read/write owner only
*/
if ((ret = dbenv->open(dbenv, ENV_DIRECTORY,
DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
DB_CREATE /*| DB_INIT_LOCK */| DB_INIT_LOG |
DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD,
S_IRUSR | S_IWUSR)) != 0) {
dbenv->err(dbenv, ret, "dbenv->open: %s", ENV_DIRECTORY);
@ -514,12 +525,12 @@ retry: /* Begin the transaction. */
}
/* Delete any previously existing item -- LLADD always does this during insert.*/
switch (ret = db->del(db, tid, &key, 0)) {
/* switch (ret = db->del(db, tid, &key, 0)) {
case 0:
case DB_NOTFOUND:
break;
case DB_LOCK_DEADLOCK:
/* Deadlock: retry the operation. */
// Deadlock: retry the operation.
if ((ret = tid->abort(tid)) != 0) {
dbenv->err(dbenv, ret, "DB_TXN->abort");
exit (1);
@ -528,7 +539,7 @@ retry: /* Begin the transaction. */
default:
dbenv->err(dbenv, ret, "db->del: %s", name);
exit (1);
}
}*/
/* Create a cursor. */
if ((ret = db->cursor(db, tid, &dbc, 0)) != 0) {
@ -546,7 +557,7 @@ retry: /* Begin the transaction. */
break;
case DB_LOCK_DEADLOCK:
va_end(ap);
abort();
/* Deadlock: retry the operation. */
if ((ret = dbc->c_close(dbc)) != 0) {
dbenv->err(
@ -600,30 +611,34 @@ run_xact(DB_ENV *dbenv, DB *db, int offset, int count)
retry: /* Begin the transaction. */
if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0) {
dbenv->err(dbenv, ret, "DB_ENV->txn_begin");
exit (1);
// exit (1);
abort();
}
/* Delete any previously existing item. */
/* switch (ret = db->del(db, tid, &key, 0)) {
switch (ret = db->del(db, tid, &key, 0)) {
case 0:
case DB_NOTFOUND:
break;
case DB_LOCK_DEADLOCK:
/ * Deadlock: retry the operation. * /
abort();
/* Deadlock: retry the operation. */
if ((ret = tid->abort(tid)) != 0) {
dbenv->err(dbenv, ret, "DB_TXN->abort");
exit (1);
}
goto retry;
default:
dbenv->err(dbenv, ret, "db->del: %s", name);
// dbenv->err(dbenv, ret, "db->del: %s", name);
abort();
exit (1);
} */
}
/* Create a cursor. */
if ((ret = db->cursor(db, tid, &dbc, 0)) != 0) {
dbenv->err(dbenv, ret, "db->cursor");
exit (1);
abort();
// exit (1);
}
/* Append the items, in order. */
@ -636,20 +651,22 @@ retry: /* Begin the transaction. */
switch (ret = db->del(db, tid, &key, 0)) {
case 0:
abort(); // we dont insert dups in this test!
// abort(); // we dont insert dups in this test!
case DB_NOTFOUND:
break;
case DB_LOCK_DEADLOCK:
/* Deadlock: retry the operation. */
/// Deadlock: retry the operation.
abort(); // the lock manager should be disabled for this test...
if ((ret = tid->abort(tid)) != 0) {
dbenv->err(dbenv, ret, "DB_TXN->abort");
exit (1);
// exit (1);
abort();
}
goto retry;
default:
dbenv->err(dbenv, ret, "db->del: %d", q);
exit (1);
// exit (1);
abort();
}
/* data.data = s;
@ -662,22 +679,25 @@ retry: /* Begin the transaction. */
break;
case DB_LOCK_DEADLOCK:
// va_end(ap);
abort(); // no locking!
/* Deadlock: retry the operation. */
if ((ret = dbc->c_close(dbc)) != 0) {
dbenv->err(
dbenv, ret, "dbc->c_close");
exit (1);
// exit (1);
abort();
}
if ((ret = tid->abort(tid)) != 0) {
dbenv->err(dbenv, ret, "DB_TXN->abort");
exit (1);
// exit (1);
abort();
}
goto retry;
default:
/* Error: run recovery. */
dbenv->err(dbenv, ret, "dbc->put: %d/%d", q, q);
exit (1);
// exit (1);
abort();
}
}
// va_end(ap);
@ -685,12 +705,15 @@ retry: /* Begin the transaction. */
/* Success: commit the change. */
if ((ret = dbc->c_close(dbc)) != 0) {
dbenv->err(dbenv, ret, "dbc->c_close");
exit (1);
// exit (1);
abort();
}
if ((ret = tid->commit(tid, 0)) != 0) {
dbenv->err(dbenv, ret, "DB_TXN->commit");
exit (1);
// exit (1);
abort();
}
// printf("commit");
}
void

View file

@ -265,10 +265,10 @@ env_open(DB_ENV **dbenvp)
dbenv->set_errfile(dbenv, stderr);
/* Do deadlock detection internally. */
if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) {
/* if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) {
dbenv->err(dbenv, ret, "set_lk_detect: DB_LOCK_DEFAULT");
exit (1);
}
}*/
/*
* Open a transactional environment:

View file

@ -20,6 +20,7 @@
int activeThreads = 0;
int max_active = 0;
int alwaysCommit;
/*
double avg_var = 0;
double max_var = 0;
@ -68,10 +69,17 @@ static void * go (void * arg_ptr) {
*/
// printf("(%d)", k);
if(alwaysCommit) {
// printf("Commit");
Tcommit(xid);
xid = Tbegin();
}
}
Tcommit(xid);
/*
for(j = k * count; j < (k+1) *(count) ; j++) {
int tmp = -100;
@ -105,11 +113,13 @@ static void * go (void * arg_ptr) {
int main(int argc, char** argv) {
assert(argc == 3);
assert(argc == 3 || argc == 4);
int thread_count = atoi(argv[1]);
count = atoi(argv[2]);
alwaysCommit = (argc==4);
unlink("storefile.txt");
unlink("logfile.txt");
unlink("blob0_file.txt");
@ -120,6 +130,7 @@ int main(int argc, char** argv) {
Tinit();
int xid = Tbegin();
hash = ThashCreate(xid, sizeof(int), sizeof(int));
// hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
Tcommit(xid);

View file

@ -94,6 +94,9 @@ available.%
narrow interfaces, since transactional storage algorithms'
interdependencies and requirements are notoriously complicated.}
%Not implementing ARIES any more!
\item {\bf With these trends in mind, we have implemented a modular
version of ARIES that makes as few assumptions as possible about
application data structures or workload. Where such assumptions are

View file

@ -82,9 +82,9 @@ terms specified in this license.
/* #define MAX_BUFFER_SIZE 100003 */
/*#define MAX_BUFFER_SIZE 20029 */
#define MAX_BUFFER_SIZE 10007
//#define MAX_BUFFER_SIZE 10007
/*#define MAX_BUFFER_SIZE 5003*/
/*#define MAX_BUFFER_SIZE 2003 */
#define MAX_BUFFER_SIZE 2003
/* #define MAX_BUFFER_SIZE 71 */
/*#define MAX_BUFFER_SIZE 7 */
/*#define BUFFER_ASOOCIATIVE 2 */
@ -130,6 +130,9 @@ terms specified in this license.
#define OPERATION_LINEAR_HASH_INSERT 31
#define OPERATION_LINEAR_HASH_REMOVE 32
// this operation is specific to OASYS
#define OPERATION_SET_DIFF 33
/* number above should be less than number below */
#define MAX_OPERATIONS 40

View file

@ -1,8 +1,11 @@
#include <lladd/transactional.h>
#ifndef __ITERATOR_H
#define __ITERATOR_H
BEGIN_C_DECLS
#define MAX_ITERATOR_TYPES 10
#define LINEAR_HASH_NTA_ITERATOR 0
#define ARRAY_ITERATOR 1
@ -16,6 +19,7 @@ typedef struct {
int (*key) (int xid, void * it, byte ** key);
int (*value)(int xid, void * it, byte ** value);
void (*tupleDone)(int xid, void * it);
void (*releaseLock)(int xid, void *it);
} lladdIterator_def_t;
typedef struct {
@ -59,7 +63,7 @@ int Titerator_tryNext(int xid, lladdIterator_t * it);
provides , allows iterator to clean up if necessary
> such as release lock
*/
int (*releaseTuple)(int xid, void * it);
//int (*releaseTuple)(int xid, void * it);
/**
This function allows the caller to access the current iterator
@ -98,6 +102,9 @@ int Titerator_value(int xid, lladdIterator_t * it, byte ** value);
all iterators are reentrant.)
*/
void Titerator_tupleDone(int xid, lladdIterator_t * it);
void Titerator_releaseLock(int xid, lladdIterator_t * it);
END_C_DECLS
#endif

View file

@ -39,7 +39,8 @@ void iterator_init() {
logMemory_Iterator_tryNext,
logMemory_Iterator_key,
logMemory_Iterator_value,
logMemory_Iterator_releaseTuple
logMemory_Iterator_releaseTuple,
logMemory_Iterator_releaseLock,
};
lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def);
}
@ -52,3 +53,4 @@ int Titerator_tryNext(int xid, lladdIterator_t * it) { return itera
int Titerator_key (int xid, lladdIterator_t * it, byte ** key){ return iterators[it->type].key (xid, it->impl, key); }
int Titerator_value(int xid, lladdIterator_t * it, byte ** value){ return iterators[it->type].value(xid, it->impl, value); }
void Titerator_tupleDone(int xid, lladdIterator_t * it) { iterators[it->type].tupleDone(xid, it->impl); }
void Titerator_releaseLock(int xid, lladdIterator_t * it) { iterators[it->type].releaseLock(xid, it->impl); }

View file

@ -6,11 +6,12 @@
#include <stdlib.h>
#include <assert.h>
#include "logMemory.h"
#include <errno.h>
#include <lladd/compensations.h>
typedef struct {
pthread_mutex_t mutex;
pthread_mutex_t readerMutex;
pthread_cond_t readReady;
pthread_cond_t writeReady;
ringBufferLog_t * ringBuffer;
@ -35,6 +36,7 @@ lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) {
iterator->impl = malloc(sizeof(logMemory_fifo_t));
((logMemory_fifo_t *)iterator->impl)->ringBuffer = openLogRingBuffer(size, initialOffset);
pthread_mutex_init(&(((logMemory_fifo_t *)iterator->impl)->mutex), NULL);
pthread_mutex_init(&(((logMemory_fifo_t *)iterator->impl)->readerMutex), NULL);
pthread_cond_init (&(((logMemory_fifo_t *)iterator->impl)->readReady), NULL);
pthread_cond_init (&(((logMemory_fifo_t *)iterator->impl)->writeReady), NULL);
((logMemory_fifo_t *)iterator->impl)->cached_value = NULL;
@ -54,10 +56,12 @@ lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) {
/* iterator interface implementation */
/*------------- iterator interface implementation --------------------*/
/* NOTE: assumes currently that the consumer interface is done so we can
safely deallocate resources
/** This function should not be called until next() or one of its
variants indicates that the entire fifo has been consumed, since
this function assumes currently that the consumer interface is
done so that it can deallocate resources.
*/
void logMemory_Iterator_close(int xid, void * impl) {
closeLogRingBuffer( ((logMemory_fifo_t *) impl)->ringBuffer );
@ -66,6 +70,7 @@ void logMemory_Iterator_close(int xid, void * impl) {
compensated_function int logMemory_Iterator_next (int xid, void * impl) {
logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl;
pthread_mutex_lock(&(fifo->readerMutex));
pthread_mutex_lock(&(fifo->mutex));
size_t size;
int lsn;
@ -73,6 +78,7 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return 0;
}
@ -82,12 +88,14 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return 0;
}
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
@ -99,6 +107,7 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
@ -111,6 +120,171 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
fifo->cached_lsn = (lsn_t)lsn;
pthread_cond_broadcast(&(fifo->writeReady));
pthread_mutex_unlock(&(fifo->mutex));
return 1;
}
/** @todo logMemory_Iterator_tryNext is a cut and pasted version of
.._next. The functionality should be broken into modules and
reused... */
compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl;
if(EBUSY == pthread_mutex_trylock(&(fifo->readerMutex))) {
return 0;
}
pthread_mutex_lock(&(fifo->mutex));
size_t size;
int lsn;
int ret;
if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return 0;
}
// TODO Check to see if we're done reading...
//From here on, we need to continue as normal since we consumed data from the ringbuffer...
if(-2 == (ret = ringBufferTruncateRead((byte *)&size, fifo->ringBuffer, sizeof(size_t)))) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return 0;
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
byte * tmp;
tmp = realloc(fifo->cached_value, size);
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
fifo->cached_value = tmp;
fifo->cached_value_size = size;
while(-2 == (lsn = ringBufferTruncateRead( fifo->cached_value, fifo->ringBuffer, size))) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
fifo->cached_lsn = (lsn_t)lsn;
pthread_cond_broadcast(&(fifo->writeReady));
pthread_mutex_unlock(&(fifo->mutex));
return 1;
}
compensated_function void logMemory_Iterator_releaseLock (int xid, void * impl) {
logMemory_fifo_t * fifo = (logMemory_fifo_t *) impl;
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
}
/** Blocks until it can advance the iterator a single step or until
the iterator is empty. If this function returns 0 the caller can
safely assume the iterator is currently empty (and any _push()
requests are blocking). Otherwise, this function works
analagously to the normal _next() call
@return 1 (and require tupleDone() to be called) if the iterator was advanced.
@return 0 (and require releaseIteratorLock() to be called) if the iterator currently contains
no more values, and is not waiting for another thread to call tupleDone())
@todo logMemory_Iterator_nextOrEmpty is a cut and pasted version of
.._next. The functionality should be broken into modules and
reused... */
compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl;
pthread_mutex_lock(&(fifo->readerMutex));
pthread_mutex_lock(&(fifo->mutex));
size_t size;
int lsn;
int ret;
if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) {
/* pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex)); */
return 0;
}
// TODO Check to see if we're done reading...
//From here on, we need to continue as normal since we consumed data from the ringbuffer...
if(-2 == (ret = ringBufferTruncateRead((byte *)&size, fifo->ringBuffer, sizeof(size_t)))) {
/* pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex)); */
// At this point, just assume the ring buffer is empty, since
// anything in the process of doing an append is blocked. (under
// normal circumstances, there really won't be anything in the
// ringbuffer anyway..
return 0;
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
byte * tmp;
tmp = realloc(fifo->cached_value, size);
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
fifo->cached_value = tmp;
fifo->cached_value_size = size;
while(-2 == (lsn = ringBufferTruncateRead( fifo->cached_value, fifo->ringBuffer, size))) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
@ -138,16 +312,12 @@ int logMemory_Iterator_value (int xid, void * impl, byte ** value) {
return fifo->cached_value_size;
}
int logMemory_Iterator_releaseTuple(int xid, void *it) {
/* NO-OP */
return 0;
void logMemory_Iterator_releaseTuple(int xid, void *it) {
logMemory_fifo_t * fifo = (logMemory_fifo_t *) it;
pthread_mutex_unlock(&(fifo->readerMutex));
}
/* consumer implementation */
/* ------------------- consumer implementation ------------------------------*/
void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it){
/* This needs to tell the iterator where the end of the ring buffer is. */

View file

@ -7,38 +7,26 @@
#ifndef __LOGMEMORY_H
#define __LOGMEMORY_H
/*typedef struct ringBufferLog_s ringBufferLog_t;
/**
@file
ringBufferLog_t * openLogRingBuffer(size_t size, lsn_t initialOffset);
void closeLogRingBuffer(ringBufferLog_t * log);
int ringBufferAppend(ringBufferLog_t * log, byte * dat, size_t size);
int ringBufferTruncateRead(byte * buf, ringBufferLog_t * log,size_t size); */
A poorly named in-memory fifo based on ringbuffer.h
// int writeLogEntry(LogEntry * e); (as implemented by ring buffer, but need to remember size of entry too
@todo Move this all to some reasonably named interface. :)
// void syncLog() (block writers until log is empty / call sync log on consumers?)
// lsn_t flushedLSN return lsn as of last syncLog?
// int truncateLog(lsn_t) (no-op?)
// lsn_t firstLogEntry() Unimplemented? Or return first thing in current ring buffer?
// void closeLogWriter() (syncLog + closeRingBuffer)
// deleteLogWriter no-op
$Id$
*/
lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset);
// LogEntry* readLSNEntry(lsn_t) if LSN is the next available log entry, return it, else error.
void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it);
compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t Valsize);
void logMemory_Iterator_close(int xid, void * impl);
compensated_function int logMemory_Iterator_next (int xid, void * impl);
int logMemory_Iterator_key (int xid, void * impl, byte ** key);
int logMemory_Iterator_value (int xid, void * impl, byte ** value);
int logMemory_Iterator_releaseTuple(int xid, void *it);
compensated_function int logMemory_Iterator_next(int xid, void * impl);
compensated_function int logMemory_Iterator_tryNext(int xid, void * impl);
compensated_function int logMemory_Iterator_key (int xid, void * impl, byte ** key);
compensated_function int logMemory_Iterator_value (int xid, void * impl, byte ** value);
compensated_function void logMemory_Iterator_releaseTuple(int xid, void *it);
compensated_function void logMemory_Iterator_releaseLock (int xid, void * impl);
#endif

View file

@ -75,9 +75,59 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) {
}
extern int numActiveXactions;
lsn_t LogTransCommit(TransactionLog * l) {
static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER;
static int pendingCommits = 0;
static int syncLogCount;
lsn_t ret = LogTransCommon(l, XCOMMIT);
struct timeval now;
struct timespec timeout;
// int retcode;
pthread_mutex_lock(&check_commit);
if(flushedLSN() >= ret) {
pthread_mutex_unlock(&check_commit);
return ret;
}
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec;
timeout.tv_nsec = now.tv_usec * 1000;
// 0123456789 <- number of zeros on the next three lines...
timeout.tv_nsec += 100000000; // wait ten msec.
if(timeout.tv_nsec > 1000000000) {
timeout.tv_nsec -= 1000000000;
timeout.tv_sec++;
}
pendingCommits++;
// if(pendingCommits <= (numActiveXactions / 2)) {
if((numActiveXactions > 1 && pendingCommits < numActiveXactions) ||
(numActiveXactions > 20 && pendingCommits < (int)((double)numActiveXactions * 0.95))) {
while(ETIMEDOUT != (pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) {
if(flushedLSN() >= ret) {
pendingCommits--;
pthread_mutex_unlock(&check_commit);
return ret;
}
}
// printf("Timed out");
} else {
// printf("Didn't wait %d < %d\n", (numActiveXactions / 2), pendingCommits);
}
if(flushedLSN() < ret) {
syncLog();
return LogTransCommon(l, XCOMMIT);
syncLogCount++;
// printf(" %d ", syncLogCount);
pthread_cond_broadcast(&tooFewXacts);
}
assert(flushedLSN() >= ret);
pendingCommits--;
pthread_mutex_unlock(&check_commit);
return ret;
}
lsn_t LogTransAbort(TransactionLog * l) {