diff --git a/benchmarks/berkeleyDB/bdbHashThreaded.c b/benchmarks/berkeleyDB/bdbHashThreaded.c index 640a087..109fa79 100644 --- a/benchmarks/berkeleyDB/bdbHashThreaded.c +++ b/benchmarks/berkeleyDB/bdbHashThreaded.c @@ -38,6 +38,8 @@ void env_dir_create(void); void env_open(DB_ENV **); void usage(void); +int alwaysCommit; + DB_ENV *dbenv; DB *db_cats; /*, *db_color, *db_fruit; */ @@ -61,8 +63,10 @@ main(int argc, char *argv[]) argc -= optind; argv += optind; */ - assert(argc == 3); + assert(argc == 3 || argc == 4); /* threads have static thread sizes. Ughh. */ + + alwaysCommit = (argc == 4); pthread_attr_t attr; pthread_attr_init(&attr); @@ -122,8 +126,15 @@ main(int argc, char *argv[]) NULL);*/ int r; int num_threads = atoi(argv[1]); - /*int */num_xact = 1; //atoi(argv[2]); //100; - /*int */insert_per_xact = atoi(argv[2]); //1000; + if(alwaysCommit) { + num_xact = atoi(argv[2]); //100; + insert_per_xact = 1; + } else { + num_xact = 1; + insert_per_xact = atoi(argv[2]); + } + /*int *///insert_per_xact = //atoi(argv[2]); //1000; + // insert_per_xact = 1; // int num_threads = 100; @@ -225,10 +236,10 @@ env_open(DB_ENV **dbenvp) dbenv->set_errfile(dbenv, stderr); /* Do deadlock detection internally. */ - if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) { + /* if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) { dbenv->err(dbenv, ret, "set_lk_detect: DB_LOCK_DEFAULT"); exit (1); - } + } */ /* * Open a transactional environment: @@ -238,7 +249,7 @@ env_open(DB_ENV **dbenvp) * read/write owner only */ if ((ret = dbenv->open(dbenv, ENV_DIRECTORY, - DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | + DB_CREATE /*| DB_INIT_LOCK */| DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD, S_IRUSR | S_IWUSR)) != 0) { dbenv->err(dbenv, ret, "dbenv->open: %s", ENV_DIRECTORY); @@ -514,12 +525,12 @@ retry: /* Begin the transaction. */ } /* Delete any previously existing item -- LLADD always does this during insert.*/ - switch (ret = db->del(db, tid, &key, 0)) { + /* switch (ret = db->del(db, tid, &key, 0)) { case 0: case DB_NOTFOUND: break; case DB_LOCK_DEADLOCK: - /* Deadlock: retry the operation. */ + // Deadlock: retry the operation. if ((ret = tid->abort(tid)) != 0) { dbenv->err(dbenv, ret, "DB_TXN->abort"); exit (1); @@ -528,7 +539,7 @@ retry: /* Begin the transaction. */ default: dbenv->err(dbenv, ret, "db->del: %s", name); exit (1); - } + }*/ /* Create a cursor. */ if ((ret = db->cursor(db, tid, &dbc, 0)) != 0) { @@ -546,7 +557,7 @@ retry: /* Begin the transaction. */ break; case DB_LOCK_DEADLOCK: va_end(ap); - + abort(); /* Deadlock: retry the operation. */ if ((ret = dbc->c_close(dbc)) != 0) { dbenv->err( @@ -600,30 +611,34 @@ run_xact(DB_ENV *dbenv, DB *db, int offset, int count) retry: /* Begin the transaction. */ if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0) { dbenv->err(dbenv, ret, "DB_ENV->txn_begin"); - exit (1); + // exit (1); + abort(); } /* Delete any previously existing item. */ - /* switch (ret = db->del(db, tid, &key, 0)) { + switch (ret = db->del(db, tid, &key, 0)) { case 0: case DB_NOTFOUND: - break; + break; case DB_LOCK_DEADLOCK: - / * Deadlock: retry the operation. * / - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); - } - goto retry; + abort(); + /* Deadlock: retry the operation. */ + if ((ret = tid->abort(tid)) != 0) { + dbenv->err(dbenv, ret, "DB_TXN->abort"); + exit (1); + } + goto retry; default: - dbenv->err(dbenv, ret, "db->del: %s", name); - exit (1); - } */ + // dbenv->err(dbenv, ret, "db->del: %s", name); + abort(); + exit (1); + } /* Create a cursor. */ if ((ret = db->cursor(db, tid, &dbc, 0)) != 0) { dbenv->err(dbenv, ret, "db->cursor"); - exit (1); + abort(); + // exit (1); } /* Append the items, in order. */ @@ -636,22 +651,24 @@ retry: /* Begin the transaction. */ switch (ret = db->del(db, tid, &key, 0)) { case 0: - abort(); // we dont insert dups in this test! + // abort(); // we dont insert dups in this test! case DB_NOTFOUND: break; case DB_LOCK_DEADLOCK: - /* Deadlock: retry the operation. */ + /// Deadlock: retry the operation. abort(); // the lock manager should be disabled for this test... if ((ret = tid->abort(tid)) != 0) { dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); + // exit (1); + abort(); } goto retry; default: dbenv->err(dbenv, ret, "db->del: %d", q); - exit (1); + // exit (1); + abort(); } - + /* data.data = s; data.size = strlen(s); */ // printf("A"); fflush(NULL); @@ -662,22 +679,25 @@ retry: /* Begin the transaction. */ break; case DB_LOCK_DEADLOCK: // va_end(ap); - + abort(); // no locking! /* Deadlock: retry the operation. */ if ((ret = dbc->c_close(dbc)) != 0) { dbenv->err( dbenv, ret, "dbc->c_close"); - exit (1); + // exit (1); + abort(); } if ((ret = tid->abort(tid)) != 0) { dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); + // exit (1); + abort(); } goto retry; default: /* Error: run recovery. */ dbenv->err(dbenv, ret, "dbc->put: %d/%d", q, q); - exit (1); + // exit (1); + abort(); } } // va_end(ap); @@ -685,12 +705,15 @@ retry: /* Begin the transaction. */ /* Success: commit the change. */ if ((ret = dbc->c_close(dbc)) != 0) { dbenv->err(dbenv, ret, "dbc->c_close"); - exit (1); + // exit (1); + abort(); } if ((ret = tid->commit(tid, 0)) != 0) { dbenv->err(dbenv, ret, "DB_TXN->commit"); - exit (1); + // exit (1); + abort(); } + // printf("commit"); } void diff --git a/benchmarks/berkeleyDB/bdbHashWriteRequests.c b/benchmarks/berkeleyDB/bdbHashWriteRequests.c index 81be5f9..3135bbc 100644 --- a/benchmarks/berkeleyDB/bdbHashWriteRequests.c +++ b/benchmarks/berkeleyDB/bdbHashWriteRequests.c @@ -265,10 +265,10 @@ env_open(DB_ENV **dbenvp) dbenv->set_errfile(dbenv, stderr); /* Do deadlock detection internally. */ - if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) { + /* if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) { dbenv->err(dbenv, ret, "set_lk_detect: DB_LOCK_DEFAULT"); exit (1); - } + }*/ /* * Open a transactional environment: diff --git a/benchmarks/linearHashNTAThreaded.c b/benchmarks/linearHashNTAThreaded.c index 84e515b..c7dfddf 100644 --- a/benchmarks/linearHashNTAThreaded.c +++ b/benchmarks/linearHashNTAThreaded.c @@ -20,6 +20,7 @@ int activeThreads = 0; int max_active = 0; +int alwaysCommit; /* double avg_var = 0; double max_var = 0; @@ -68,10 +69,17 @@ static void * go (void * arg_ptr) { */ // printf("(%d)", k); + + if(alwaysCommit) { + // printf("Commit"); + Tcommit(xid); + xid = Tbegin(); + + } } - Tcommit(xid); + /* for(j = k * count; j < (k+1) *(count) ; j++) { int tmp = -100; @@ -105,11 +113,13 @@ static void * go (void * arg_ptr) { int main(int argc, char** argv) { - assert(argc == 3); + assert(argc == 3 || argc == 4); int thread_count = atoi(argv[1]); count = atoi(argv[2]); + alwaysCommit = (argc==4); + unlink("storefile.txt"); unlink("logfile.txt"); unlink("blob0_file.txt"); @@ -120,6 +130,7 @@ int main(int argc, char** argv) { Tinit(); int xid = Tbegin(); hash = ThashCreate(xid, sizeof(int), sizeof(int)); + // hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); Tcommit(xid); diff --git a/doc/paper2/LLADD.tex b/doc/paper2/LLADD.tex index 90910de..d5fa566 100644 --- a/doc/paper2/LLADD.tex +++ b/doc/paper2/LLADD.tex @@ -94,6 +94,9 @@ available.% narrow interfaces, since transactional storage algorithms' interdependencies and requirements are notoriously complicated.} +%Not implementing ARIES any more! + + \item {\bf With these trends in mind, we have implemented a modular version of ARIES that makes as few assumptions as possible about application data structures or workload. Where such assumptions are diff --git a/lladd/constants.h b/lladd/constants.h index 07f53c4..9a0c940 100644 --- a/lladd/constants.h +++ b/lladd/constants.h @@ -82,9 +82,9 @@ terms specified in this license. /* #define MAX_BUFFER_SIZE 100003 */ /*#define MAX_BUFFER_SIZE 20029 */ -#define MAX_BUFFER_SIZE 10007 +//#define MAX_BUFFER_SIZE 10007 /*#define MAX_BUFFER_SIZE 5003*/ -/*#define MAX_BUFFER_SIZE 2003 */ +#define MAX_BUFFER_SIZE 2003 /* #define MAX_BUFFER_SIZE 71 */ /*#define MAX_BUFFER_SIZE 7 */ /*#define BUFFER_ASOOCIATIVE 2 */ @@ -130,6 +130,9 @@ terms specified in this license. #define OPERATION_LINEAR_HASH_INSERT 31 #define OPERATION_LINEAR_HASH_REMOVE 32 +// this operation is specific to OASYS +#define OPERATION_SET_DIFF 33 + /* number above should be less than number below */ #define MAX_OPERATIONS 40 diff --git a/lladd/iterator.h b/lladd/iterator.h index d714db3..b9b45ab 100644 --- a/lladd/iterator.h +++ b/lladd/iterator.h @@ -1,8 +1,11 @@ #include + #ifndef __ITERATOR_H #define __ITERATOR_H +BEGIN_C_DECLS + #define MAX_ITERATOR_TYPES 10 #define LINEAR_HASH_NTA_ITERATOR 0 #define ARRAY_ITERATOR 1 @@ -16,6 +19,7 @@ typedef struct { int (*key) (int xid, void * it, byte ** key); int (*value)(int xid, void * it, byte ** value); void (*tupleDone)(int xid, void * it); + void (*releaseLock)(int xid, void *it); } lladdIterator_def_t; typedef struct { @@ -59,7 +63,7 @@ int Titerator_tryNext(int xid, lladdIterator_t * it); provides , allows iterator to clean up if necessary > such as release lock */ -int (*releaseTuple)(int xid, void * it); +//int (*releaseTuple)(int xid, void * it); /** This function allows the caller to access the current iterator @@ -98,6 +102,9 @@ int Titerator_value(int xid, lladdIterator_t * it, byte ** value); all iterators are reentrant.) */ void Titerator_tupleDone(int xid, lladdIterator_t * it); +void Titerator_releaseLock(int xid, lladdIterator_t * it); + +END_C_DECLS #endif diff --git a/src/lladd/iterator.c b/src/lladd/iterator.c index f3c4d3f..990ecd7 100644 --- a/src/lladd/iterator.c +++ b/src/lladd/iterator.c @@ -39,7 +39,8 @@ void iterator_init() { logMemory_Iterator_tryNext, logMemory_Iterator_key, logMemory_Iterator_value, - logMemory_Iterator_releaseTuple + logMemory_Iterator_releaseTuple, + logMemory_Iterator_releaseLock, }; lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def); } @@ -52,3 +53,4 @@ int Titerator_tryNext(int xid, lladdIterator_t * it) { return itera int Titerator_key (int xid, lladdIterator_t * it, byte ** key){ return iterators[it->type].key (xid, it->impl, key); } int Titerator_value(int xid, lladdIterator_t * it, byte ** value){ return iterators[it->type].value(xid, it->impl, value); } void Titerator_tupleDone(int xid, lladdIterator_t * it) { iterators[it->type].tupleDone(xid, it->impl); } +void Titerator_releaseLock(int xid, lladdIterator_t * it) { iterators[it->type].releaseLock(xid, it->impl); } diff --git a/src/lladd/logger/logMemory.c b/src/lladd/logger/logMemory.c index abbd909..6a9b63a 100644 --- a/src/lladd/logger/logMemory.c +++ b/src/lladd/logger/logMemory.c @@ -6,11 +6,12 @@ #include #include #include "logMemory.h" - +#include #include typedef struct { pthread_mutex_t mutex; + pthread_mutex_t readerMutex; pthread_cond_t readReady; pthread_cond_t writeReady; ringBufferLog_t * ringBuffer; @@ -35,6 +36,7 @@ lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) { iterator->impl = malloc(sizeof(logMemory_fifo_t)); ((logMemory_fifo_t *)iterator->impl)->ringBuffer = openLogRingBuffer(size, initialOffset); pthread_mutex_init(&(((logMemory_fifo_t *)iterator->impl)->mutex), NULL); + pthread_mutex_init(&(((logMemory_fifo_t *)iterator->impl)->readerMutex), NULL); pthread_cond_init (&(((logMemory_fifo_t *)iterator->impl)->readReady), NULL); pthread_cond_init (&(((logMemory_fifo_t *)iterator->impl)->writeReady), NULL); ((logMemory_fifo_t *)iterator->impl)->cached_value = NULL; @@ -54,10 +56,12 @@ lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) { -/* iterator interface implementation */ +/*------------- iterator interface implementation --------------------*/ -/* NOTE: assumes currently that the consumer interface is done so we can - safely deallocate resources +/** This function should not be called until next() or one of its + variants indicates that the entire fifo has been consumed, since + this function assumes currently that the consumer interface is + done so that it can deallocate resources. */ void logMemory_Iterator_close(int xid, void * impl) { closeLogRingBuffer( ((logMemory_fifo_t *) impl)->ringBuffer ); @@ -66,6 +70,7 @@ void logMemory_Iterator_close(int xid, void * impl) { compensated_function int logMemory_Iterator_next (int xid, void * impl) { logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl; + pthread_mutex_lock(&(fifo->readerMutex)); pthread_mutex_lock(&(fifo->mutex)); size_t size; int lsn; @@ -73,6 +78,7 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) { if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) { pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); return 0; } @@ -82,12 +88,14 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) { pthread_cond_wait(&(fifo->readReady), &(fifo->mutex)); if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) { pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); return 0; } } if (ret == -1) { compensation_set_error(LLADD_INTERNAL_ERROR); pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); return LLADD_INTERNAL_ERROR; } @@ -99,6 +107,7 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) { if(tmp == NULL) { compensation_set_error(LLADD_INTERNAL_ERROR); pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); return LLADD_INTERNAL_ERROR; } @@ -111,6 +120,171 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) { if (ret == -1) { compensation_set_error(LLADD_INTERNAL_ERROR); pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); + return LLADD_INTERNAL_ERROR; + + } + + assert(!ret); + + fifo->cached_lsn = (lsn_t)lsn; + + pthread_cond_broadcast(&(fifo->writeReady)); + pthread_mutex_unlock(&(fifo->mutex)); + return 1; + +} + +/** @todo logMemory_Iterator_tryNext is a cut and pasted version of + .._next. The functionality should be broken into modules and + reused... */ + +compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) { + logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl; + if(EBUSY == pthread_mutex_trylock(&(fifo->readerMutex))) { + return 0; + } + pthread_mutex_lock(&(fifo->mutex)); + size_t size; + int lsn; + int ret; + + if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) { + pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); + return 0; + } + + // TODO Check to see if we're done reading... + + //From here on, we need to continue as normal since we consumed data from the ringbuffer... + if(-2 == (ret = ringBufferTruncateRead((byte *)&size, fifo->ringBuffer, sizeof(size_t)))) { + pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); + return 0; + } + + if (ret == -1) { + compensation_set_error(LLADD_INTERNAL_ERROR); + pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); + return LLADD_INTERNAL_ERROR; + + } + assert(!ret); + + byte * tmp; + + tmp = realloc(fifo->cached_value, size); + if(tmp == NULL) { + compensation_set_error(LLADD_INTERNAL_ERROR); + pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); + return LLADD_INTERNAL_ERROR; + } + + fifo->cached_value = tmp; + fifo->cached_value_size = size; + + while(-2 == (lsn = ringBufferTruncateRead( fifo->cached_value, fifo->ringBuffer, size))) { + pthread_cond_wait(&(fifo->readReady), &(fifo->mutex)); + } + if (ret == -1) { + compensation_set_error(LLADD_INTERNAL_ERROR); + pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); + return LLADD_INTERNAL_ERROR; + + } + + assert(!ret); + + fifo->cached_lsn = (lsn_t)lsn; + + pthread_cond_broadcast(&(fifo->writeReady)); + pthread_mutex_unlock(&(fifo->mutex)); + return 1; + +} + +compensated_function void logMemory_Iterator_releaseLock (int xid, void * impl) { + logMemory_fifo_t * fifo = (logMemory_fifo_t *) impl; + + pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); + +} + +/** Blocks until it can advance the iterator a single step or until + the iterator is empty. If this function returns 0 the caller can + safely assume the iterator is currently empty (and any _push() + requests are blocking). Otherwise, this function works + analagously to the normal _next() call + + @return 1 (and require tupleDone() to be called) if the iterator was advanced. + @return 0 (and require releaseIteratorLock() to be called) if the iterator currently contains + no more values, and is not waiting for another thread to call tupleDone()) + + @todo logMemory_Iterator_nextOrEmpty is a cut and pasted version of + .._next. The functionality should be broken into modules and + reused... */ + +compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) { + logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl; + pthread_mutex_lock(&(fifo->readerMutex)); + pthread_mutex_lock(&(fifo->mutex)); + size_t size; + int lsn; + int ret; + + if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) { + /* pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); */ + return 0; + } + + // TODO Check to see if we're done reading... + + //From here on, we need to continue as normal since we consumed data from the ringbuffer... + if(-2 == (ret = ringBufferTruncateRead((byte *)&size, fifo->ringBuffer, sizeof(size_t)))) { + /* pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); */ + // At this point, just assume the ring buffer is empty, since + // anything in the process of doing an append is blocked. (under + // normal circumstances, there really won't be anything in the + // ringbuffer anyway.. + return 0; + } + + if (ret == -1) { + compensation_set_error(LLADD_INTERNAL_ERROR); + pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); + return LLADD_INTERNAL_ERROR; + + } + assert(!ret); + + byte * tmp; + + tmp = realloc(fifo->cached_value, size); + if(tmp == NULL) { + compensation_set_error(LLADD_INTERNAL_ERROR); + pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); + return LLADD_INTERNAL_ERROR; + } + + fifo->cached_value = tmp; + fifo->cached_value_size = size; + + while(-2 == (lsn = ringBufferTruncateRead( fifo->cached_value, fifo->ringBuffer, size))) { + pthread_cond_wait(&(fifo->readReady), &(fifo->mutex)); + } + if (ret == -1) { + compensation_set_error(LLADD_INTERNAL_ERROR); + pthread_mutex_unlock(&(fifo->mutex)); + pthread_mutex_unlock(&(fifo->readerMutex)); return LLADD_INTERNAL_ERROR; } @@ -138,16 +312,12 @@ int logMemory_Iterator_value (int xid, void * impl, byte ** value) { return fifo->cached_value_size; } -int logMemory_Iterator_releaseTuple(int xid, void *it) { - /* NO-OP */ - return 0; +void logMemory_Iterator_releaseTuple(int xid, void *it) { + logMemory_fifo_t * fifo = (logMemory_fifo_t *) it; + pthread_mutex_unlock(&(fifo->readerMutex)); } - - - -/* consumer implementation */ - +/* ------------------- consumer implementation ------------------------------*/ void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it){ /* This needs to tell the iterator where the end of the ring buffer is. */ diff --git a/src/lladd/logger/logMemory.h b/src/lladd/logger/logMemory.h index 0e0b0f0..a51c75f 100644 --- a/src/lladd/logger/logMemory.h +++ b/src/lladd/logger/logMemory.h @@ -7,38 +7,26 @@ #ifndef __LOGMEMORY_H #define __LOGMEMORY_H -/*typedef struct ringBufferLog_s ringBufferLog_t; +/** + @file -ringBufferLog_t * openLogRingBuffer(size_t size, lsn_t initialOffset); -void closeLogRingBuffer(ringBufferLog_t * log); -int ringBufferAppend(ringBufferLog_t * log, byte * dat, size_t size); -int ringBufferTruncateRead(byte * buf, ringBufferLog_t * log,size_t size); */ + A poorly named in-memory fifo based on ringbuffer.h -// int writeLogEntry(LogEntry * e); (as implemented by ring buffer, but need to remember size of entry too + @todo Move this all to some reasonably named interface. :) -// void syncLog() (block writers until log is empty / call sync log on consumers?) - -// lsn_t flushedLSN return lsn as of last syncLog? - -// int truncateLog(lsn_t) (no-op?) - -// lsn_t firstLogEntry() Unimplemented? Or return first thing in current ring buffer? - -// void closeLogWriter() (syncLog + closeRingBuffer) - -// deleteLogWriter no-op + $Id$ +*/ lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset); -// LogEntry* readLSNEntry(lsn_t) if LSN is the next available log entry, return it, else error. - void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it); compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t Valsize); void logMemory_Iterator_close(int xid, void * impl); -compensated_function int logMemory_Iterator_next (int xid, void * impl); -int logMemory_Iterator_key (int xid, void * impl, byte ** key); -int logMemory_Iterator_value (int xid, void * impl, byte ** value); -int logMemory_Iterator_releaseTuple(int xid, void *it); - +compensated_function int logMemory_Iterator_next(int xid, void * impl); +compensated_function int logMemory_Iterator_tryNext(int xid, void * impl); +compensated_function int logMemory_Iterator_key (int xid, void * impl, byte ** key); +compensated_function int logMemory_Iterator_value (int xid, void * impl, byte ** value); +compensated_function void logMemory_Iterator_releaseTuple(int xid, void *it); +compensated_function void logMemory_Iterator_releaseLock (int xid, void * impl); #endif diff --git a/src/lladd/logger/logger2.c b/src/lladd/logger/logger2.c index 084d709..f725753 100644 --- a/src/lladd/logger/logger2.c +++ b/src/lladd/logger/logger2.c @@ -75,9 +75,59 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) { } +extern int numActiveXactions; lsn_t LogTransCommit(TransactionLog * l) { - syncLog(); - return LogTransCommon(l, XCOMMIT); + static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER; + static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER; + static int pendingCommits = 0; + static int syncLogCount; + + lsn_t ret = LogTransCommon(l, XCOMMIT); + + struct timeval now; + struct timespec timeout; + // int retcode; + + pthread_mutex_lock(&check_commit); + if(flushedLSN() >= ret) { + pthread_mutex_unlock(&check_commit); + return ret; + } + gettimeofday(&now, NULL); + timeout.tv_sec = now.tv_sec; + timeout.tv_nsec = now.tv_usec * 1000; + // 0123456789 <- number of zeros on the next three lines... + timeout.tv_nsec += 100000000; // wait ten msec. + if(timeout.tv_nsec > 1000000000) { + timeout.tv_nsec -= 1000000000; + timeout.tv_sec++; + } + + pendingCommits++; + // if(pendingCommits <= (numActiveXactions / 2)) { + if((numActiveXactions > 1 && pendingCommits < numActiveXactions) || + (numActiveXactions > 20 && pendingCommits < (int)((double)numActiveXactions * 0.95))) { + while(ETIMEDOUT != (pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) { + if(flushedLSN() >= ret) { + pendingCommits--; + pthread_mutex_unlock(&check_commit); + return ret; + } + } + // printf("Timed out"); + } else { + // printf("Didn't wait %d < %d\n", (numActiveXactions / 2), pendingCommits); + } + if(flushedLSN() < ret) { + syncLog(); + syncLogCount++; + // printf(" %d ", syncLogCount); + pthread_cond_broadcast(&tooFewXacts); + } + assert(flushedLSN() >= ret); + pendingCommits--; + pthread_mutex_unlock(&check_commit); + return ret; } lsn_t LogTransAbort(TransactionLog * l) { diff --git a/test/lladd/Makefile.am b/test/lladd/Makefile.am index 0a77bda..ea4a59e 100644 --- a/test/lladd/Makefile.am +++ b/test/lladd/Makefile.am @@ -9,4 +9,4 @@ endif noinst_PROGRAMS = $(TESTS) LDADD = $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a #-lefence CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log check_indirect.log check_bufferMananger.log check_lladdhash.log check_pageOperations.log check_linearhash.log check_linkedListNTA.log check_linearHashNTA.log check_pageOrientedListNTA.log check_lockManager.log check_compensations.log check_errorhandling.log check_header.logg check_iterator.log check_linearHash.log check_ringbuffer.log -AM_CFLAGS= -g -Wall -pedantic -std=gnu99 \ No newline at end of file +AM_CFLAGS= -g -Wall -pedantic -std=gnu99 \ No newline at end of file