diff --git a/lladd/fifo.h b/lladd/fifo.h new file mode 100644 index 0000000..dc773f9 --- /dev/null +++ b/lladd/fifo.h @@ -0,0 +1,32 @@ + +#include +#include +#ifndef __FIFO_H +#define __FIFO_H + +typedef struct { + lladdIterator_t *iterator; + lladdConsumer_t *consumer; +} lladdFifo_t; + +typedef struct lladdFifoPool_t { + lladdFifo_t ** pool; + lladdFifo_t * (*getFifo)(struct lladdFifoPool_t * pool, + byte * multiplexKey, + size_t multiplexKeySize); + int fifoCount; + lladdFifo_t * dirtyPoolFifo; +} lladdFifoPool_t; + + +typedef lladdFifo_t * (lladdFifoPool_getFifo_t)(lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize); + +lladdFifoPool_getFifo_t lladdFifoPool_getFifoCRC32; + +lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSize, + lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo); + +void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) ; + + +#endif // __FIFO_H diff --git a/lladd/iterator.h b/lladd/iterator.h index c982979..d714db3 100644 --- a/lladd/iterator.h +++ b/lladd/iterator.h @@ -12,6 +12,7 @@ typedef struct { // void * new(void * arg); void (*close)(int xid, void * it); int (*next) (int xid, void * it); + int (*tryNext) (int xid, void * it); int (*key) (int xid, void * it, byte ** key); int (*value)(int xid, void * it, byte ** value); void (*tupleDone)(int xid, void * it); @@ -40,6 +41,18 @@ void Titerator_close(int xid, lladdIterator_t * it); */ int Titerator_next(int xid, lladdIterator_t * it); +/** + @param it the iterator + + @return 1 if the iterator position advanced, and releaseTuple must be called, + 0 if the iterator has been locked by another reader, no tuples are ready, or the iterator has been closed. + + @todo think more carefully about the return value of Titerator_tryNext(). I'm not convinced that a 0/1 + return value is adequate. +*/ + +int Titerator_tryNext(int xid, lladdIterator_t * it); + /** NOTE: next acquires a mutex, releaseTuple releases mutex, next key value --atomic diff --git a/lladd/multiplexer.h b/lladd/multiplexer.h index 40ea588..1aa6080 100644 --- a/lladd/multiplexer.h +++ b/lladd/multiplexer.h @@ -4,9 +4,6 @@ #include - - - void multiplexHashLogByKey(byte * key, size_t keySize, byte * value, @@ -26,10 +23,10 @@ typedef struct { /** A hash of consumer implementations, keyed on the output of the multiplexKey parameter of *multiplex */ pblHashTable_t * consumerHash; /** The next two fields are used to create new consumers on demand. */ - lladdConsumer_t * (*getConsumer)(struct lladdFifoPool_t * newConsumerArg, + /* lladdConsumer_t * (*getConsumer)(struct lladdFifoPool_t * newConsumerArg, byte* multiplexKey, - size_t multiplexKeySize); - lladdFifoPool_t * getConsumerArg; + size_t multiplexKeySize); */ + lladdFifoPool_t * fifoPool; pthread_t worker; int xid; } lladdMultiplexer_t; @@ -42,9 +39,9 @@ lladdMultiplexer_t * lladdMultiplexer_alloc(int xid, lladdIterator_t * it, size_t valueSize, byte ** multiplexKey, size_t * multiplexKeySize), - lladdConsumer_t * getConsumer(lladdFifoPool_t * getConsumerArg, + /* lladdConsumer_t * getConsumer(lladdFifoPool_t * fifoPool, byte* multiplexKey, - size_t multiplexKeySize), + size_t multiplexKeySize), */ lladdFifoPool_t * fifoPool); /** @@ -70,5 +67,3 @@ int lladdMultiplexer_start(lladdMultiplexer_t * multiplexer, pthread_attr_t * th */ int lladdMultiplexer_join(lladdMultiplexer_t * multiplexer); -lladdConsumer_t * fifoPool_getConsumerCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize); -lladdFifoPool_t * fifoPool_ringBufferInit (int consumerCount, int bufferSize); diff --git a/src/lladd/Makefile.am b/src/lladd/Makefile.am index b14ecb5..6f8d373 100644 --- a/src/lladd/Makefile.am +++ b/src/lladd/Makefile.am @@ -5,9 +5,9 @@ lib_LIBRARIES=liblladd.a # removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c liblladd_a_SOURCES=crc32.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \ pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c \ - lockManager.c iterator.c arrayCollection.c ringbuffer.c \ + lockManager.c iterator.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c\ logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c \ - logger/logMemory.c multiplexer.c\ + logger/logMemory.c \ page/slotted.c page/header.c page/fixed.c compensations.c \ operations/pageOperations.c page/indirect.c operations/decrement.c \ operations/increment.c operations/prepare.c operations/set.c \ diff --git a/src/lladd/fifo.c b/src/lladd/fifo.c new file mode 100644 index 0000000..b5bac96 --- /dev/null +++ b/src/lladd/fifo.c @@ -0,0 +1,39 @@ +#include +#include +#include +#include "logger/logMemory.h" + +/** + Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer. +*/ +lladdFifo_t * lladdFifoPool_getFifoCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) { + int memberId = crc32(multiplexKey, multiplexKeySize, (unsigned long)-1L) % pool->fifoCount; + return pool->pool[memberId]; +} +void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) { + if(pool->dirtyPoolFifo) { + Tconsumer_push(xid, pool->dirtyPoolFifo->consumer, NULL, 0, (byte*)&fifo, sizeof(lladdFifo_t*)); + } +} + +/** + Create a new pool of ringBuffer based fifos + + @param consumerCount the number of consumers in the pool. + @todo this function should be generalized to other consumer implementations. +*/ +lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSize, + lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo) { + lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t)); + + pool->getFifo = getFifo ? getFifo : lladdFifoPool_getFifoCRC32; + pool->dirtyPoolFifo = dirtyPoolFifo; + + pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount); + int i; + for(i = 0; i < consumerCount; i++) { + pool->pool[i] = logMemoryFifo(bufferSize, 0); + } + pool->fifoCount = consumerCount; + return pool; +} diff --git a/src/lladd/iterator.c b/src/lladd/iterator.c index 186ab96..f3c4d3f 100644 --- a/src/lladd/iterator.c +++ b/src/lladd/iterator.c @@ -16,23 +16,39 @@ static void noopTupDone(int xid, void * foo) { } void iterator_init() { lladdIterator_def_t linearHashNTA_def = { - linearHashNTAIterator_close, linearHashNTAIterator_next, linearHashNTAIterator_key, linearHashNTAIterator_value, noopTupDone + linearHashNTAIterator_close, + linearHashNTAIterator_next, + linearHashNTAIterator_next, + linearHashNTAIterator_key, + linearHashNTAIterator_value, + noopTupDone }; lladdIterator_register(LINEAR_HASH_NTA_ITERATOR, linearHashNTA_def); lladdIterator_def_t array_def = { - arrayIterator_close, arrayIterator_next, arrayIterator_key, arrayIterator_value, noopTupDone + arrayIterator_close, + arrayIterator_next, + arrayIterator_next, + arrayIterator_key, + arrayIterator_value, + noopTupDone }; lladdIterator_register(ARRAY_ITERATOR, array_def); lladdIterator_def_t logMemory_def = { - logMemory_Iterator_close, logMemory_Iterator_next, logMemory_Iterator_key, logMemory_Iterator_value, noopTupDone + logMemory_Iterator_close, + logMemory_Iterator_next, + logMemory_Iterator_tryNext, + logMemory_Iterator_key, + logMemory_Iterator_value, + logMemory_Iterator_releaseTuple }; lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def); } //lladdIterator_t Titerator(int type, void * arg); -void Titerator_close(int xid, lladdIterator_t * it) { iterators[it->type].close(xid, it->impl); free(it); } -int Titerator_next (int xid, lladdIterator_t * it) { return iterators[it->type].next (xid, it->impl); } -int Titerator_key (int xid, lladdIterator_t * it, byte ** key) { return iterators[it->type].key (xid, it->impl, key); } +void Titerator_close (int xid, lladdIterator_t * it) { iterators[it->type].close(xid, it->impl); free(it); } +int Titerator_next (int xid, lladdIterator_t * it) { return iterators[it->type].next (xid, it->impl); } +int Titerator_tryNext(int xid, lladdIterator_t * it) { return iterators[it->type].tryNext (xid, it->impl); } +int Titerator_key (int xid, lladdIterator_t * it, byte ** key){ return iterators[it->type].key (xid, it->impl, key); } int Titerator_value(int xid, lladdIterator_t * it, byte ** value){ return iterators[it->type].value(xid, it->impl, value); } void Titerator_tupleDone(int xid, lladdIterator_t * it) { iterators[it->type].tupleDone(xid, it->impl); } diff --git a/src/lladd/multiplexer.c b/src/lladd/multiplexer.c index 825507f..8213465 100644 --- a/src/lladd/multiplexer.c +++ b/src/lladd/multiplexer.c @@ -12,16 +12,16 @@ lladdMultiplexer_t * lladdMultiplexer_alloc(int xid, lladdIterator_t * it, size_t valueSize, byte ** multiplexKey, size_t * multiplexKeySize), - lladdConsumer_t * getConsumer(struct lladdFifoPool_t* getConsumerArg, + /* lladdConsumer_t * getConsumer(struct lladdFifoPool_t* fifoPool, byte* multiplexKey, - size_t multiplexKeySize), + size_t multiplexKeySize), */ lladdFifoPool_t * fifoPool) { lladdMultiplexer_t * ret = malloc(sizeof(lladdMultiplexer_t)); ret->it = it; ret->multiplexer = multiplexer; ret->consumerHash = pblHtCreate(); - ret->getConsumer = getConsumer; - ret->getConsumerArg = fifoPool; + // ret->getConsumer = getConsumer; + ret->fifoPool = fifoPool; ret->xid = xid; return ret; } @@ -51,29 +51,28 @@ void * multiplexer_worker(void * arg) { m->multiplexer(key, keySize, value, valueSize, &mkey, &mkeySize); - - consumer = m->getConsumer(m->getConsumerArg, mkey, mkeySize); - - /* lladdConsumer_t * consumer = pblHtLookup(m->consumerHash); - if(consumer == NULL) { - consumer = m->newConsumer(m->newConsumerArg, mkey, mkeySize); - pblHtInsert(m->consumerHash, mkey, mkeySize, consumer); - } */ - + lladdFifo_t * fifo = m->fifoPool->getFifo(m->fifoPool, mkey, mkeySize); + consumer = fifo->consumer; Tconsumer_push(m->xid, consumer, key, keySize, value, valueSize); - + lladdFifoPool_markDirty(m->xid, m->fifoPool, fifo); } // iterate over pblhash, closing consumers. Titerator_close(m->xid, m->it); - lladdFifoPool_t * pool = m->getConsumerArg; + /** @todo Does this belong in its own function in fifo.c? */ + + lladdFifoPool_t * pool = m->fifoPool; int i; for(i = 0; i < pool->fifoCount; i++) { Tconsumer_close(m->xid, pool->pool[i]->consumer); } + if(m->fifoPool->dirtyPoolFifo) { + Tconsumer_close(m->xid, m->fifoPool->dirtyPoolFifo->consumer); + } + return (void*)compensation_error(); } @@ -120,28 +119,3 @@ void multiplexHashLogByKey(byte * key, } } -/** - Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer. */ -lladdConsumer_t * fifoPool_getConsumerCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) { - int memberId = crc32(multiplexKey, multiplexKeySize, (unsigned long)-1L) % pool->fifoCount; - return pool->pool[memberId]->consumer; -} - - -/** - Create a new pool of ringBuffer based fifos - - @param consumerCount the number of consumers in the pool. - @todo this function should be generalized to other consumer implementations. -*/ -lladdFifoPool_t * fifoPool_ringBufferInit (int consumerCount, int bufferSize) { - lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t)); - - pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount); - int i; - for(i = 0; i < consumerCount; i++) { - pool->pool[i] = logMemoryFifo(bufferSize, 0); - } - pool->fifoCount = consumerCount; - return pool; -}