Initial upload of (untested) multiplexer implementation.
This commit is contained in:
parent
ed49949512
commit
9be3c4edd3
2 changed files with 216 additions and 0 deletions
63
lladd/multiplexer.h
Normal file
63
lladd/multiplexer.h
Normal file
|
@ -0,0 +1,63 @@
|
|||
#include "iterator.h"
|
||||
#include "consumer.h"
|
||||
#include <pbl/pbl.h>
|
||||
/**
|
||||
A multiplexer takes an iterator, and splits its output into multiple consumers.
|
||||
*/
|
||||
|
||||
|
||||
typedef struct {
|
||||
lladdIterator_t * it;
|
||||
void (*multiplexer)(byte * key,
|
||||
size_t keySize,
|
||||
byte * value,
|
||||
size_t valueSize,
|
||||
byte ** multiplexKey,
|
||||
size_t * multiplexKeySize);
|
||||
|
||||
/** A hash of consumer implementations, keyed on the output of the multiplexKey parameter of *multiplex */
|
||||
pblHashTable_t * consumerHash;
|
||||
/** The next two fields are used to create new consumers on demand. */
|
||||
lladdConsumer_t * (*getConsumer)(void* newConsumerArg,
|
||||
byte* multiplexKey,
|
||||
size_t multiplexKeySize);
|
||||
void * getConsumerArg;
|
||||
pthread_t worker;
|
||||
int xid;
|
||||
} lladdMultiplexer_t;
|
||||
|
||||
|
||||
lladdMultiplexer_t * lladdMultiplexer_alloc(int xid, lladdIterator_t * it,
|
||||
void (*multiplexer)(byte * key,
|
||||
size_t keySize,
|
||||
byte * value,
|
||||
size_t valueSize,
|
||||
byte ** multiplexKey,
|
||||
size_t * multiplexKeySize),
|
||||
lladdConsumer_t * getConsumer(void* getConsumerArg,
|
||||
byte* multiplexKey,
|
||||
size_t multiplexKeySize),
|
||||
void * getConsumerArg);
|
||||
|
||||
/**
|
||||
creates a new thread that will consume input from it, and forward
|
||||
its output to the consumers.
|
||||
|
||||
@param thread_attributes passed through to pthread_create, it is
|
||||
fine if this is NULL, although it probably makes sense to set the
|
||||
stack size to something reasonable (PTHREAD_STACK_MIN will
|
||||
probably work. LLADD is tested with 16K stacks under linux/x86
|
||||
(where 16K = PTHREAD_STACK_MIN) , while the default pthread stack
|
||||
size is 2M. Your milage may vary.)
|
||||
|
||||
@return zero on success, or error code (@see pthread_create for
|
||||
possible return values) on failure.
|
||||
*/
|
||||
int lladdMultiplexer_start(lladdMultiplexer_t * multiplexer, pthread_attr_t * thread_attributes);
|
||||
|
||||
/**
|
||||
block the current thread until the multiplexer shuts down.
|
||||
|
||||
@todo lladdMultiplex_join does not propagate compensation errors as it should.
|
||||
*/
|
||||
int lladdMultiplexer_join(lladdMultiplexer_t * multiplexer);
|
153
src/lladd/multiplexer.c
Normal file
153
src/lladd/multiplexer.c
Normal file
|
@ -0,0 +1,153 @@
|
|||
#include <lladd/multiplexer.h>
|
||||
#include <lladd/crc32.h>
|
||||
#include <stdlib.h>
|
||||
#include <lladd/operations/linearHashNTA.h>
|
||||
lladdMultiplexer_t * lladdMultiplexer_alloc(int xid, lladdIterator_t * it,
|
||||
void (*multiplexer)(byte * key,
|
||||
size_t keySize,
|
||||
byte * value,
|
||||
size_t valueSize,
|
||||
byte ** multiplexKey,
|
||||
size_t * multiplexKeySize),
|
||||
lladdConsumer_t * getConsumer(void* getConsumerArg,
|
||||
byte* multiplexKey,
|
||||
size_t multiplexKeySize),
|
||||
void * getConsumerArg) {
|
||||
lladdMultiplexer_t * ret = malloc(sizeof(lladdMultiplexer_t));
|
||||
ret->it = it;
|
||||
ret->multiplexer = multiplexer;
|
||||
ret->consumerHash = pblHtCreate();
|
||||
ret->getConsumer = getConsumer;
|
||||
ret->getConsumerArg = getConsumerArg;
|
||||
ret->xid = xid;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void * multiplexer_worker(void * arg);
|
||||
|
||||
int lladdMultiplexer_start(lladdMultiplexer_t * multiplexer, pthread_attr_t * thread_attributes) {
|
||||
return pthread_create(&multiplexer->worker, thread_attributes, multiplexer_worker, multiplexer);
|
||||
}
|
||||
|
||||
|
||||
int lladdMultiplexer_join(lladdMultiplexer_t * multiplexer) {
|
||||
return pthread_join(multiplexer->worker,NULL);
|
||||
}
|
||||
|
||||
|
||||
void * multiplexer_worker(void * arg) {
|
||||
lladdMultiplexer_t * m = arg;
|
||||
|
||||
while(Titerator_next(m->xid, m->it)) {
|
||||
byte * mkey, * key, * value;
|
||||
size_t mkeySize, keySize, valueSize;
|
||||
|
||||
keySize = Titerator_key (m->xid, m->it, &key);
|
||||
valueSize = Titerator_value(m->xid, m->it, &value);
|
||||
|
||||
m->multiplexer(key, keySize, value, valueSize, &mkey, &mkeySize);
|
||||
|
||||
lladdConsumer_t * consumer = m->getConsumer(m->getConsumerArg, mkey, mkeySize);
|
||||
|
||||
/* lladdConsumer_t * consumer = pblHtLookup(m->consumerHash);
|
||||
if(consumer == NULL) {
|
||||
consumer = m->newConsumer(m->newConsumerArg, mkey, mkeySize);
|
||||
pblHtInsert(m->consumerHash, mkey, mkeySize, consumer);
|
||||
} */
|
||||
|
||||
Tconsumer_push(m->xid, consumer, key, keySize, value, valueSize);
|
||||
|
||||
}
|
||||
|
||||
// iterate over pblhash, closing consumers.
|
||||
|
||||
Titerator_close(m->xid, m->it);
|
||||
|
||||
return (void*)compensation_error();
|
||||
}
|
||||
|
||||
/* ****************** END OF MULTIXPLEXER IMPLEMENTATION **************
|
||||
|
||||
Sample callbacks follow.
|
||||
|
||||
*********************************************************************/
|
||||
// @todo remove the code until the //-----, as it just makes up for code that Jimmy needs to commit!
|
||||
|
||||
typedef struct {
|
||||
lladdIterator_t *iterator;
|
||||
lladdConsumer_t *consumer;
|
||||
} lladdFifo_t;
|
||||
|
||||
lladdFifo_t * logMemory_init(int bufferSize, int initialLSN);
|
||||
|
||||
//---------
|
||||
|
||||
typedef struct lladdFifoPool_t {
|
||||
lladdFifo_t ** pool;
|
||||
lladdConsumer_t * (*getConsumer)(struct lladdFifoPool_t * pool,
|
||||
byte * multiplexKey,
|
||||
size_t multiplexKeySize);
|
||||
int fifoCount;
|
||||
} lladdFifoPool_t;
|
||||
|
||||
|
||||
|
||||
void multiplexHashLogByKey(byte * key,
|
||||
size_t keySize,
|
||||
byte * value,
|
||||
size_t valueSize,
|
||||
byte ** multiplexKey,
|
||||
size_t * multiplexKeySize) {
|
||||
// We don't care what the key is. It's probably an LSN.
|
||||
const LogEntry * log = (const LogEntry*) value;
|
||||
const byte * updateArgs = getUpdateArgs(log); // assume the log is a logical update entry.
|
||||
switch(log->contents.update.funcID) {
|
||||
|
||||
// If you really want to know why insert takes
|
||||
// linearHash_remove_arg entries and vice versa, look at
|
||||
// linearHashNTA. Note that during normal (physiological forward)
|
||||
// operation, ThashInsert() *generates* insert args for its undo
|
||||
// implementation, ThashRemove() and vice versa. Therefore,
|
||||
// ThashRemove's operation implementation takes an insert
|
||||
// argument.
|
||||
|
||||
case OPERATION_LINEAR_HASH_INSERT:
|
||||
{
|
||||
linearHash_remove_arg * arg = (linearHash_remove_arg*) updateArgs; // this *is* correct. Don't ask...
|
||||
*multiplexKey = (byte*) (arg+1);
|
||||
*multiplexKeySize = arg->keySize;
|
||||
}
|
||||
case OPERATION_LINEAR_HASH_REMOVE:
|
||||
{
|
||||
linearHash_insert_arg * arg = (linearHash_insert_arg*)updateArgs; // this *is* correct. Don't ask....
|
||||
*multiplexKey = (byte*) (arg + 1);
|
||||
*multiplexKeySize = arg->keySize;
|
||||
}
|
||||
default:
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer. */
|
||||
lladdConsumer_t * fifoPool_getConsumerCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) {
|
||||
int memberId = crc32(multiplexKey, multiplexKeySize, (unsigned long)-1L) % pool->fifoCount;
|
||||
return pool->pool[memberId]->consumer;
|
||||
}
|
||||
|
||||
/**
|
||||
Create a new pool of ringBuffer based fifos
|
||||
|
||||
@param consumerCount the number of consumers in the pool.
|
||||
@todo this function should be generalized to other consumer implementations.
|
||||
*/
|
||||
lladdFifoPool_t * fifoPool_ringBufferInit (int consumerCount, int bufferSize) {
|
||||
lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t));
|
||||
|
||||
pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount);
|
||||
int i;
|
||||
for(i = 0; i < consumerCount; i++) {
|
||||
pool->pool[i] = logMemory_init(bufferSize, 0);
|
||||
}
|
||||
return pool;
|
||||
}
|
Loading…
Reference in a new issue