diff --git a/benchmarks/transitiveClosure.c b/benchmarks/transitiveClosure.c index 278e363..386dd96 100644 --- a/benchmarks/transitiveClosure.c +++ b/benchmarks/transitiveClosure.c @@ -5,15 +5,52 @@ #include #include #include + +#include +#include + + #define NUM_NODES 1000000 -#define DIRTY_BUF_SIZE 20000000 -#define OUTDEGREE 3 +#define DIRTY_BUF_SIZE 60000000 +//#define OUTDGREE 3 #define NUM_THREADS 1 #define NUM_FIFOS 5 -#define NUM_ITER 2 +#define NUM_ITER 1 + +int hotSet(int j, int HotSetProb) { + int hotset_size = (NUM_NODES / 10); + int coldset_size = NUM_NODES - hotset_size; + int id; + int p = 1 + (int)(100.0 * random() / (RAND_MAX + 1.0)); + + if (p <= HotSetProb) { + // printf("a "); + id = (int)(((double)hotset_size) * random() / + (RAND_MAX + 1.0)); + } else { + // printf("b "); + id = hotset_size + (int)(((double)coldset_size) * + random() / (RAND_MAX + 1.0)); + } + id += j; + id -= (hotset_size / 2); + // printf("%d ", id); + if(id < 0) { id += NUM_NODES; } + id %= NUM_NODES; + return id; +} + +lladdFifo_t * transClos_getFifoLocal(lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) { + recordid * rid; + assert(multiplexKeySize == sizeof(rid->page)); + + + long page = ((recordid*)multiplexKey)->page; + + return pool->pool[page % pool->fifoCount]; +} -int transClos_outdegree = OUTDEGREE; /** This is a hack */ /*int countRecordsPerPage(int outDegree) { @@ -37,6 +74,9 @@ int numTset = 0; int numShortcutted = 0; int numSkipped = 0; int numPushed = 0; +int useCRC = 0; +int OUTDEGREE = 0; +int transClos_outdegree = 0; typedef struct { lladdFifo_t * dirty; @@ -54,30 +94,32 @@ pthread_mutex_t closed_mut = PTHREAD_MUTEX_INITIALIZER; void * worker(void * arg_p) { worker_arg * arg = (worker_arg*)arg_p; - lladdFifo_t * dirtyFifo = arg->dirty; + // lladdFifo_t * dirtyFifo = arg->dirty; lladdFifo_t * globalFifo = arg->global; recordid rid = arg->rid; int i = arg->i; int xid = Tbegin(); lladdMultiplexer_flush(arg->mux); - - while(Titerator_next(xid, dirtyFifo->iterator)) { - + int count = 0; + // while(Titerator_next(xid, dirtyFifo->iterator)) { + while(1) { - lladdFifo_t ** localFifo_ptr; + + + // lladdFifo_t ** localFifo_ptr; lladdFifo_t * localFifo; - int size = Titerator_value(xid, dirtyFifo->iterator, (byte**) &localFifo_ptr); + // int size = Titerator_value(xid, dirtyFifo->iterator, (byte**) &localFifo_ptr); - localFifo = *localFifo_ptr; + localFifo = arg->pool->pool[count % arg->pool->fifoCount]; //*localFifo_ptr; + count++; + // assert(size == sizeof(lladdFifo_t*)); - assert(size == sizeof(lladdFifo_t*)); - - multiTraverse(xid, rid, localFifo, globalFifo, arg->pool, i+NUM_ITER); + multiTraverse(xid, rid, localFifo, globalFifo, arg->pool, i/*+NUM_ITER*/); - Titerator_tupleDone(xid, dirtyFifo->iterator); + // Titerator_tupleDone(xid, dirtyFifo->iterator); if(!numOut) { break ;} @@ -92,8 +134,6 @@ void * worker(void * arg_p) { } pthread_mutex_unlock(&closed_mut); - - Tcommit(xid); return NULL; } @@ -101,11 +141,33 @@ void * worker(void * arg_p) { int main(int argc, char ** argv) { - srandom(0); Tinit(); int xid = Tbegin(); + + assert(argc == 8); + int naive = atoi(argv[1]); + useCRC = atoi(argv[2]); + int ring = atoi(argv[3]); + transClos_outdegree = OUTDEGREE = atoi(argv[4]); + int hot = atoi(argv[5]); + int seed = atoi(argv[6]); + if(seed == -1) { + struct timeval tv; + gettimeofday(&tv); + srandom(tv.usec); + if(!silent) { + printf("Seed: %d\n", tv.usec); + } + } else { + srandom(seed); + } + int silent = atoi(argv[7]); + if(!silent) { + printf("naive=%d crc=%d ring=%d outdegree=%d hot=%d seed=%d\n", naive, useCRC, ring, OUTDEGREE, hot, seed); + assert(!hot || !ring); + } // int lastPage = -1; // int pageCount = 0; @@ -130,80 +192,108 @@ int main(int argc, char ** argv) { int i, j; for(i = 0; i < NUM_NODES; i++) { node[OUTDEGREE] = 0; -#ifdef RING - node[0] = (i+1) % NUM_NODES; - abort(); - for(j = 1; j < OUTDEGREE; j++) { -#else - for(j = 0; j < OUTDEGREE; j++) { -#endif - node[j] = (int)( ((double)NUM_NODES) * (double)random()/((double)RAND_MAX+1.0)); + if(ring) { + node[0] = (i+1) % NUM_NODES; + for(j = 1; j < OUTDEGREE; j++) { + if(hot) { + node[j] = hotSet(j, hot); + } else { + node[j] = (int)( ((double)NUM_NODES) * (double)random()/((double)RAND_MAX+1.0)); + } + } // printf("%d ", node[j]); + } else { + for(j = 0; j < OUTDEGREE; j++) { + if(hot) { + node[j] = hotSet(j, hot); + } else { + node[j] = (int)( ((double)NUM_NODES) * (double)random()/((double)RAND_MAX+1.0)); + } + } } rid.slot = i; Tset(xid, rid, node); } - printf("Nodes loaded.\n"); - fflush(stdout); - + Tcommit(xid); + if(!silent) { + printf("Nodes loaded.\n"); + fflush(stdout); + system("date"); + } rid.slot = 0; - system("date"); + struct timeval start, stop; + + gettimeofday(&start, NULL); numTset = 0; - for(i = 1; i <= NUM_ITER; i++) { - - naiveTraverse(xid, rid, i); + xid = Tbegin(); + if(naive) { + for(i = 1; i <= NUM_ITER; i++) { + + naiveTraverse(xid, rid, i); + } + if(!silent) { + system("date"); + + printf("TransClos returned Tset called %d times\n", numTset); + fflush(stdout); + } + numTset = 0; + numPushed = 0; + } else { + + for(i = 1; i <= NUM_ITER; i++) { + // lladdFifo_t * dirtyFifo = logMemoryFifo(DIRTY_BUF_SIZE, 0); + lladdFifoPool_t * pool = lladdFifoPool_pointerPoolInit(NUM_FIFOS, DIRTY_BUF_SIZE, + useCRC ? lladdFifoPool_getFifoCRC32 : transClos_getFifoLocal + , NULL/*, dirtyFifo*/); + + lladdFifo_t * globalFifo = logMemoryFifo(DIRTY_BUF_SIZE, /*NUM_FIFOS*/ 0); + lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, globalFifo->iterator, &multiplexByRidPage, pool); + + // lladdMultiplexer_start(mux, NULL); + rid.slot =0; + Tconsumer_push(xid, globalFifo->consumer, NULL, 0, (byte*)&rid, sizeof(recordid)); + numOut = 1; + + pthread_t * workers = malloc(sizeof(pthread_t) * NUM_THREADS); + int j; + + worker_arg * arg = malloc(sizeof(worker_arg)); + // arg->dirty = dirtyFifo; + arg->global = globalFifo; + arg->pool = pool; + arg->rid = rid; + arg->mux = mux; + arg->i = i; + for(j = 0; j < NUM_THREADS; j++) { + pthread_create(&workers[j], NULL, worker, arg); + } + for(j = 0; j < NUM_THREADS; j++) { + pthread_join(workers[j], NULL); + } + // lladdMultiplexer_join(mux); + + closed = 0; + + } + if(!silent) { + system("date"); + printf("FastTransClos returned Tset called %d times, shortcutted %d, skipped %d pushed %d\n", numTset, numShortcutted, numSkipped, numPushed); + } } - - system("date"); - - printf("TransClos returned Tset called %d times\n", numTset); - fflush(stdout); - - numTset = 0; - numPushed = 0; + Tcommit(xid); - - for(i = 1; i <= NUM_ITER; i++) { - lladdFifo_t * dirtyFifo = logMemoryFifo(DIRTY_BUF_SIZE, 0); - lladdFifoPool_t * pool = lladdFifoPool_pointerPoolInit(NUM_FIFOS, DIRTY_BUF_SIZE, lladdFifoPool_getFifoCRC32, dirtyFifo); - - lladdFifo_t * globalFifo = logMemoryFifo(DIRTY_BUF_SIZE, /*NUM_FIFOS*/ 0); - lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, globalFifo->iterator, &multiplexByRidPage, pool); - - // lladdMultiplexer_start(mux, NULL); - rid.slot =0; - Tconsumer_push(xid, globalFifo->consumer, NULL, 0, (byte*)&rid, sizeof(recordid)); - numOut = 1; - - pthread_t * workers = malloc(sizeof(pthread_t) * NUM_THREADS); - int j; - - worker_arg * arg = malloc(sizeof(worker_arg)); - arg->dirty = dirtyFifo; - arg->global = globalFifo; - arg->pool = pool; - arg->rid = rid; - arg->mux = mux; - arg->i = i; - for(j = 0; j < NUM_THREADS; j++) { - pthread_create(&workers[j], NULL, worker, arg); - } - for(j = 0; j < NUM_THREADS; j++) { - pthread_join(workers[j], NULL); - } - // lladdMultiplexer_join(mux); - - closed = 0; - - } - - system("date"); - printf("FastTransClos returned Tset called %d times, shortcutted %d, skipped %d pushed %d\n", numTset, numShortcutted, numSkipped, numPushed); + gettimeofday(&stop, NULL); Tdeinit(); + + double elapsed = ((double)(stop.tv_sec - start.tv_sec)) + 1e-6 * ((double)(stop.tv_usec - start.tv_usec)); + + printf("%f\n", elapsed); + } diff --git a/src/lladd/graph.c b/src/lladd/graph.c index 6b7d93a..e5d5c2e 100644 --- a/src/lladd/graph.c +++ b/src/lladd/graph.c @@ -11,6 +11,8 @@ extern int numTset; extern int numShortcutted; extern int numSkipped; extern int numPushed; +extern int useCRC; + void naiveTraverse(int xid, recordid rid, int num) { int * node = alloca(sizeof(int) * (transClos_outdegree+1)); @@ -58,7 +60,11 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t localRid = *rid; if(myFifo == -1) { - myFifo = crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned long)-1L) % pool->fifoCount; + if(useCRC) { + myFifo = crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned long)-1L) % pool->fifoCount; + } else { + myFifo = rid->page % pool->fifoCount; + } // printf("Switched locality sets... %d\n", myFifo); } else { // assert(myFifo == crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned long)-1L) % pool->fifoCount);