Implemented fifo pool that allocates entries out of a pool (it should be easy to extend this to avoid deadlock, while allowing it to continue to provide backpressure)

This commit is contained in:
Sears Russell 2005-03-23 08:07:53 +00:00
parent 9ede6bec33
commit ee9eb416ea
11 changed files with 261 additions and 20 deletions

View file

@ -3,20 +3,23 @@
#ifndef __CONSUMER_H
#define __CONSUMER_H
#define MAX_ITERATOR_TYPES 10
#define MAX_CONSUMER_TYPES 10
#define FIFO_CONSUMER 0
#define ARRAY_CONSUMER 1
#define LOG_MEMORY_CONSUMER 2
typedef struct {
int foo;
} lladdConsumer_def_t;
#define POINTER_CONSUMER 3
typedef struct {
int type;
void * impl;
} lladdConsumer_t;
typedef struct {
int (*push)(int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize);
void(*close)(int xid, void *it);
} lladdConsumer_def_t;
/* call once per Tinit() call */
void consumer_init();
@ -38,3 +41,4 @@ int Tconsumer_push(int xid, lladdConsumer_t * it, byte * key, size_t keySize, by
//int Tconsumer_tryPush(int xid, ....);
#endif // __CONSUMER_H

View file

@ -29,4 +29,17 @@ lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSiz
void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) ;
int lladdFifoPool_consumer_push(int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize);
void lladdFifoPool_consumer_close(int xid, void * it);
lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerCount,
lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo);
void lladdFifoPool_iterator_close(int xid, void * it);
int lladdFifoPool_iterator_next(int xid, void * it);
int lladdFifoPool_iterator_tryNext(int xid, void * it);
int lladdFifoPool_iterator_key (int xid, void * it, byte ** key);
int lladdFifoPool_iterator_value (int xid, void * it, byte ** val);
void lladdFifoPool_iterator_tupleDone(int xid, void * it);
void lladdFifoPool_iterator_releaseLock(int xid, void * it);
#endif // __FIFO_H

View file

@ -10,6 +10,7 @@ BEGIN_C_DECLS
#define LINEAR_HASH_NTA_ITERATOR 0
#define ARRAY_ITERATOR 1
#define LOG_MEMORY_ITERATOR 2
#define POINTER_ITERATOR 3
typedef struct {
// void * new(void * arg);

View file

@ -5,7 +5,7 @@ lib_LIBRARIES=liblladd.a
# removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c
liblladd_a_SOURCES=crc32.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \
pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c \
lockManager.c iterator.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\
lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\
logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c \
logger/logMemory.c \
page/slotted.c page/header.c page/fixed.c compensations.c \

View file

@ -1,8 +1,12 @@
#include <lladd/fifo.h>
#include <lladd/crc32.h>
#include <stdlib.h>
#include <stdio.h>
#include "logger/logMemory.h"
#include <string.h>
#include <assert.h>
/**
Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer.
*/
@ -37,3 +41,199 @@ lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSiz
pool->fifoCount = consumerCount;
return pool;
}
typedef struct {
int maxPtrs;
int outPtrs;
pthread_mutex_t mutex;
pthread_cond_t writeOK;
} lladdFifoPointerPool_t;
typedef struct pointerFifoEntry {
struct pointerFifoEntry * prev;
struct pointerFifoEntry * next;
int keySize;
int valSize;
} pointerFifoEntry;
typedef struct {
pointerFifoEntry * first;
pointerFifoEntry * last;
pointerFifoEntry * current;
pthread_mutex_t mutex;
int eof;
pthread_cond_t readOK;
//pthread_cond_t writeOK;
lladdFifoPointerPool_t * pool;
} pointerFifoImpl;
static void doNext(int xid, pointerFifoImpl* impl) {
impl->current = impl->last;
if(impl->last->prev) {
impl->last->prev->next = NULL;
impl->last=impl->last->prev;
} else {
impl->first = NULL;
impl->last = NULL;
}
}
int lladdFifoPool_iterator_next(int xid, void * it) {
pointerFifoImpl * impl = (pointerFifoImpl *) it;
pthread_mutex_lock(&(impl->mutex));
while(impl->last == NULL && (!impl->eof)) {
pthread_cond_wait(&(impl->readOK), &(impl->mutex));
}
if(impl->eof) {
pthread_mutex_unlock(&(impl->mutex));
return 0;
}
doNext(xid, impl);
// pthread_mutex_unlock(&(impl->mutex));
return 1;
}
int lladdFifoPool_iterator_tryNext(int xid, void * it) {
pointerFifoImpl * impl = (pointerFifoImpl *) it;
pthread_mutex_lock(&(impl->mutex));
if(impl->last == NULL) {
pthread_mutex_unlock(&(impl->mutex));
return 0;
} else {
doNext(xid, impl);
}
// pthread_mutex_unlock(&(impl->mutex));
return 1;
}
void lladdFifoPool_iterator_releaseLock(int xid, void * it) {
abort(); // didn't implement nextOrEmpty yet...
}
void lladdFifoPool_iterator_tupleDone(int xid, void * it) {
pointerFifoImpl * impl = (pointerFifoImpl *) it;
free(impl->current);
pthread_mutex_unlock(&(impl->mutex));
pthread_mutex_lock(&(impl->pool->mutex));
impl->pool->outPtrs--;
pthread_mutex_unlock(&(impl->pool->mutex));
pthread_cond_broadcast(&(impl->pool->writeOK));
}
int lladdFifoPool_iterator_key (int xid, void * it, byte ** key) {
pointerFifoImpl * impl = (pointerFifoImpl *) it;
*key = (byte*)(impl->current+1);
return impl->current->keySize;
}
int lladdFifoPool_iterator_value (int xid, void * it, byte ** val) {
pointerFifoImpl * impl = (pointerFifoImpl *) it;
*val = ((byte*)(impl->current+1))+impl->current->keySize;
return impl->current->valSize;
}
void lladdFifoPool_iterator_close(int xid, void * it) {
pointerFifoImpl * impl = (pointerFifoImpl *) it;
pthread_mutex_lock(&(impl->mutex));
assert(impl->eof);
assert((!impl->first) && (!impl->last));
printf("Leaking iterator in lladdFifoPool_iterator_close\n");
pthread_mutex_unlock(&(impl->mutex));
}
void lladdFifoPool_consumer_close(int xid, void * it) {
pointerFifoImpl * impl = (pointerFifoImpl *) it;
pthread_mutex_lock(&(impl->mutex));
impl->eof = 1;
pthread_cond_broadcast(&(impl->readOK));
pthread_mutex_unlock(&(impl->mutex));
}
int lladdFifoPool_consumer_push(int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize) {
pointerFifoImpl * impl = (pointerFifoImpl *) it;
pthread_mutex_lock(&(impl->pool->mutex));
while(impl->pool->outPtrs == impl->pool->maxPtrs) {
pthread_cond_wait(&(impl->pool->writeOK), &(impl->pool->mutex));
}
impl->pool->outPtrs++;
pthread_mutex_unlock(&(impl->pool->mutex));
pointerFifoEntry * entry = malloc(sizeof(pointerFifoEntry) + keySize + valSize);
// do 'expensive stuff' w/o a mutex
memcpy(entry+1, key, keySize);
memcpy(((byte*)(entry+1))+keySize, val, valSize);
entry->keySize = keySize;
entry->valSize = valSize;
pthread_mutex_lock(&(impl->mutex));
entry->next = impl->first;
if(impl->last == NULL) {
impl->last = entry;
assert(!impl->first);
} else {
assert(impl->first);
entry->next->prev = entry;
}
entry->prev = NULL;
impl->first = entry;
pthread_cond_broadcast(&(impl->readOK));
pthread_mutex_unlock(&(impl->mutex));
return 0;
}
lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerCount,
lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo) {
lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t));
pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount);
int i;
lladdFifoPointerPool_t * poolImpl = malloc(sizeof(lladdFifoPointerPool_t));
poolImpl->maxPtrs = pointerCount;
poolImpl->outPtrs = 0;
pthread_mutex_init(&(poolImpl->mutex), NULL);
pthread_cond_init(&(poolImpl->writeOK), NULL);
for(i = 0; i < consumerCount; i++) {
pool->pool[i] = malloc(sizeof(lladdFifo_t));
pool->pool[i]->iterator = malloc(sizeof(lladdIterator_t));
pool->pool[i]->consumer = malloc(sizeof(lladdConsumer_t));
pool->pool[i]->iterator->type = POINTER_ITERATOR;
pool->pool[i]->consumer->type = POINTER_CONSUMER;
pointerFifoImpl * impl =
(pointerFifoImpl*) (pool->pool[i]->consumer->impl = pool->pool[i]->iterator->impl = malloc(sizeof(pointerFifoImpl)));
impl->first = NULL;
impl->last = NULL;
impl->eof = 0;
pthread_mutex_init(&(impl->mutex), NULL);
pthread_cond_init(&(impl->readOK), NULL);
// pthread_cond_init(&(impl->writeOK), NULL);
impl->pool = poolImpl;
}
pool->getFifo = getFifo ? getFifo : lladdFifoPool_getFifoCRC32;
pool->fifoCount = consumerCount;
pool->dirtyPoolFifo = dirtyPoolFifo;
return pool;
}

View file

@ -43,6 +43,16 @@ void iterator_init() {
logMemory_Iterator_releaseLock,
};
lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def);
lladdIterator_def_t pointer_def = {
lladdFifoPool_iterator_close,
lladdFifoPool_iterator_next,
lladdFifoPool_iterator_tryNext,
lladdFifoPool_iterator_key,
lladdFifoPool_iterator_value,
lladdFifoPool_iterator_tupleDone,
lladdFifoPool_iterator_releaseLock
};
lladdIterator_register(POINTER_ITERATOR, pointer_def);
}

View file

@ -319,9 +319,9 @@ void logMemory_Iterator_releaseTuple(int xid, void *it) {
/* ------------------- consumer implementation ------------------------------*/
void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it){
void logMemory_consumer_close(int xid, void *it){
/* This needs to tell the iterator where the end of the ring buffer is. */
logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it->impl);
logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it);
pthread_mutex_lock(&(fifo->mutex));
fifo->eof = ringBufferAppendPosition(fifo->ringBuffer);
assert(fifo->eof != -1);
@ -329,17 +329,17 @@ void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it){
pthread_mutex_unlock(&(fifo->mutex));
}
compensated_function void Tconsumer_close(int xid, lladdConsumer_t * cons) {
logMemory_Tconsumer_close(xid, cons);
}
/*compensated_function void Tconsumer_close(int xid, lladdConsumer_t * cons) {
logMemory_consumer_close(xid, cons);
}*/
compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t Valsize) {
int logMemory_consumer_push(int xid, /*lladdConsumer_t * cons*/ void * it, byte * key, size_t keySize, byte * val, size_t valSize) {
int ret;
logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it->impl);
logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it);
pthread_mutex_lock(&(fifo->mutex));
while(-2 == (ret = ringBufferAppend(fifo->ringBuffer,
(byte *)&Valsize,
(byte *)&valSize,
sizeof(size_t) ))) {
pthread_cond_wait(&(fifo->writeReady), &(fifo->mutex));
}
@ -347,7 +347,7 @@ compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key,
compensation_set_error(LLADD_INTERNAL_ERROR);
return LLADD_INTERNAL_ERROR;
}
while(-2 == ringBufferAppend( ((logMemory_fifo_t *) it->impl)->ringBuffer, val, Valsize)) {
while(-2 == ringBufferAppend( (fifo)->ringBuffer, val, valSize)) {
pthread_cond_wait(&(fifo->writeReady), &(fifo->mutex));
}
@ -360,5 +360,14 @@ compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key,
pthread_mutex_unlock(&(fifo->mutex));
return ret;
// always succeeds.
}
/* if(it->type == LOG_MEMORY_CONSUMER) {
return logMemory_consumer_push(xid, it, key, keySize, val, valSize);
}
if(it->type == POINTER_CONSUMER) {
return pointer_consumer_push(xid, it, key, keySize, val, valSize);
}
// always succeeds.
}*/

View file

@ -19,9 +19,8 @@
lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset);
void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it);
compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t Valsize);
void logMemory_consumer_close(int xid, void *it);
int logMemory_consumer_push (int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize);
void logMemory_Iterator_close(int xid, void * impl);
compensated_function int logMemory_Iterator_next(int xid, void * impl);
compensated_function int logMemory_Iterator_tryNext(int xid, void * impl);

View file

@ -54,6 +54,7 @@ void * multiplexer_worker(void * arg) {
lladdFifo_t * fifo = m->fifoPool->getFifo(m->fifoPool, mkey, mkeySize);
consumer = fifo->consumer;
Tconsumer_push(m->xid, consumer, key, keySize, value, valueSize);
Titerator_tupleDone(m->xid, m->it);
lladdFifoPool_markDirty(m->xid, m->fifoPool, fifo);
}

View file

@ -6,6 +6,7 @@
#include <lladd/recovery.h>
#include "logger/logWriter.h"
#include <lladd/bufferManager.h>
#include <lladd/consumer.h>
#include <lladd/lockManager.h>
#include <lladd/compensations.h>
@ -110,6 +111,7 @@ int Tinit() {
LinkedListNTAInit();
compensations_init();
iterator_init();
consumer_init();
setupLockManagerCallbacksNil();
//setupLockManagerCallbacksPage();

View file

@ -123,6 +123,7 @@ static void * trygo( void * arg) {
int i = *(int*)(arg+1);
array[i]++;
assert(*(lsn_t*)key == i);
assert(array[i] == 1);
Titerator_tupleDone(-1, it);
@ -194,7 +195,8 @@ START_TEST(multiplexTest) {
lladdIterator_t * it = ThashGenericIterator(xid, hash);
lladdFifo_t * dirtyFifos = logMemoryFifo((int)(((double)NUM_INSERTS) * 0.5), 0); // 8 bytes of memory used per queued request.
lladdFifoPool_t * fifoPool = lladdFifoPool_ringBufferInit(NUM_THREADS, NUM_BYTES_IN_FIFO, NULL, dirtyFifos);
// lladdFifoPool_t * fifoPool = lladdFifoPool_ringBufferInit(NUM_THREADS, NUM_BYTES_IN_FIFO, NULL, dirtyFifos);
lladdFifoPool_t * fifoPool = lladdFifoPool_pointerPoolInit(NUM_THREADS, NUM_BYTES_IN_FIFO/10, NULL, dirtyFifos);
lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, it,
&multiplexHashLogByKey,