multiplexer and fifo are now working. (also added removed some timeouts from tests that were timing out)

This commit is contained in:
Sears Russell 2005-03-15 05:30:57 +00:00
parent ed9b978bda
commit dffff067bb
15 changed files with 335 additions and 119 deletions

View file

@ -513,7 +513,7 @@ retry: /* Begin the transaction. */
exit (1);
}
/* Delete any previously existing item. */
/* Delete any previously existing item -- LLADD always does this during insert.*/
switch (ret = db->del(db, tid, &key, 0)) {
case 0:
case DB_NOTFOUND:
@ -580,7 +580,7 @@ retry: /* Begin the transaction. */
void
run_xact(DB_ENV *dbenv, DB *db, int offset, int count)
{
va_list ap;
// va_list ap;
DBC *dbc;
DBT key, data;
DB_TXN *tid;
@ -633,35 +633,55 @@ retry: /* Begin the transaction. */
for(q = offset; q < offset + count; q++) {
keyPtr = q;
valPtr = q;
switch (ret = db->del(db, tid, &key, 0)) {
case 0:
abort(); // we dont insert dups in this test!
case DB_NOTFOUND:
break;
case DB_LOCK_DEADLOCK:
/* Deadlock: retry the operation. */
abort(); // the lock manager should be disabled for this test...
if ((ret = tid->abort(tid)) != 0) {
dbenv->err(dbenv, ret, "DB_TXN->abort");
exit (1);
}
goto retry;
default:
dbenv->err(dbenv, ret, "db->del: %d", q);
exit (1);
}
/* data.data = s;
data.size = strlen(s); */
// printf("A"); fflush(NULL);
switch (ret = dbc->c_put(dbc, &key, &data, DB_KEYLAST)) {
case 0:
// printf("B"); fflush(NULL);
break;
case DB_LOCK_DEADLOCK:
va_end(ap);
/* Deadlock: retry the operation. */
if ((ret = dbc->c_close(dbc)) != 0) {
dbenv->err(
dbenv, ret, "dbc->c_close");
exit (1);
}
if ((ret = tid->abort(tid)) != 0) {
dbenv->err(dbenv, ret, "DB_TXN->abort");
exit (1);
}
goto retry;
default:
/* Error: run recovery. */
dbenv->err(dbenv, ret, "dbc->put: %d/%d", q, q);
exit (1);
}
switch (ret = dbc->c_put(dbc, &key, &data, DB_KEYLAST)) {
case 0:
// printf("B"); fflush(NULL);
break;
case DB_LOCK_DEADLOCK:
// va_end(ap);
/* Deadlock: retry the operation. */
if ((ret = dbc->c_close(dbc)) != 0) {
dbenv->err(
dbenv, ret, "dbc->c_close");
exit (1);
}
if ((ret = tid->abort(tid)) != 0) {
dbenv->err(dbenv, ret, "DB_TXN->abort");
exit (1);
}
goto retry;
default:
/* Error: run recovery. */
dbenv->err(dbenv, ret, "dbc->put: %d/%d", q, q);
exit (1);
}
}
va_end(ap);
// va_end(ap);
/* Success: commit the change. */
if ((ret = dbc->c_close(dbc)) != 0) {
dbenv->err(dbenv, ret, "dbc->c_close");

View file

@ -20,11 +20,11 @@
int activeThreads = 0;
int max_active = 0;
/*
double avg_var = 0;
double max_var = 0;
double avg_mean = 0;
double max_mean = 0;
double max_mean = 0;*/
pthread_mutex_t mutex;
@ -46,18 +46,18 @@ static void * go (void * arg_ptr) {
int j;
int xid = Tbegin();
double sum_x_squared = 0;
double sum = 0;
// double sum_x_squared = 0;
// double sum = 0;
for(j = k * count; j < (k+1) *(count) ; j++) {
struct timeval start, endtime;
// struct timeval start, endtime;
gettimeofday(&start, NULL);
// gettimeofday(&start, NULL);
ThashInsert(xid, hash, (byte*)&j, sizeof(int), (byte*)&j, sizeof(int));
gettimeofday(&endtime, NULL);
/* gettimeofday(&endtime, NULL);
double microSecondsPassed = 1000000 * (endtime.tv_sec - start.tv_sec);
@ -66,7 +66,7 @@ static void * go (void * arg_ptr) {
sum += microSecondsPassed;
sum_x_squared += (microSecondsPassed * microSecondsPassed) ;
*/
// printf("(%d)", k);
}
@ -79,9 +79,9 @@ static void * go (void * arg_ptr) {
assert(j == tmp);
} */
double count_d = count;
double mean = sum / count_d;
double variance = sqrt((sum_x_squared / count_d) - (mean * mean));
// double count_d = count;
// double mean = sum / count_d;
// double variance = sqrt((sum_x_squared / count_d) - (mean * mean));
@ -90,11 +90,11 @@ static void * go (void * arg_ptr) {
pthread_mutex_lock(&mutex);
activeThreads--;
avg_mean += mean;
/* avg_mean += mean;
avg_var += variance;
if(mean > max_mean ) { max_mean = mean; }
if(variance > max_var) { max_var = variance; }
if(variance > max_var) { max_var = variance; } */
pthread_mutex_unlock(&mutex);
@ -150,10 +150,10 @@ int main(int argc, char** argv) {
}
printf("mean: (max, avg) %f, %f\n", max_mean, avg_mean / (double)thread_count);
/* printf("mean: (max, avg) %f, %f\n", max_mean, avg_mean / (double)thread_count);
printf("variance: (max, avg) %f, %f\n", max_var, avg_var / (double)thread_count);
*/
Tdeinit();
}

View file

@ -1,7 +1,7 @@
#include <lladd/transactional.h>
#ifndef __CONSUMER_H
#define __ITERATOR_H
#define __CONSUMER_H
#define MAX_ITERATOR_TYPES 10
#define FIFO_CONSUMER 0

View file

@ -14,7 +14,7 @@ typedef struct {
int (*next) (int xid, void * it);
int (*key) (int xid, void * it, byte ** key);
int (*value)(int xid, void * it, byte ** value);
int (*releaseTuple)(int xid, void * it);
void (*tupleDone)(int xid, void * it);
} lladdIterator_def_t;
typedef struct {
@ -79,6 +79,12 @@ int Titerator_key(int xid, lladdIterator_t * it, byte ** key);
@see lladdIterator_key.
*/
int Titerator_value(int xid, lladdIterator_t * it, byte ** value);
/**
Iterator callers must call this before calling next(). A seperate
call is required so that iterators can be reentrant. (Warning: Not
all iterators are reentrant.)
*/
void Titerator_tupleDone(int xid, lladdIterator_t * it);
#endif

View file

@ -1,22 +1,10 @@
#include "iterator.h"
#include "consumer.h"
#include "fifo.h"
#include <pbl/pbl.h>
/**
A multiplexer takes an iterator, and splits its output into multiple consumers.
*/
typedef struct {
lladdIterator_t *iterator;
lladdConsumer_t *consumer;
} lladdFifo_t;
typedef struct lladdFifoPool_t {
lladdFifo_t ** pool;
lladdConsumer_t * (*getConsumer)(struct lladdFifoPool_t * pool,
byte * multiplexKey,
size_t multiplexKeySize);
int fifoCount;
} lladdFifoPool_t;
void multiplexHashLogByKey(byte * key,

View file

@ -6,3 +6,5 @@ ringBufferLog_t * openLogRingBuffer(size_t size, lsn_t initialOffset);
void closeLogRingBuffer(ringBufferLog_t * log);
int ringBufferAppend(ringBufferLog_t * log, byte * dat, size_t size);
int ringBufferTruncateRead(byte * buf, ringBufferLog_t * log,size_t size);
lsn_t ringBufferAppendPosition(ringBufferLog_t * log);
lsn_t ringBufferReadPosition(ringBufferLog_t * log);

View file

@ -4,7 +4,7 @@
#include <lladd/operations.h>
#include <lladd/arrayCollection.h>
#include "logger/logMemory.h"
static lladdIterator_def_t iterators[MAX_ITERATOR_TYPES];
@ -12,17 +12,21 @@ static void lladdIterator_register(int type, lladdIterator_def_t info) {
assert(type < MAX_ITERATOR_TYPES);
iterators[type] = info;
}
static void noopTupDone(int xid, void * foo) { }
void iterator_init() {
lladdIterator_def_t linearHashNTA_def = {
linearHashNTAIterator_close, linearHashNTAIterator_next, linearHashNTAIterator_key, linearHashNTAIterator_value
linearHashNTAIterator_close, linearHashNTAIterator_next, linearHashNTAIterator_key, linearHashNTAIterator_value, noopTupDone
};
lladdIterator_register(LINEAR_HASH_NTA_ITERATOR, linearHashNTA_def);
lladdIterator_def_t array_def = {
arrayIterator_close, arrayIterator_next, arrayIterator_key, arrayIterator_value
arrayIterator_close, arrayIterator_next, arrayIterator_key, arrayIterator_value, noopTupDone
};
lladdIterator_register(ARRAY_ITERATOR, array_def);
lladdIterator_def_t logMemory_def = {
logMemory_Iterator_close, logMemory_Iterator_next, logMemory_Iterator_key, logMemory_Iterator_value, noopTupDone
};
lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def);
}
@ -31,3 +35,4 @@ void Titerator_close(int xid, lladdIterator_t * it) { itera
int Titerator_next (int xid, lladdIterator_t * it) { return iterators[it->type].next (xid, it->impl); }
int Titerator_key (int xid, lladdIterator_t * it, byte ** key) { return iterators[it->type].key (xid, it->impl, key); }
int Titerator_value(int xid, lladdIterator_t * it, byte ** value){ return iterators[it->type].value(xid, it->impl, value); }
void Titerator_tupleDone(int xid, lladdIterator_t * it) { iterators[it->type].tupleDone(xid, it->impl); }

View file

@ -4,26 +4,23 @@
#include <stdlib.h>
#include <lladd/ringbuffer.h>
#include <lladd/consumer.h>
#include <lladd/iterator.h>
#include <assert.h>
#include "logMemory.h"
#include <lladd/compensations.h>
typedef struct {
//mutex?
pthread_mutex_t mutex;
pthread_cond_t readReady;
pthread_cond_t writeReady;
ringBufferLog_t * ringBuffer;
lsn_t cached_lsn;
byte * cached_value;
size_t cached_value_size;
lsn_t eof;
} logMemory_fifo_t;
typedef struct {
lladdIterator_t *iterator;
lladdConsumer_t *consumer;
} lladdFifo_t;
void logMemory_init() {
/* NO-OP */
@ -37,14 +34,20 @@ lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) {
iterator->type = LOG_MEMORY_ITERATOR;
iterator->impl = malloc(sizeof(logMemory_fifo_t));
((logMemory_fifo_t *)iterator->impl)->ringBuffer = openLogRingBuffer(size, initialOffset);
pthread_mutex_init(&(((logMemory_fifo_t *)iterator->impl)->mutex), NULL);
pthread_cond_init (&(((logMemory_fifo_t *)iterator->impl)->readReady), NULL);
pthread_cond_init (&(((logMemory_fifo_t *)iterator->impl)->writeReady), NULL);
((logMemory_fifo_t *)iterator->impl)->cached_value = NULL;
((logMemory_fifo_t *)iterator->impl)->eof = -1;
lladdConsumer_t * consumer = (lladdConsumer_t *) malloc(sizeof(lladdConsumer_t));
consumer->type = LOG_MEMORY_CONSUMER;
consumer->impl = iterator->impl; /* FIXME: same logMemory_iterator_t as iterator?*/
consumer->impl = iterator->impl;
fifo->iterator = iterator;
fifo->consumer = consumer;
return fifo;
}
@ -61,26 +64,65 @@ void logMemory_Iterator_close(int xid, void * impl) {
free(impl);
}
int logMemory_Iterator_next (int xid, void * impl) {
compensated_function int logMemory_Iterator_next (int xid, void * impl) {
logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl;
pthread_mutex_lock(&(fifo->mutex));
size_t size;
int lsn;
int ret;
ret = ringBufferTruncateRead((byte *)&size, fifo->ringBuffer, sizeof(size_t) );
if (ret == 0) { /* NOTE: I assume that ringBufferTruncateRead returns 0 when a read is successfull. */
/* TODO: the following might return null, in which case we should ... ? */
fifo->cached_value = realloc(fifo->cached_value, size);
lsn = ringBufferTruncateRead( fifo->cached_value, fifo->ringBuffer, size);
fifo->cached_lsn = (lsn_t)lsn;
return 1; /* FIXME: is this the right return value if there is a next value? */
} else {
return 0; /* FIXME: is this the right return value when there is no next value? */
if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) {
pthread_mutex_unlock(&(fifo->mutex));
return 0;
}
// TODO Check to see if we're done reading...
while(-2 == (ret = ringBufferTruncateRead((byte *)&size, fifo->ringBuffer, sizeof(size_t)))) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) {
pthread_mutex_unlock(&(fifo->mutex));
return 0;
}
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
byte * tmp;
tmp = realloc(fifo->cached_value, size);
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
return LLADD_INTERNAL_ERROR;
}
fifo->cached_value = tmp;
fifo->cached_value_size = size;
while(-2 == (lsn = ringBufferTruncateRead( fifo->cached_value, fifo->ringBuffer, size))) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
fifo->cached_lsn = (lsn_t)lsn;
pthread_cond_broadcast(&(fifo->writeReady));
pthread_mutex_unlock(&(fifo->mutex));
return 1;
}
/* return the lsn */
@ -108,10 +150,45 @@ int logMemory_Iterator_releaseTuple(int xid, void *it) {
void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it){
/* Currently this doesn't have to do anything */
/* This needs to tell the iterator where the end of the ring buffer is. */
logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it->impl);
pthread_mutex_lock(&(fifo->mutex));
fifo->eof = ringBufferAppendPosition(fifo->ringBuffer);
assert(fifo->eof != -1);
pthread_cond_broadcast(&(fifo->readReady)); // There may have been threads waiting on the next tuple before close was called.
pthread_mutex_unlock(&(fifo->mutex));
}
int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t Valsize) {
ringBufferAppend( ((logMemory_fifo_t *) it->impl)->ringBuffer, (byte *)&Valsize, sizeof(size_t) );
ringBufferAppend( ((logMemory_fifo_t *) it->impl)->ringBuffer, val, Valsize);
compensated_function void Tconsumer_close(int xid, lladdConsumer_t * cons) {
logMemory_Tconsumer_close(xid, cons);
}
compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t Valsize) {
int ret;
logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it->impl);
pthread_mutex_lock(&(fifo->mutex));
while(-2 == (ret = ringBufferAppend(fifo->ringBuffer,
(byte *)&Valsize,
sizeof(size_t) ))) {
pthread_cond_wait(&(fifo->writeReady), &(fifo->mutex));
}
if(ret == -1) { // asked to append something longer than the buffer!
compensation_set_error(LLADD_INTERNAL_ERROR);
return LLADD_INTERNAL_ERROR;
}
while(-2 == ringBufferAppend( ((logMemory_fifo_t *) it->impl)->ringBuffer, val, Valsize)) {
pthread_cond_wait(&(fifo->writeReady), &(fifo->mutex));
}
if(ret == -1) { // asked to append something longer than the buffer!
compensation_set_error(LLADD_INTERNAL_ERROR);
return LLADD_INTERNAL_ERROR;
}
pthread_cond_broadcast(&(fifo->readReady));
pthread_mutex_unlock(&(fifo->mutex));
return ret;
// always succeeds.
}

View file

@ -1,11 +1,18 @@
#include <lladd/transactional.h>
#include <lladd/ringbuffer.h>
#include <lladd/consumer.h>
#include <lladd/iterator.h>
#include <lladd/fifo.h>
typedef struct ringBufferLog_s ringBufferLog_t;
#ifndef __LOGMEMORY_H
#define __LOGMEMORY_H
/*typedef struct ringBufferLog_s ringBufferLog_t;
ringBufferLog_t * openLogRingBuffer(size_t size, lsn_t initialOffset);
void closeLogRingBuffer(ringBufferLog_t * log);
int ringBufferAppend(ringBufferLog_t * log, byte * dat, size_t size);
int ringBufferTruncateRead(byte * buf, ringBufferLog_t * log,size_t size);
int ringBufferTruncateRead(byte * buf, ringBufferLog_t * log,size_t size); */
// int writeLogEntry(LogEntry * e); (as implemented by ring buffer, but need to remember size of entry too
@ -21,4 +28,17 @@ int ringBufferTruncateRead(byte * buf, ringBufferLog_t * log,size_t size);
// deleteLogWriter no-op
lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset);
// LogEntry* readLSNEntry(lsn_t) if LSN is the next available log entry, return it, else error.
void logMemory_Tconsumer_close(int xid, lladdConsumer_t *it);
compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t Valsize);
void logMemory_Iterator_close(int xid, void * impl);
compensated_function int logMemory_Iterator_next (int xid, void * impl);
int logMemory_Iterator_key (int xid, void * impl, byte ** key);
int logMemory_Iterator_value (int xid, void * impl, byte ** value);
int logMemory_Iterator_releaseTuple(int xid, void *it);
#endif

View file

@ -2,6 +2,9 @@
#include <lladd/crc32.h>
#include <stdlib.h>
#include <lladd/operations/linearHashNTA.h>
#include "logger/logMemory.h"
lladdMultiplexer_t * lladdMultiplexer_alloc(int xid, lladdIterator_t * it,
void (*multiplexer)(byte * key,
size_t keySize,
@ -37,7 +40,8 @@ int lladdMultiplexer_join(lladdMultiplexer_t * multiplexer) {
void * multiplexer_worker(void * arg) {
lladdMultiplexer_t * m = arg;
lladdConsumer_t * consumer;
while(Titerator_next(m->xid, m->it)) {
byte * mkey, * key, * value;
size_t mkeySize, keySize, valueSize;
@ -47,7 +51,8 @@ void * multiplexer_worker(void * arg) {
m->multiplexer(key, keySize, value, valueSize, &mkey, &mkeySize);
lladdConsumer_t * consumer = m->getConsumer(m->getConsumerArg, mkey, mkeySize);
consumer = m->getConsumer(m->getConsumerArg, mkey, mkeySize);
/* lladdConsumer_t * consumer = pblHtLookup(m->consumerHash);
if(consumer == NULL) {
@ -63,6 +68,12 @@ void * multiplexer_worker(void * arg) {
Titerator_close(m->xid, m->it);
lladdFifoPool_t * pool = m->getConsumerArg;
int i;
for(i = 0; i < pool->fifoCount; i++) {
Tconsumer_close(m->xid, pool->pool[i]->consumer);
}
return (void*)compensation_error();
}
@ -70,16 +81,7 @@ void * multiplexer_worker(void * arg) {
Sample callbacks follow.
*********************************************************************/
// @todo remove the code until the //-----, as it just makes up for code that Jimmy needs to commit!
lladdFifo_t * logMemory_init(int bufferSize, int initialLSN);
//---------
*/
void multiplexHashLogByKey(byte * key,
size_t keySize,
byte * value,
@ -105,12 +107,14 @@ void multiplexHashLogByKey(byte * key,
*multiplexKey = (byte*) (arg+1);
*multiplexKeySize = arg->keySize;
}
break;
case OPERATION_LINEAR_HASH_REMOVE:
{
linearHash_insert_arg * arg = (linearHash_insert_arg*)updateArgs; // this *is* correct. Don't ask....
*multiplexKey = (byte*) (arg + 1);
*multiplexKeySize = arg->keySize;
}
break;
default:
abort();
}
@ -123,6 +127,7 @@ lladdConsumer_t * fifoPool_getConsumerCRC32( lladdFifoPool_t * pool, byte * mult
return pool->pool[memberId]->consumer;
}
/**
Create a new pool of ringBuffer based fifos
@ -135,7 +140,8 @@ lladdFifoPool_t * fifoPool_ringBufferInit (int consumerCount, int bufferSize) {
pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount);
int i;
for(i = 0; i < consumerCount; i++) {
pool->pool[i] = logMemory_init(bufferSize, 0);
pool->pool[i] = logMemoryFifo(bufferSize, 0);
}
pool->fifoCount = consumerCount;
return pool;
}

View file

@ -106,6 +106,14 @@ int ringBufferAppend(ringBufferLog_t * log, byte * dat, size_t size) {
}
lsn_t ringBufferAppendPosition(ringBufferLog_t * log) {
return log->end;
}
lsn_t ringBufferReadPosition(ringBufferLog_t * log) {
return log->start;
}
int ringBufferTruncateRead(byte * buf, ringBufferLog_t * log, size_t size) {
if(size > log->size) {
return -1; // Request for chunk larger than entire ringbuffer

View file

@ -9,4 +9,4 @@ endif
noinst_PROGRAMS = $(TESTS)
LDADD = $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a #-lefence
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log check_indirect.log check_bufferMananger.log check_lladdhash.log check_pageOperations.log check_linearhash.log check_linkedListNTA.log check_linearHashNTA.log check_pageOrientedListNTA.log check_lockManager.log check_compensations.log check_errorhandling.log check_header.logg check_iterator.log check_linearHash.log check_ringbuffer.log
AM_CFLAGS= -g -Wall -pedantic -std=gnu99
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -550,6 +550,9 @@ Suite * check_suite(void) {
Suite *s = suite_create("recovery_suite");
/* Begin a new test */
TCase *tc = tcase_create("recovery");
tcase_set_timeout(tc, 0); // disable timeouts
/* void * foobar; */ /* used to supress warnings. */
/* Sub tests are added, one per line, here */
tcase_add_test(tc, recoverBlob__idempotent);

View file

@ -39,7 +39,6 @@ authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#include <lladd/transactional.h>
#include <pbl/pbl.h>
#include <stdlib.h>
@ -58,27 +57,96 @@ terms specified in this license.
#define LOG_NAME "check_iterator.log"
#define NUM_BYTES_IN_FIFO 100
#define NUM_INSERTS 100000
#define NUM_THREADS 10000
int * array;
static pthread_mutex_t mutex;
static pthread_cond_t never;
static void * go( void * arg) {
lladdIterator_t * it = (lladdIterator_t *) arg;
pthread_mutex_lock(&mutex);
pthread_mutex_unlock(&mutex);
int itRet = 0;
while((itRet = Titerator_next(-1, it))) {
byte * key, * value;
int keySize, valueSize;
keySize = Titerator_key (-1, it, &key);
valueSize = Titerator_value(-1, it, &value);
assert(keySize == sizeof(lsn_t));
LogEntry * e = (LogEntry*)value;
linearHash_remove_arg * arg = (linearHash_remove_arg*)getUpdateArgs(e);
assert(arg->keySize == sizeof(int));
assert(arg->valueSize == sizeof(char));
int i = *(int*)(arg+1);
array[i]++;
assert(array[i] == 1);
Titerator_tupleDone(-1, it);
}
return NULL;
}
/**
@test
*/
#define NUM_ENTRIES 10000
#define NUM_THREADS 100
START_TEST(multiplexTest) {
Tinit();
int xid = Tbegin();
recordid hash;
lladdIterator_t * it = ThashGenericIterator(xid, hash);
lladdFifoPool_t * fifoPool = fifoPool_ringBufferInit(NUM_THREADS, NUM_ENTRIES);
lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, it, &multiplexHashLogByKey, &fifoPool_getConsumerCRC32, fifoPool);
// now, read from fifos, checking to see if everything is well. (Need to spawn one thread per fifo.)
recordid hash = ThashCreate(xid, sizeof(int), VARIABLE_LENGTH);
linearHash_remove_arg * arg = malloc(sizeof(linearHash_remove_arg) + sizeof(int) + sizeof(char));
arg->keySize = sizeof(int);
arg->valueSize = sizeof(char);
int i;
array = (int*)calloc(NUM_INSERTS, sizeof(int));
for(i = 0; i < NUM_INSERTS; i++) {
(*(int*)(arg+1)) = i;
LogEntry * e = allocUpdateLogEntry(-1, -1, OPERATION_LINEAR_HASH_INSERT, NULLRID, (byte*)arg,
sizeof(linearHash_remove_arg) + sizeof(int) + sizeof(char), NULL);
ThashInsert(xid, hash, (byte*)&i, sizeof(int), (byte*)e, sizeofLogEntry(e));
free(e);
}
free(arg);
Tcommit(xid);
lladdIterator_t * it = ThashGenericIterator(xid, hash);
lladdFifoPool_t * fifoPool = fifoPool_ringBufferInit(NUM_THREADS, NUM_BYTES_IN_FIFO);
lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, it,
&multiplexHashLogByKey,
&fifoPool_getConsumerCRC32,
fifoPool);
// now, read from fifos, checking to see if everything is well. (Need to spawn one thread per fifo.)
/* threads have static thread sizes. Ughh. */
pthread_attr_t attr;
@ -91,20 +159,31 @@ START_TEST(multiplexTest) {
pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
pthread_mutex_lock(&mutex);
lladdMultiplexer_start(mux, &attr);
// printf("->(%d)", fifoPool->fifoCount); fflush(stdout);
pthread_t * workers = malloc(sizeof(pthread_t) * fifoPool->fifoCount);
for(i = 0 ; i < fifoPool->fifoCount; i++) {
lladdConsumer_t * consumer = fifoPool->pool[i]->consumer;
// printf("%d ", i);
pthread_create(&workers[i], &attr, go, consumer);
}
// printf("<-(%d)", fifoPool->fifoCount); fflush(stdout);
pthread_mutex_unlock(&mutex);
lladdMultiplexer_join(mux);
for(i = 0; i < fifoPool->fifoCount; i++) {
pthread_join(&workers[i], NULL);
pthread_join(workers[i], NULL);
}
} END_TEST

View file

@ -495,6 +495,8 @@ Suite * check_suite(void) {
/* Begin a new test */
TCase *tc = tcase_create("recovery");
tcase_set_timeout(tc, 0); // disable timeouts
/* Sub tests are added, one per line, here */
tcase_add_test(tc, recovery_idempotent);
tcase_add_test(tc, recovery_exactlyOnce);