diff --git a/lladd/consumer.h b/lladd/consumer.h index f9b3efd..ac132d5 100644 --- a/lladd/consumer.h +++ b/lladd/consumer.h @@ -3,20 +3,23 @@ #ifndef __CONSUMER_H #define __CONSUMER_H -#define MAX_ITERATOR_TYPES 10 +#define MAX_CONSUMER_TYPES 10 #define FIFO_CONSUMER 0 #define ARRAY_CONSUMER 1 #define LOG_MEMORY_CONSUMER 2 - -typedef struct { - int foo; -} lladdConsumer_def_t; +#define POINTER_CONSUMER 3 typedef struct { int type; void * impl; } lladdConsumer_t; + +typedef struct { + int (*push)(int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize); + void(*close)(int xid, void *it); +} lladdConsumer_def_t; + /* call once per Tinit() call */ void consumer_init(); @@ -38,3 +41,4 @@ int Tconsumer_push(int xid, lladdConsumer_t * it, byte * key, size_t keySize, by //int Tconsumer_tryPush(int xid, ....); #endif // __CONSUMER_H + diff --git a/lladd/fifo.h b/lladd/fifo.h index dc773f9..2d77521 100644 --- a/lladd/fifo.h +++ b/lladd/fifo.h @@ -29,4 +29,17 @@ lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSiz void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) ; +int lladdFifoPool_consumer_push(int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize); +void lladdFifoPool_consumer_close(int xid, void * it); +lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerCount, + lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo); + +void lladdFifoPool_iterator_close(int xid, void * it); +int lladdFifoPool_iterator_next(int xid, void * it); +int lladdFifoPool_iterator_tryNext(int xid, void * it); +int lladdFifoPool_iterator_key (int xid, void * it, byte ** key); +int lladdFifoPool_iterator_value (int xid, void * it, byte ** val); +void lladdFifoPool_iterator_tupleDone(int xid, void * it); +void lladdFifoPool_iterator_releaseLock(int xid, void * it); #endif // __FIFO_H + diff --git a/lladd/iterator.h b/lladd/iterator.h index b9b45ab..41e4279 100644 --- a/lladd/iterator.h +++ b/lladd/iterator.h @@ -10,6 +10,7 @@ BEGIN_C_DECLS #define LINEAR_HASH_NTA_ITERATOR 0 #define ARRAY_ITERATOR 1 #define LOG_MEMORY_ITERATOR 2 +#define POINTER_ITERATOR 3 typedef struct { // void * new(void * arg); diff --git a/src/lladd/Makefile.am b/src/lladd/Makefile.am index cbd6862..84a95ca 100644 --- a/src/lladd/Makefile.am +++ b/src/lladd/Makefile.am @@ -5,7 +5,7 @@ lib_LIBRARIES=liblladd.a # removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c liblladd_a_SOURCES=crc32.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \ pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c \ - lockManager.c iterator.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\ + lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\ logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c \ logger/logMemory.c \ page/slotted.c page/header.c page/fixed.c compensations.c \ diff --git a/src/lladd/fifo.c b/src/lladd/fifo.c index b5bac96..c208d9a 100644 --- a/src/lladd/fifo.c +++ b/src/lladd/fifo.c @@ -1,8 +1,12 @@ #include #include #include +#include #include "logger/logMemory.h" +#include +#include + /** Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer. */ @@ -37,3 +41,199 @@ lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSiz pool->fifoCount = consumerCount; return pool; } + +typedef struct { + int maxPtrs; + int outPtrs; + pthread_mutex_t mutex; + pthread_cond_t writeOK; +} lladdFifoPointerPool_t; + +typedef struct pointerFifoEntry { + struct pointerFifoEntry * prev; + struct pointerFifoEntry * next; + int keySize; + int valSize; +} pointerFifoEntry; + +typedef struct { + pointerFifoEntry * first; + pointerFifoEntry * last; + pointerFifoEntry * current; + pthread_mutex_t mutex; + int eof; + pthread_cond_t readOK; + //pthread_cond_t writeOK; + lladdFifoPointerPool_t * pool; +} pointerFifoImpl; + +static void doNext(int xid, pointerFifoImpl* impl) { + + impl->current = impl->last; + if(impl->last->prev) { + impl->last->prev->next = NULL; + impl->last=impl->last->prev; + } else { + impl->first = NULL; + impl->last = NULL; + } + +} + +int lladdFifoPool_iterator_next(int xid, void * it) { + pointerFifoImpl * impl = (pointerFifoImpl *) it; + + pthread_mutex_lock(&(impl->mutex)); + + while(impl->last == NULL && (!impl->eof)) { + pthread_cond_wait(&(impl->readOK), &(impl->mutex)); + } + + if(impl->eof) { + pthread_mutex_unlock(&(impl->mutex)); + return 0; + } + doNext(xid, impl); + + // pthread_mutex_unlock(&(impl->mutex)); + + return 1; + +} +int lladdFifoPool_iterator_tryNext(int xid, void * it) { + pointerFifoImpl * impl = (pointerFifoImpl *) it; + + pthread_mutex_lock(&(impl->mutex)); + + if(impl->last == NULL) { + pthread_mutex_unlock(&(impl->mutex)); + return 0; + } else { + doNext(xid, impl); + } + + // pthread_mutex_unlock(&(impl->mutex)); + return 1; +} + +void lladdFifoPool_iterator_releaseLock(int xid, void * it) { + abort(); // didn't implement nextOrEmpty yet... +} + +void lladdFifoPool_iterator_tupleDone(int xid, void * it) { + pointerFifoImpl * impl = (pointerFifoImpl *) it; + + free(impl->current); + + pthread_mutex_unlock(&(impl->mutex)); + + pthread_mutex_lock(&(impl->pool->mutex)); + impl->pool->outPtrs--; + pthread_mutex_unlock(&(impl->pool->mutex)); + pthread_cond_broadcast(&(impl->pool->writeOK)); +} + +int lladdFifoPool_iterator_key (int xid, void * it, byte ** key) { + pointerFifoImpl * impl = (pointerFifoImpl *) it; + *key = (byte*)(impl->current+1); + return impl->current->keySize; +} +int lladdFifoPool_iterator_value (int xid, void * it, byte ** val) { + pointerFifoImpl * impl = (pointerFifoImpl *) it; + *val = ((byte*)(impl->current+1))+impl->current->keySize; + return impl->current->valSize; +} + +void lladdFifoPool_iterator_close(int xid, void * it) { + pointerFifoImpl * impl = (pointerFifoImpl *) it; + pthread_mutex_lock(&(impl->mutex)); + assert(impl->eof); + assert((!impl->first) && (!impl->last)); + printf("Leaking iterator in lladdFifoPool_iterator_close\n"); + pthread_mutex_unlock(&(impl->mutex)); +} + +void lladdFifoPool_consumer_close(int xid, void * it) { + pointerFifoImpl * impl = (pointerFifoImpl *) it; + pthread_mutex_lock(&(impl->mutex)); + impl->eof = 1; + pthread_cond_broadcast(&(impl->readOK)); + pthread_mutex_unlock(&(impl->mutex)); +} + +int lladdFifoPool_consumer_push(int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize) { + pointerFifoImpl * impl = (pointerFifoImpl *) it; + + pthread_mutex_lock(&(impl->pool->mutex)); + while(impl->pool->outPtrs == impl->pool->maxPtrs) { + pthread_cond_wait(&(impl->pool->writeOK), &(impl->pool->mutex)); + } + impl->pool->outPtrs++; + pthread_mutex_unlock(&(impl->pool->mutex)); + + pointerFifoEntry * entry = malloc(sizeof(pointerFifoEntry) + keySize + valSize); + + // do 'expensive stuff' w/o a mutex + memcpy(entry+1, key, keySize); + memcpy(((byte*)(entry+1))+keySize, val, valSize); + entry->keySize = keySize; + entry->valSize = valSize; + + pthread_mutex_lock(&(impl->mutex)); + + entry->next = impl->first; + if(impl->last == NULL) { + impl->last = entry; + assert(!impl->first); + } else { + assert(impl->first); + entry->next->prev = entry; + } + entry->prev = NULL; + impl->first = entry; + + pthread_cond_broadcast(&(impl->readOK)); + pthread_mutex_unlock(&(impl->mutex)); + + + return 0; +} + + +lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerCount, + lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo) { + + lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t)); + + pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount); + int i; + + lladdFifoPointerPool_t * poolImpl = malloc(sizeof(lladdFifoPointerPool_t)); + poolImpl->maxPtrs = pointerCount; + poolImpl->outPtrs = 0; + pthread_mutex_init(&(poolImpl->mutex), NULL); + pthread_cond_init(&(poolImpl->writeOK), NULL); + + for(i = 0; i < consumerCount; i++) { + pool->pool[i] = malloc(sizeof(lladdFifo_t)); + pool->pool[i]->iterator = malloc(sizeof(lladdIterator_t)); + pool->pool[i]->consumer = malloc(sizeof(lladdConsumer_t)); + pool->pool[i]->iterator->type = POINTER_ITERATOR; + pool->pool[i]->consumer->type = POINTER_CONSUMER; + pointerFifoImpl * impl = + (pointerFifoImpl*) (pool->pool[i]->consumer->impl = pool->pool[i]->iterator->impl = malloc(sizeof(pointerFifoImpl))); + impl->first = NULL; + impl->last = NULL; + impl->eof = 0; + pthread_mutex_init(&(impl->mutex), NULL); + pthread_cond_init(&(impl->readOK), NULL); + // pthread_cond_init(&(impl->writeOK), NULL); + + impl->pool = poolImpl; + } + pool->getFifo = getFifo ? getFifo : lladdFifoPool_getFifoCRC32; + pool->fifoCount = consumerCount; + pool->dirtyPoolFifo = dirtyPoolFifo; + + return pool; +} diff --git a/src/lladd/iterator.c b/src/lladd/iterator.c index 990ecd7..cd9c47b 100644 --- a/src/lladd/iterator.c +++ b/src/lladd/iterator.c @@ -43,6 +43,16 @@ void iterator_init() { logMemory_Iterator_releaseLock, }; lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def); + lladdIterator_def_t pointer_def = { + lladdFifoPool_iterator_close, + lladdFifoPool_iterator_next, + lladdFifoPool_iterator_tryNext, + lladdFifoPool_iterator_key, + lladdFifoPool_iterator_value, + lladdFifoPool_iterator_tupleDone, + lladdFifoPool_iterator_releaseLock + }; + lladdIterator_register(POINTER_ITERATOR, pointer_def); } diff --git a/src/lladd/logger/logMemory.c b/src/lladd/logger/logMemory.c index 6a9b63a..0d18e07 100644 --- a/src/lladd/logger/logMemory.c +++ b/src/lladd/logger/logMemory.c @@ -319,9 +319,9 @@ void logMemory_Iterator_releaseTuple(int xid, void *it) { /* ------------------- consumer implementation ------------------------------*/ -void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it){ +void logMemory_consumer_close(int xid, void *it){ /* This needs to tell the iterator where the end of the ring buffer is. */ - logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it->impl); + logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it); pthread_mutex_lock(&(fifo->mutex)); fifo->eof = ringBufferAppendPosition(fifo->ringBuffer); assert(fifo->eof != -1); @@ -329,17 +329,17 @@ void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it){ pthread_mutex_unlock(&(fifo->mutex)); } -compensated_function void Tconsumer_close(int xid, lladdConsumer_t * cons) { - logMemory_Tconsumer_close(xid, cons); -} +/*compensated_function void Tconsumer_close(int xid, lladdConsumer_t * cons) { + logMemory_consumer_close(xid, cons); + }*/ -compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t Valsize) { +int logMemory_consumer_push(int xid, /*lladdConsumer_t * cons*/ void * it, byte * key, size_t keySize, byte * val, size_t valSize) { int ret; - logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it->impl); + logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it); pthread_mutex_lock(&(fifo->mutex)); while(-2 == (ret = ringBufferAppend(fifo->ringBuffer, - (byte *)&Valsize, + (byte *)&valSize, sizeof(size_t) ))) { pthread_cond_wait(&(fifo->writeReady), &(fifo->mutex)); } @@ -347,7 +347,7 @@ compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, compensation_set_error(LLADD_INTERNAL_ERROR); return LLADD_INTERNAL_ERROR; } - while(-2 == ringBufferAppend( ((logMemory_fifo_t *) it->impl)->ringBuffer, val, Valsize)) { + while(-2 == ringBufferAppend( (fifo)->ringBuffer, val, valSize)) { pthread_cond_wait(&(fifo->writeReady), &(fifo->mutex)); } @@ -360,5 +360,14 @@ compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, pthread_mutex_unlock(&(fifo->mutex)); return ret; - // always succeeds. + } +/* if(it->type == LOG_MEMORY_CONSUMER) { + return logMemory_consumer_push(xid, it, key, keySize, val, valSize); + } + if(it->type == POINTER_CONSUMER) { + return pointer_consumer_push(xid, it, key, keySize, val, valSize); + } + + // always succeeds. + }*/ diff --git a/src/lladd/logger/logMemory.h b/src/lladd/logger/logMemory.h index a51c75f..a678378 100644 --- a/src/lladd/logger/logMemory.h +++ b/src/lladd/logger/logMemory.h @@ -19,9 +19,8 @@ lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset); -void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it); -compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t Valsize); - +void logMemory_consumer_close(int xid, void *it); +int logMemory_consumer_push (int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize); void logMemory_Iterator_close(int xid, void * impl); compensated_function int logMemory_Iterator_next(int xid, void * impl); compensated_function int logMemory_Iterator_tryNext(int xid, void * impl); diff --git a/src/lladd/multiplexer.c b/src/lladd/multiplexer.c index 8213465..0163d85 100644 --- a/src/lladd/multiplexer.c +++ b/src/lladd/multiplexer.c @@ -54,6 +54,7 @@ void * multiplexer_worker(void * arg) { lladdFifo_t * fifo = m->fifoPool->getFifo(m->fifoPool, mkey, mkeySize); consumer = fifo->consumer; Tconsumer_push(m->xid, consumer, key, keySize, value, valueSize); + Titerator_tupleDone(m->xid, m->it); lladdFifoPool_markDirty(m->xid, m->fifoPool, fifo); } diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index 8d8784b..5d01f2c 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -6,6 +6,7 @@ #include #include "logger/logWriter.h" #include +#include #include #include @@ -110,6 +111,7 @@ int Tinit() { LinkedListNTAInit(); compensations_init(); iterator_init(); + consumer_init(); setupLockManagerCallbacksNil(); //setupLockManagerCallbacksPage(); diff --git a/test/lladd/check_multiplexer.c b/test/lladd/check_multiplexer.c index 983aa3d..1d8cc39 100644 --- a/test/lladd/check_multiplexer.c +++ b/test/lladd/check_multiplexer.c @@ -123,6 +123,7 @@ static void * trygo( void * arg) { int i = *(int*)(arg+1); array[i]++; + assert(*(lsn_t*)key == i); assert(array[i] == 1); Titerator_tupleDone(-1, it); @@ -194,7 +195,8 @@ START_TEST(multiplexTest) { lladdIterator_t * it = ThashGenericIterator(xid, hash); lladdFifo_t * dirtyFifos = logMemoryFifo((int)(((double)NUM_INSERTS) * 0.5), 0); // 8 bytes of memory used per queued request. - lladdFifoPool_t * fifoPool = lladdFifoPool_ringBufferInit(NUM_THREADS, NUM_BYTES_IN_FIFO, NULL, dirtyFifos); + // lladdFifoPool_t * fifoPool = lladdFifoPool_ringBufferInit(NUM_THREADS, NUM_BYTES_IN_FIFO, NULL, dirtyFifos); + lladdFifoPool_t * fifoPool = lladdFifoPool_pointerPoolInit(NUM_THREADS, NUM_BYTES_IN_FIFO/10, NULL, dirtyFifos); lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, it, &multiplexHashLogByKey,