now spawns multipler cleaner threads, and checks both for duplicated log entries and dropped log entries.
This commit is contained in:
parent
bf4be359b3
commit
3812f36e6b
1 changed files with 78 additions and 10 deletions
|
@ -51,15 +51,17 @@ terms specified in this license.
|
||||||
#include "../check_includes.h"
|
#include "../check_includes.h"
|
||||||
|
|
||||||
#include <lladd/multiplexer.h>
|
#include <lladd/multiplexer.h>
|
||||||
|
#include "../../src/lladd/logger/logMemory.h"
|
||||||
|
//#include <lladd/fifo.h>
|
||||||
|
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
|
||||||
#define LOG_NAME "check_iterator.log"
|
#define LOG_NAME "check_iterator.log"
|
||||||
|
|
||||||
#define NUM_BYTES_IN_FIFO 100
|
#define NUM_BYTES_IN_FIFO 1000
|
||||||
#define NUM_INSERTS 100000
|
#define NUM_INSERTS 100000
|
||||||
#define NUM_THREADS 10000
|
#define NUM_THREADS 500
|
||||||
|
|
||||||
int * array;
|
int * array;
|
||||||
|
|
||||||
|
@ -94,6 +96,62 @@ static void * go( void * arg) {
|
||||||
|
|
||||||
Titerator_tupleDone(-1, it);
|
Titerator_tupleDone(-1, it);
|
||||||
|
|
||||||
|
}
|
||||||
|
Titerator_close(-1, it);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
static void * trygo( void * arg) {
|
||||||
|
lladdIterator_t * it = (lladdIterator_t *) arg;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&mutex);
|
||||||
|
pthread_mutex_unlock(&mutex);
|
||||||
|
|
||||||
|
int itRet = 0;
|
||||||
|
while((itRet = Titerator_tryNext(-1, it))) {
|
||||||
|
byte * key, * value;
|
||||||
|
int keySize, valueSize;
|
||||||
|
|
||||||
|
keySize = Titerator_key (-1, it, &key);
|
||||||
|
valueSize = Titerator_value(-1, it, &value);
|
||||||
|
|
||||||
|
assert(keySize == sizeof(lsn_t));
|
||||||
|
LogEntry * e = (LogEntry*)value;
|
||||||
|
linearHash_remove_arg * arg = (linearHash_remove_arg*)getUpdateArgs(e);
|
||||||
|
|
||||||
|
assert(arg->keySize == sizeof(int));
|
||||||
|
assert(arg->valueSize == sizeof(char));
|
||||||
|
|
||||||
|
int i = *(int*)(arg+1);
|
||||||
|
array[i]++;
|
||||||
|
assert(array[i] == 1);
|
||||||
|
|
||||||
|
Titerator_tupleDone(-1, it);
|
||||||
|
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void * go2( void * arg) {
|
||||||
|
lladdIterator_t * it = (lladdIterator_t *) arg;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&mutex);
|
||||||
|
pthread_mutex_unlock(&mutex);
|
||||||
|
|
||||||
|
int itRet = 0;
|
||||||
|
while((itRet = Titerator_next(-1, it))) {
|
||||||
|
|
||||||
|
lladdFifo_t ** dirtyFifo_ptr;
|
||||||
|
lladdFifo_t * dirtyFifo;
|
||||||
|
int dirtyFifoSize = Titerator_value(-1, it, (byte**)&dirtyFifo_ptr);
|
||||||
|
|
||||||
|
dirtyFifo = * dirtyFifo_ptr;
|
||||||
|
|
||||||
|
assert(dirtyFifoSize == sizeof(lladdFifo_t *));
|
||||||
|
|
||||||
|
Titerator_tupleDone(-1, it);
|
||||||
|
|
||||||
|
trygo(dirtyFifo->iterator);
|
||||||
|
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -135,13 +193,11 @@ START_TEST(multiplexTest) {
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
lladdIterator_t * it = ThashGenericIterator(xid, hash);
|
lladdIterator_t * it = ThashGenericIterator(xid, hash);
|
||||||
lladdFifoPool_t * fifoPool = fifoPool_ringBufferInit(NUM_THREADS, NUM_BYTES_IN_FIFO);
|
lladdFifo_t * dirtyFifos = logMemoryFifo((int)(((double)NUM_INSERTS) * 0.5), 0); // 8 bytes of memory used per queued request.
|
||||||
|
lladdFifoPool_t * fifoPool = lladdFifoPool_ringBufferInit(NUM_THREADS, NUM_BYTES_IN_FIFO, NULL, dirtyFifos);
|
||||||
|
|
||||||
|
|
||||||
lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, it,
|
lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, it,
|
||||||
&multiplexHashLogByKey,
|
&multiplexHashLogByKey,
|
||||||
&fifoPool_getConsumerCRC32,
|
|
||||||
fifoPool);
|
fifoPool);
|
||||||
|
|
||||||
|
|
||||||
|
@ -165,16 +221,23 @@ START_TEST(multiplexTest) {
|
||||||
|
|
||||||
pthread_t * workers = malloc(sizeof(pthread_t) * fifoPool->fifoCount);
|
pthread_t * workers = malloc(sizeof(pthread_t) * fifoPool->fifoCount);
|
||||||
|
|
||||||
for(i = 0 ; i < fifoPool->fifoCount; i++) {
|
for(i = 0 ; i < fifoPool->fifoCount; i+=2) {
|
||||||
lladdConsumer_t * consumer = fifoPool->pool[i]->consumer;
|
// lladdConsumer_t * consumer = fifoPool->pool[i]->consumer;
|
||||||
|
lladdIterator_t * iterator = fifoPool->pool[i]->iterator;
|
||||||
|
|
||||||
// printf("%d ", i);
|
// printf("%d ", i);
|
||||||
|
|
||||||
pthread_create(&workers[i], &attr, go, consumer);
|
pthread_create(&workers[i], &attr, go, iterator);
|
||||||
|
pthread_create(&workers[i+1], &attr, go2, dirtyFifos->iterator);
|
||||||
|
|
||||||
}
|
}
|
||||||
// printf("<-(%d)", fifoPool->fifoCount); fflush(stdout);
|
// printf("<-(%d)", fifoPool->fifoCount); fflush(stdout);
|
||||||
|
|
||||||
|
// This thread runs down the dirty list, consuming idle fifos' contents in
|
||||||
|
// case other threads are busy, or not interested.
|
||||||
|
|
||||||
|
// pthread_create(&cleaner, &attr, go2, dirtyFifos->iterator);
|
||||||
|
|
||||||
pthread_mutex_unlock(&mutex);
|
pthread_mutex_unlock(&mutex);
|
||||||
|
|
||||||
lladdMultiplexer_join(mux);
|
lladdMultiplexer_join(mux);
|
||||||
|
@ -183,6 +246,11 @@ START_TEST(multiplexTest) {
|
||||||
pthread_join(workers[i], NULL);
|
pthread_join(workers[i], NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for(i = 0; i < NUM_INSERTS; i++) {
|
||||||
|
assert(array[i] == 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// pthread_join(cleaner, NULL);
|
||||||
|
|
||||||
} END_TEST
|
} END_TEST
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue