2005-03-16 00:52:21 +00:00
|
|
|
#include <lladd/fifo.h>
|
|
|
|
#include <lladd/crc32.h>
|
|
|
|
#include <stdlib.h>
|
2005-03-23 08:07:53 +00:00
|
|
|
#include <stdio.h>
|
2005-03-16 00:52:21 +00:00
|
|
|
#include "logger/logMemory.h"
|
|
|
|
|
2005-03-23 08:07:53 +00:00
|
|
|
#include <string.h>
|
|
|
|
#include <assert.h>
|
|
|
|
|
2005-03-16 00:52:21 +00:00
|
|
|
/**
|
|
|
|
Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer.
|
|
|
|
*/
|
|
|
|
lladdFifo_t * lladdFifoPool_getFifoCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) {
|
2006-03-20 23:11:46 +00:00
|
|
|
int memberId = crc32(multiplexKey, multiplexKeySize, (unsigned int)-1) % pool->fifoCount;
|
2005-03-16 00:52:21 +00:00
|
|
|
return pool->pool[memberId];
|
|
|
|
}
|
|
|
|
/**
   Enqueue a pointer to fifo on the pool's dirtyPoolFifo, if the pool has one.

   Pools whose dirtyPoolFifo is NULL ignore the call.

   @param xid  passed through to Tconsumer_push().
   @param pool the pool that owns fifo.
   @param fifo the fifo to report; the pointer value itself is what gets pushed.
*/
void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) {
  if(!pool->dirtyPoolFifo) {
    return;
  }

  // Sanity checks on the type tags.  NOTE(review): the bound of 10 is not
  // explained in this file; presumably it catches corrupted fifo pointers.
  assert(fifo->consumer->type < 10);
  assert(fifo->iterator->type < 10);

  // Empty key (NULL, 0); the value is the sizeof(lladdFifo_t*) bytes of the
  // pointer variable, not the fifo structure it points to.
  Tconsumer_push(xid, pool->dirtyPoolFifo->consumer, NULL, 0, (byte*)&fifo, sizeof(lladdFifo_t*));
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
Create a new pool of ringBuffer based fifos
|
|
|
|
|
|
|
|
@param consumerCount the number of consumers in the pool.
|
|
|
|
@todo this function should be generalized to other consumer implementations.
|
|
|
|
*/
|
|
|
|
lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSize,
|
|
|
|
lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo) {
|
|
|
|
lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t));
|
|
|
|
|
|
|
|
pool->getFifo = getFifo ? getFifo : lladdFifoPool_getFifoCRC32;
|
|
|
|
pool->dirtyPoolFifo = dirtyPoolFifo;
|
|
|
|
|
|
|
|
pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount);
|
|
|
|
int i;
|
|
|
|
for(i = 0; i < consumerCount; i++) {
|
|
|
|
pool->pool[i] = logMemoryFifo(bufferSize, 0);
|
|
|
|
}
|
|
|
|
pool->fifoCount = consumerCount;
|
|
|
|
return pool;
|
|
|
|
}
|
2005-03-23 08:07:53 +00:00
|
|
|
|
|
|
|
/**
   Accounting shared by every fifo created by a single call to
   lladdFifoPool_pointerPoolInit(): it bounds how many entries may be
   allocated at once.
*/
typedef struct {
  /** Limit on outPtrs; lladdFifoPool_consumer_push() blocks while
      outPtrs == maxPtrs. */
  int maxPtrs;
  /** Entries currently outstanding: incremented by
      lladdFifoPool_consumer_push(), decremented by
      lladdFifoPool_iterator_tupleDone(). */
  int outPtrs;
  /** Held while outPtrs is read or modified. */
  pthread_mutex_t mutex;
  /** Waited on by pushers while the pool is full; broadcast by
      lladdFifoPool_iterator_tupleDone() after it decrements outPtrs. */
  pthread_cond_t writeOK;
} lladdFifoPointerPool_t;
|
|
|
|
|
|
|
|
typedef struct pointerFifoEntry pointerFifoEntry;

/**
   Header of one queued (key, value) pair.  lladdFifoPool_consumer_push()
   allocates the header together with its payload: keySize bytes of key
   immediately follow the header, and valSize bytes of value follow the key.
*/
struct pointerFifoEntry {
  /** Neighbour that was pushed after this entry (NULL for the newest). */
  pointerFifoEntry * prev;
  /** Neighbour that was pushed before this entry. */
  pointerFifoEntry * next;
  /** Number of key bytes stored directly after the header. */
  int keySize;
  /** Number of value bytes stored directly after the key. */
  int valSize;
};
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
pointerFifoEntry * first;
|
|
|
|
pointerFifoEntry * last;
|
|
|
|
pointerFifoEntry * current;
|
|
|
|
pthread_mutex_t mutex;
|
|
|
|
int eof;
|
|
|
|
pthread_cond_t readOK;
|
|
|
|
//pthread_cond_t writeOK;
|
|
|
|
lladdFifoPointerPool_t * pool;
|
|
|
|
} pointerFifoImpl;
|
|
|
|
|
|
|
|
/**
   Detach the oldest entry (impl->last) from the queue and publish it as
   impl->current.

   impl->last is dereferenced unconditionally, so it must be non-NULL.  Both
   callers in this file invoke doNext() with impl->mutex held.

   @param xid  unused.
   @param impl the fifo to advance.
*/
static void doNext(int xid, pointerFifoImpl* impl) {
  pointerFifoEntry * oldest  = impl->last;
  pointerFifoEntry * newTail = oldest->prev;

  impl->current = oldest;

  if(newTail == NULL) {
    // That was the only entry; the queue is now empty.
    impl->first = NULL;
    impl->last  = NULL;
    return;
  }

  newTail->next = NULL;
  impl->last    = newTail;
}
|
|
|
|
|
|
|
|
int lladdFifoPool_iterator_next(int xid, void * it) {
|
|
|
|
pointerFifoImpl * impl = (pointerFifoImpl *) it;
|
|
|
|
|
|
|
|
pthread_mutex_lock(&(impl->mutex));
|
|
|
|
|
|
|
|
while(impl->last == NULL && (!impl->eof)) {
|
|
|
|
pthread_cond_wait(&(impl->readOK), &(impl->mutex));
|
|
|
|
}
|
|
|
|
|
|
|
|
if(impl->eof) {
|
|
|
|
pthread_mutex_unlock(&(impl->mutex));
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
doNext(xid, impl);
|
|
|
|
|
|
|
|
// pthread_mutex_unlock(&(impl->mutex));
|
|
|
|
|
|
|
|
return 1;
|
|
|
|
|
|
|
|
}
|
|
|
|
int lladdFifoPool_iterator_tryNext(int xid, void * it) {
|
|
|
|
pointerFifoImpl * impl = (pointerFifoImpl *) it;
|
|
|
|
|
|
|
|
pthread_mutex_lock(&(impl->mutex));
|
|
|
|
|
|
|
|
if(impl->last == NULL) {
|
|
|
|
pthread_mutex_unlock(&(impl->mutex));
|
|
|
|
return 0;
|
|
|
|
} else {
|
|
|
|
doNext(xid, impl);
|
|
|
|
}
|
|
|
|
|
|
|
|
// pthread_mutex_unlock(&(impl->mutex));
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
   Not implemented for pointer fifos: calling this terminates the process
   via abort().

   @param xid unused.
   @param it  unused.
*/
void lladdFifoPool_iterator_releaseLock(int xid, void * it) {
  (void) xid;
  (void) it;
  abort(); // didn't implement nextOrEmpty yet...
}
|
|
|
|
|
|
|
|
void lladdFifoPool_iterator_tupleDone(int xid, void * it) {
|
|
|
|
pointerFifoImpl * impl = (pointerFifoImpl *) it;
|
|
|
|
|
|
|
|
free(impl->current);
|
|
|
|
|
|
|
|
pthread_mutex_unlock(&(impl->mutex));
|
|
|
|
|
|
|
|
pthread_mutex_lock(&(impl->pool->mutex));
|
|
|
|
impl->pool->outPtrs--;
|
|
|
|
pthread_mutex_unlock(&(impl->pool->mutex));
|
|
|
|
pthread_cond_broadcast(&(impl->pool->writeOK));
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
   Expose the key of the current entry.

   @param xid unused.
   @param it  the fifo's pointerFifoImpl; its current entry is dereferenced.
   @param key out: set to the first byte after the entry header, which is
              where the key bytes are stored.  The pointer refers to the
              entry's own storage, not to a copy.
   @return the entry's keySize.
*/
int lladdFifoPool_iterator_key (int xid, void * it, byte ** key) {
  pointerFifoImpl  * fifo  = it;
  pointerFifoEntry * entry = fifo->current;

  *key = (byte*)(entry + 1);
  return entry->keySize;
}
|
|
|
|
/**
   Expose the value of the current entry.

   @param xid unused.
   @param it  the fifo's pointerFifoImpl; its current entry is dereferenced.
   @param val out: set to the byte that follows the entry header and the
              keySize key bytes, which is where the value bytes are stored.
              The pointer refers to the entry's own storage, not to a copy.
   @return the entry's valSize.
*/
int lladdFifoPool_iterator_value (int xid, void * it, byte ** val) {
  pointerFifoImpl  * fifo    = it;
  pointerFifoEntry * entry   = fifo->current;
  byte             * payload = (byte*)(entry + 1);

  *val = payload + entry->keySize;
  return entry->valSize;
}
|
|
|
|
|
|
|
|
void lladdFifoPool_iterator_close(int xid, void * it) {
|
|
|
|
pointerFifoImpl * impl = (pointerFifoImpl *) it;
|
|
|
|
pthread_mutex_lock(&(impl->mutex));
|
|
|
|
assert(impl->eof);
|
|
|
|
assert((!impl->first) && (!impl->last));
|
|
|
|
printf("Leaking iterator in lladdFifoPool_iterator_close\n");
|
|
|
|
pthread_mutex_unlock(&(impl->mutex));
|
|
|
|
}
|
|
|
|
|
|
|
|
void lladdFifoPool_consumer_close(int xid, void * it) {
|
|
|
|
pointerFifoImpl * impl = (pointerFifoImpl *) it;
|
|
|
|
pthread_mutex_lock(&(impl->mutex));
|
|
|
|
impl->eof = 1;
|
|
|
|
pthread_cond_broadcast(&(impl->readOK));
|
|
|
|
pthread_mutex_unlock(&(impl->mutex));
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
   Append a copy of (key, val) to a pointer fifo.

   The call first reserves a slot in the shared pool, blocking while
   outPtrs == maxPtrs.  It then copies key and val into a freshly allocated
   entry without holding any lock, and finally links the entry in at the
   head of the fifo and wakes any waiting reader.

   @param xid     unused.
   @param it      the fifo's pointerFifoImpl.
   @param key     key bytes; may be NULL when keySize is 0 (this is what
                  lladdFifoPool_markDirty() passes).
   @param keySize number of key bytes; stored in an int field.
   @param val     value bytes; may be NULL when valSize is 0.
   @param valSize number of value bytes; stored in an int field.
   @return 0 on success.  -1 if the entry could not be allocated; in that
           case the reserved slot is released again and nothing is queued.
*/
int lladdFifoPool_consumer_push(int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize) {
  pointerFifoImpl * impl = (pointerFifoImpl *) it;

  // Reserve one of the pool's maxPtrs slots, waiting for a reader to call
  // tupleDone() if they are all in use.
  pthread_mutex_lock(&(impl->pool->mutex));
  while(impl->pool->outPtrs == impl->pool->maxPtrs) {
    pthread_cond_wait(&(impl->pool->writeOK), &(impl->pool->mutex));
  }
  impl->pool->outPtrs++;
  pthread_mutex_unlock(&(impl->pool->mutex));

  pointerFifoEntry * entry = malloc(sizeof(pointerFifoEntry) + keySize + valSize);
  if(entry == NULL) {
    // Give the slot back so that other pushers are not starved by a
    // reservation that will never be consumed.
    pthread_mutex_lock(&(impl->pool->mutex));
    impl->pool->outPtrs--;
    pthread_mutex_unlock(&(impl->pool->mutex));
    pthread_cond_broadcast(&(impl->pool->writeOK));
    return -1;
  }

  // do 'expensive stuff' w/o a mutex
  byte * payload = (byte*)(entry+1);
  // memcpy() must not be handed a NULL pointer, even for a length of zero,
  // so empty keys/values are skipped rather than copied.
  if(keySize) {
    memcpy(payload, key, keySize);
  }
  if(valSize) {
    memcpy(payload + keySize, val, valSize);
  }
  entry->keySize = keySize;
  entry->valSize = valSize;

  pthread_mutex_lock(&(impl->mutex));

  // Link the entry in as the new head (impl->first).
  entry->next = impl->first;
  if(impl->last == NULL) {
    // Queue was empty: the new entry is also the tail.
    impl->last = entry;
    assert(!impl->first);
  } else {
    assert(impl->first);
    entry->next->prev = entry;
  }
  entry->prev = NULL;
  impl->first = entry;

  pthread_cond_broadcast(&(impl->readOK));
  pthread_mutex_unlock(&(impl->mutex));

  return 0;
}
|
|
|
|
|
|
|
|
|
|
|
|
lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerCount,
|
|
|
|
lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo) {
|
|
|
|
|
|
|
|
lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t));
|
|
|
|
|
|
|
|
pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount);
|
|
|
|
int i;
|
|
|
|
|
|
|
|
lladdFifoPointerPool_t * poolImpl = malloc(sizeof(lladdFifoPointerPool_t));
|
|
|
|
poolImpl->maxPtrs = pointerCount;
|
|
|
|
poolImpl->outPtrs = 0;
|
|
|
|
pthread_mutex_init(&(poolImpl->mutex), NULL);
|
|
|
|
pthread_cond_init(&(poolImpl->writeOK), NULL);
|
|
|
|
|
|
|
|
for(i = 0; i < consumerCount; i++) {
|
|
|
|
pool->pool[i] = malloc(sizeof(lladdFifo_t));
|
|
|
|
pool->pool[i]->iterator = malloc(sizeof(lladdIterator_t));
|
|
|
|
pool->pool[i]->consumer = malloc(sizeof(lladdConsumer_t));
|
|
|
|
pool->pool[i]->iterator->type = POINTER_ITERATOR;
|
|
|
|
pool->pool[i]->consumer->type = POINTER_CONSUMER;
|
|
|
|
pointerFifoImpl * impl =
|
|
|
|
(pointerFifoImpl*) (pool->pool[i]->consumer->impl = pool->pool[i]->iterator->impl = malloc(sizeof(pointerFifoImpl)));
|
|
|
|
impl->first = NULL;
|
|
|
|
impl->last = NULL;
|
|
|
|
impl->eof = 0;
|
|
|
|
pthread_mutex_init(&(impl->mutex), NULL);
|
|
|
|
pthread_cond_init(&(impl->readOK), NULL);
|
|
|
|
// pthread_cond_init(&(impl->writeOK), NULL);
|
|
|
|
|
|
|
|
impl->pool = poolImpl;
|
|
|
|
}
|
|
|
|
pool->getFifo = getFifo ? getFifo : lladdFifoPool_getFifoCRC32;
|
|
|
|
pool->fifoCount = consumerCount;
|
|
|
|
pool->dirtyPoolFifo = dirtyPoolFifo;
|
|
|
|
|
|
|
|
return pool;
|
|
|
|
}
|