/*
 * stasis-aries-wal/benchmarks/transitiveClosure.c
 * 2005-03-24 11:10:44 +00:00
 *
 * 209 lines
 * 4.8 KiB
 * C
 */

#include <lladd/transactional.h>
#include <lladd/multiplexer.h>
#include <lladd/graph.h>
#include "../src/lladd/logger/logMemory.h"
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
/* Number of graph nodes: main() extends the array list to this many slots
   and fills one record per slot. */
#define NUM_NODES 1000000
/* Size argument handed to logMemoryFifo() and
   lladdFifoPool_pointerPoolInit() when main() builds the fifos. */
#define DIRTY_BUF_SIZE 20000000
/* Number of randomly chosen out-edges stored in each node record. */
#define OUTDEGREE 3
/* Number of worker threads main() starts for each fifo-based pass. */
#define NUM_THREADS 1
/* Fifo count handed to lladdFifoPool_pointerPoolInit(). */
#define NUM_FIFOS 5
/* Number of passes run for both the naive and the fifo-based traversal. */
#define NUM_ITER 2
/* Not read anywhere in this file; it has external linkage, so it is
   presumably consumed by the graph traversal code declared in
   <lladd/graph.h> -- TODO confirm. */
int transClos_outdegree = OUTDEGREE;
/** This is a hack */
/*int countRecordsPerPage(int outDegree) {
Tinit();
int xid = Tbegin();
// Relies upon a bug in lladd that will always start allocation on a fresh page.
recordid rid = Talloc(xid, sizeof(int) * outDegree);
int oldPage = rid.page;
int count = 1;
while(oldPage == rid.page) {
Talloc(xid, sizeof(int) * outDegree);
count++;
}
Tdeinit(); // When LLADD is restarted, it will leak the last page we allocated a record on. This is the "hack".
return count;
}*/
int numOut = 0;
int numTset = 0;
int numShortcutted = 0;
int numSkipped = 0;
int numPushed = 0;
/** Arguments handed to each worker() thread by main(). */
typedef struct {
/* Fifo whose iterator worker() drains; each tuple it yields is a pointer to
   another fifo. */
lladdFifo_t * dirty;
/* Fifo passed to multiTraverse(); its consumer is closed once by the first
   worker to finish. */
lladdFifo_t * global;
/* Fifo pool passed through to multiTraverse(). */
lladdFifoPool_t * pool;
/* Multiplexer that worker() flushes before and between traversals. */
lladdMultiplexer_t * mux;
/* Record id passed through to multiTraverse() (main() sets slot to 0). */
recordid rid;
/* 1-based pass number; worker() passes i + NUM_ITER to multiTraverse(). */
int i;
} worker_arg;
/* Non-zero once some worker has closed the global fifo's consumer for the
   current pass; main() resets it to 0 after joining the workers. */
int closed = 0;
/* Guards the test-and-set of `closed` in worker(). */
pthread_mutex_t closed_mut = PTHREAD_MUTEX_INITIALIZER;
/**
 * Thread body for one fifo-based traversal pass.
 *
 * Runs inside its own transaction.  It repeatedly pulls a fifo pointer from
 * the dirty fifo's iterator, hands that fifo to multiTraverse(), and flushes
 * the multiplexer, until either the iterator is exhausted or numOut reads
 * zero.  The first worker to leave the loop closes the global fifo's
 * consumer (guarded by closed_mut), then the transaction is committed.
 *
 * @param arg_p  Pointer to a worker_arg describing the fifos, pool,
 *               multiplexer, record id and pass number to use.
 * @return       Always NULL.
 */
void * worker(void * arg_p) {
  worker_arg * ctx = (worker_arg*)arg_p;

  /* Snapshot everything we need from the shared argument block up front. */
  lladdFifo_t * in    = ctx->dirty;
  lladdFifo_t * out   = ctx->global;
  recordid      root  = ctx->rid;
  int           pass  = ctx->i;

  int xid = Tbegin();

  lladdMultiplexer_flush(ctx->mux);

  for(;;) {
    if(!Titerator_next(xid, in->iterator)) {
      break;
    }

    /* Each tuple in the dirty fifo is a single lladdFifo_t pointer. */
    lladdFifo_t ** slot;
    int len = Titerator_value(xid, in->iterator, (byte**) &slot);
    lladdFifo_t * ready = *slot;
    assert(len == sizeof(lladdFifo_t*));

    multiTraverse(xid, root, ready, out, ctx->pool, pass + NUM_ITER);

    Titerator_tupleDone(xid, in->iterator);

    if(numOut == 0) {
      break;
    }

    lladdMultiplexer_flush(ctx->mux);
  }

  /* Only one worker per pass may close the global consumer. */
  pthread_mutex_lock(&closed_mut);
  if(closed == 0) {
    Tconsumer_close(xid, out->consumer);
    closed = 1;
  }
  pthread_mutex_unlock(&closed_mut);

  Tcommit(xid);

  return NULL;
}
int main(int argc, char ** argv) {
srandom(0);
Tinit();
int xid = Tbegin();
// int lastPage = -1;
// int pageCount = 0;
// int max_fanout = 0;
/* for(i = 0; i < NUM_NODES; i++) {
recordid rid = Talloc(xid, sizeof(nodeHeader_t) + (sizeof(recordid) * max_fanout));
if(lastPage != rid.page && lastPage != -1) {
rids_per_page[lastPage] = pageCount;
lastPage = rid.page;
pageCount = 0;
}
pageCount ++;
}
*/
recordid rid = TarrayListAlloc(xid, NUM_NODES / 100, 2, sizeof(long) * (OUTDEGREE+1));
TarrayListExtend(xid, rid, NUM_NODES);
int * node = malloc(sizeof(int) * (OUTDEGREE+1)); // the last long in the node holds flags.
int i, j;
for(i = 0; i < NUM_NODES; i++) {
node[OUTDEGREE] = 0;
#ifdef RING
node[0] = (i+1) % NUM_NODES;
abort();
for(j = 1; j < OUTDEGREE; j++) {
#else
for(j = 0; j < OUTDEGREE; j++) {
#endif
node[j] = (int)( ((double)NUM_NODES) * (double)random()/((double)RAND_MAX+1.0));
// printf("%d ", node[j]);
}
rid.slot = i;
Tset(xid, rid, node);
}
printf("Nodes loaded.\n");
fflush(stdout);
rid.slot = 0;
system("date");
numTset = 0;
for(i = 1; i <= NUM_ITER; i++) {
naiveTraverse(xid, rid, i);
}
system("date");
printf("TransClos returned Tset called %d times\n", numTset);
fflush(stdout);
numTset = 0;
numPushed = 0;
Tcommit(xid);
for(i = 1; i <= NUM_ITER; i++) {
lladdFifo_t * dirtyFifo = logMemoryFifo(DIRTY_BUF_SIZE, 0);
lladdFifoPool_t * pool = lladdFifoPool_pointerPoolInit(NUM_FIFOS, DIRTY_BUF_SIZE, lladdFifoPool_getFifoCRC32, dirtyFifo);
lladdFifo_t * globalFifo = logMemoryFifo(DIRTY_BUF_SIZE, /*NUM_FIFOS*/ 0);
lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, globalFifo->iterator, &multiplexByRidPage, pool);
// lladdMultiplexer_start(mux, NULL);
rid.slot =0;
Tconsumer_push(xid, globalFifo->consumer, NULL, 0, (byte*)&rid, sizeof(recordid));
numOut = 1;
pthread_t * workers = malloc(sizeof(pthread_t) * NUM_THREADS);
int j;
worker_arg * arg = malloc(sizeof(worker_arg));
arg->dirty = dirtyFifo;
arg->global = globalFifo;
arg->pool = pool;
arg->rid = rid;
arg->mux = mux;
arg->i = i;
for(j = 0; j < NUM_THREADS; j++) {
pthread_create(&workers[j], NULL, worker, arg);
}
for(j = 0; j < NUM_THREADS; j++) {
pthread_join(workers[j], NULL);
}
// lladdMultiplexer_join(mux);
closed = 0;
}
system("date");
printf("FastTransClos returned Tset called %d times, shortcutted %d, skipped %d pushed %d\n", numTset, numShortcutted, numSkipped, numPushed);
Tdeinit();
}