Partially refactored logMemory and multiplexer into a fifo implementation. Added support to fifoPool for

asynchronous threads running round-robin through the dirty fifos consuming entries in idle ones.
This commit is contained in:
Sears Russell 2005-03-16 00:52:21 +00:00
parent dffff067bb
commit bf4be359b3
7 changed files with 127 additions and 58 deletions

32
lladd/fifo.h Normal file
View file

@ -0,0 +1,32 @@
#include <lladd/iterator.h>
#include <lladd/consumer.h>
#ifndef __FIFO_H
#define __FIFO_H
typedef struct {
lladdIterator_t *iterator;
lladdConsumer_t *consumer;
} lladdFifo_t;
typedef struct lladdFifoPool_t {
lladdFifo_t ** pool;
lladdFifo_t * (*getFifo)(struct lladdFifoPool_t * pool,
byte * multiplexKey,
size_t multiplexKeySize);
int fifoCount;
lladdFifo_t * dirtyPoolFifo;
} lladdFifoPool_t;
typedef lladdFifo_t * (lladdFifoPool_getFifo_t)(lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize);
lladdFifoPool_getFifo_t lladdFifoPool_getFifoCRC32;
lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSize,
lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo);
void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) ;
#endif // __FIFO_H

View file

@ -12,6 +12,7 @@ typedef struct {
// void * new(void * arg); // void * new(void * arg);
void (*close)(int xid, void * it); void (*close)(int xid, void * it);
int (*next) (int xid, void * it); int (*next) (int xid, void * it);
int (*tryNext) (int xid, void * it);
int (*key) (int xid, void * it, byte ** key); int (*key) (int xid, void * it, byte ** key);
int (*value)(int xid, void * it, byte ** value); int (*value)(int xid, void * it, byte ** value);
void (*tupleDone)(int xid, void * it); void (*tupleDone)(int xid, void * it);
@ -40,6 +41,18 @@ void Titerator_close(int xid, lladdIterator_t * it);
*/ */
int Titerator_next(int xid, lladdIterator_t * it); int Titerator_next(int xid, lladdIterator_t * it);
/**
@param it the iterator
@return 1 if the iterator position advanced, and releaseTuple must be called,
0 if the iterator has been locked by another reader, no tuples are ready, or the iterator has been closed.
@todo think more carefully about the return value of Titerator_tryNext(). I'm not convinced that a 0/1
return value is adequate.
*/
int Titerator_tryNext(int xid, lladdIterator_t * it);
/** /**
NOTE: next acquires a mutex, releaseTuple releases mutex, NOTE: next acquires a mutex, releaseTuple releases mutex,
next key value --atomic next key value --atomic

View file

@ -4,9 +4,6 @@
#include <pbl/pbl.h> #include <pbl/pbl.h>
void multiplexHashLogByKey(byte * key, void multiplexHashLogByKey(byte * key,
size_t keySize, size_t keySize,
byte * value, byte * value,
@ -26,10 +23,10 @@ typedef struct {
/** A hash of consumer implementations, keyed on the output of the multiplexKey parameter of *multiplex */ /** A hash of consumer implementations, keyed on the output of the multiplexKey parameter of *multiplex */
pblHashTable_t * consumerHash; pblHashTable_t * consumerHash;
/** The next two fields are used to create new consumers on demand. */ /** The next two fields are used to create new consumers on demand. */
lladdConsumer_t * (*getConsumer)(struct lladdFifoPool_t * newConsumerArg, /* lladdConsumer_t * (*getConsumer)(struct lladdFifoPool_t * newConsumerArg,
byte* multiplexKey, byte* multiplexKey,
size_t multiplexKeySize); size_t multiplexKeySize); */
lladdFifoPool_t * getConsumerArg; lladdFifoPool_t * fifoPool;
pthread_t worker; pthread_t worker;
int xid; int xid;
} lladdMultiplexer_t; } lladdMultiplexer_t;
@ -42,9 +39,9 @@ lladdMultiplexer_t * lladdMultiplexer_alloc(int xid, lladdIterator_t * it,
size_t valueSize, size_t valueSize,
byte ** multiplexKey, byte ** multiplexKey,
size_t * multiplexKeySize), size_t * multiplexKeySize),
lladdConsumer_t * getConsumer(lladdFifoPool_t * getConsumerArg, /* lladdConsumer_t * getConsumer(lladdFifoPool_t * fifoPool,
byte* multiplexKey, byte* multiplexKey,
size_t multiplexKeySize), size_t multiplexKeySize), */
lladdFifoPool_t * fifoPool); lladdFifoPool_t * fifoPool);
/** /**
@ -70,5 +67,3 @@ int lladdMultiplexer_start(lladdMultiplexer_t * multiplexer, pthread_attr_t * th
*/ */
int lladdMultiplexer_join(lladdMultiplexer_t * multiplexer); int lladdMultiplexer_join(lladdMultiplexer_t * multiplexer);
lladdConsumer_t * fifoPool_getConsumerCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize);
lladdFifoPool_t * fifoPool_ringBufferInit (int consumerCount, int bufferSize);

View file

@ -5,9 +5,9 @@ lib_LIBRARIES=liblladd.a
# removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c # removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c
liblladd_a_SOURCES=crc32.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \ liblladd_a_SOURCES=crc32.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \
pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c \ pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c \
lockManager.c iterator.c arrayCollection.c ringbuffer.c \ lockManager.c iterator.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c\
logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c \ logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c \
logger/logMemory.c multiplexer.c\ logger/logMemory.c \
page/slotted.c page/header.c page/fixed.c compensations.c \ page/slotted.c page/header.c page/fixed.c compensations.c \
operations/pageOperations.c page/indirect.c operations/decrement.c \ operations/pageOperations.c page/indirect.c operations/decrement.c \
operations/increment.c operations/prepare.c operations/set.c \ operations/increment.c operations/prepare.c operations/set.c \

39
src/lladd/fifo.c Normal file
View file

@ -0,0 +1,39 @@
#include <lladd/fifo.h>
#include <lladd/crc32.h>
#include <stdlib.h>
#include "logger/logMemory.h"
/**
Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer.
*/
lladdFifo_t * lladdFifoPool_getFifoCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) {
int memberId = crc32(multiplexKey, multiplexKeySize, (unsigned long)-1L) % pool->fifoCount;
return pool->pool[memberId];
}
void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) {
if(pool->dirtyPoolFifo) {
Tconsumer_push(xid, pool->dirtyPoolFifo->consumer, NULL, 0, (byte*)&fifo, sizeof(lladdFifo_t*));
}
}
/**
Create a new pool of ringBuffer based fifos
@param consumerCount the number of consumers in the pool.
@todo this function should be generalized to other consumer implementations.
*/
lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSize,
lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo) {
lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t));
pool->getFifo = getFifo ? getFifo : lladdFifoPool_getFifoCRC32;
pool->dirtyPoolFifo = dirtyPoolFifo;
pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount);
int i;
for(i = 0; i < consumerCount; i++) {
pool->pool[i] = logMemoryFifo(bufferSize, 0);
}
pool->fifoCount = consumerCount;
return pool;
}

View file

@ -16,23 +16,39 @@ static void noopTupDone(int xid, void * foo) { }
void iterator_init() { void iterator_init() {
lladdIterator_def_t linearHashNTA_def = { lladdIterator_def_t linearHashNTA_def = {
linearHashNTAIterator_close, linearHashNTAIterator_next, linearHashNTAIterator_key, linearHashNTAIterator_value, noopTupDone linearHashNTAIterator_close,
linearHashNTAIterator_next,
linearHashNTAIterator_next,
linearHashNTAIterator_key,
linearHashNTAIterator_value,
noopTupDone
}; };
lladdIterator_register(LINEAR_HASH_NTA_ITERATOR, linearHashNTA_def); lladdIterator_register(LINEAR_HASH_NTA_ITERATOR, linearHashNTA_def);
lladdIterator_def_t array_def = { lladdIterator_def_t array_def = {
arrayIterator_close, arrayIterator_next, arrayIterator_key, arrayIterator_value, noopTupDone arrayIterator_close,
arrayIterator_next,
arrayIterator_next,
arrayIterator_key,
arrayIterator_value,
noopTupDone
}; };
lladdIterator_register(ARRAY_ITERATOR, array_def); lladdIterator_register(ARRAY_ITERATOR, array_def);
lladdIterator_def_t logMemory_def = { lladdIterator_def_t logMemory_def = {
logMemory_Iterator_close, logMemory_Iterator_next, logMemory_Iterator_key, logMemory_Iterator_value, noopTupDone logMemory_Iterator_close,
logMemory_Iterator_next,
logMemory_Iterator_tryNext,
logMemory_Iterator_key,
logMemory_Iterator_value,
logMemory_Iterator_releaseTuple
}; };
lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def); lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def);
} }
//lladdIterator_t Titerator(int type, void * arg); //lladdIterator_t Titerator(int type, void * arg);
void Titerator_close(int xid, lladdIterator_t * it) { iterators[it->type].close(xid, it->impl); free(it); } void Titerator_close (int xid, lladdIterator_t * it) { iterators[it->type].close(xid, it->impl); free(it); }
int Titerator_next (int xid, lladdIterator_t * it) { return iterators[it->type].next (xid, it->impl); } int Titerator_next (int xid, lladdIterator_t * it) { return iterators[it->type].next (xid, it->impl); }
int Titerator_key (int xid, lladdIterator_t * it, byte ** key) { return iterators[it->type].key (xid, it->impl, key); } int Titerator_tryNext(int xid, lladdIterator_t * it) { return iterators[it->type].tryNext (xid, it->impl); }
int Titerator_key (int xid, lladdIterator_t * it, byte ** key){ return iterators[it->type].key (xid, it->impl, key); }
int Titerator_value(int xid, lladdIterator_t * it, byte ** value){ return iterators[it->type].value(xid, it->impl, value); } int Titerator_value(int xid, lladdIterator_t * it, byte ** value){ return iterators[it->type].value(xid, it->impl, value); }
void Titerator_tupleDone(int xid, lladdIterator_t * it) { iterators[it->type].tupleDone(xid, it->impl); } void Titerator_tupleDone(int xid, lladdIterator_t * it) { iterators[it->type].tupleDone(xid, it->impl); }

View file

@ -12,16 +12,16 @@ lladdMultiplexer_t * lladdMultiplexer_alloc(int xid, lladdIterator_t * it,
size_t valueSize, size_t valueSize,
byte ** multiplexKey, byte ** multiplexKey,
size_t * multiplexKeySize), size_t * multiplexKeySize),
lladdConsumer_t * getConsumer(struct lladdFifoPool_t* getConsumerArg, /* lladdConsumer_t * getConsumer(struct lladdFifoPool_t* fifoPool,
byte* multiplexKey, byte* multiplexKey,
size_t multiplexKeySize), size_t multiplexKeySize), */
lladdFifoPool_t * fifoPool) { lladdFifoPool_t * fifoPool) {
lladdMultiplexer_t * ret = malloc(sizeof(lladdMultiplexer_t)); lladdMultiplexer_t * ret = malloc(sizeof(lladdMultiplexer_t));
ret->it = it; ret->it = it;
ret->multiplexer = multiplexer; ret->multiplexer = multiplexer;
ret->consumerHash = pblHtCreate(); ret->consumerHash = pblHtCreate();
ret->getConsumer = getConsumer; // ret->getConsumer = getConsumer;
ret->getConsumerArg = fifoPool; ret->fifoPool = fifoPool;
ret->xid = xid; ret->xid = xid;
return ret; return ret;
} }
@ -51,29 +51,28 @@ void * multiplexer_worker(void * arg) {
m->multiplexer(key, keySize, value, valueSize, &mkey, &mkeySize); m->multiplexer(key, keySize, value, valueSize, &mkey, &mkeySize);
lladdFifo_t * fifo = m->fifoPool->getFifo(m->fifoPool, mkey, mkeySize);
consumer = m->getConsumer(m->getConsumerArg, mkey, mkeySize); consumer = fifo->consumer;
/* lladdConsumer_t * consumer = pblHtLookup(m->consumerHash);
if(consumer == NULL) {
consumer = m->newConsumer(m->newConsumerArg, mkey, mkeySize);
pblHtInsert(m->consumerHash, mkey, mkeySize, consumer);
} */
Tconsumer_push(m->xid, consumer, key, keySize, value, valueSize); Tconsumer_push(m->xid, consumer, key, keySize, value, valueSize);
lladdFifoPool_markDirty(m->xid, m->fifoPool, fifo);
} }
// iterate over pblhash, closing consumers. // iterate over pblhash, closing consumers.
Titerator_close(m->xid, m->it); Titerator_close(m->xid, m->it);
lladdFifoPool_t * pool = m->getConsumerArg; /** @todo Does this belong in its own function in fifo.c? */
lladdFifoPool_t * pool = m->fifoPool;
int i; int i;
for(i = 0; i < pool->fifoCount; i++) { for(i = 0; i < pool->fifoCount; i++) {
Tconsumer_close(m->xid, pool->pool[i]->consumer); Tconsumer_close(m->xid, pool->pool[i]->consumer);
} }
if(m->fifoPool->dirtyPoolFifo) {
Tconsumer_close(m->xid, m->fifoPool->dirtyPoolFifo->consumer);
}
return (void*)compensation_error(); return (void*)compensation_error();
} }
@ -120,28 +119,3 @@ void multiplexHashLogByKey(byte * key,
} }
} }
/**
Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer. */
lladdConsumer_t * fifoPool_getConsumerCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) {
int memberId = crc32(multiplexKey, multiplexKeySize, (unsigned long)-1L) % pool->fifoCount;
return pool->pool[memberId]->consumer;
}
/**
Create a new pool of ringBuffer based fifos
@param consumerCount the number of consumers in the pool.
@todo this function should be generalized to other consumer implementations.
*/
lladdFifoPool_t * fifoPool_ringBufferInit (int consumerCount, int bufferSize) {
lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t));
pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount);
int i;
for(i = 0; i < consumerCount; i++) {
pool->pool[i] = logMemoryFifo(bufferSize, 0);
}
pool->fifoCount = consumerCount;
return pool;
}