stasis-aries-wal/src/stasis/graph.c

125 lines
3.4 KiB
C
Raw Normal View History

#include <stasis/graph.h>
#include <alloca.h>
#include <assert.h>
#include <stdlib.h>
#include "page.h"
#include <stasis/crc32.h>
2005-03-24 04:43:22 +00:00
/* Global statistics gathered by the traversal routines in this file. */

/* Net change in queued records: multiTraverse adds one for every record id it
   pushes to a consumer and subtracts one for every record id it consumes. */
int numOut = 0;
/* Number of Tset() calls issued by naiveTraverse and multiTraverse. */
int numTset = 0;
/* Number of successors that multiTraverse found to map to its own fifo. */
int numShortcutted = 0;
/* Number of those same-fifo successors that were already marked with the
   current traversal number and therefore were not pushed again. */
int numSkipped = 0;
/* Number of successors multiTraverse pushed to the global consumer. */
int numPushed = 0;
/* Non-zero: multiTraverse picks its fifo index from a crc32 of the page
   number; zero: it uses the page number modulo the pool's fifo count. */
int useCRC = 0;
/* Number of out-edge slots stored in each node record.  Node records are read
   as transClos_outdegree+1 ints; the final int holds the traversal marker. */
int transClos_outdegree = 0;
2005-03-25 16:33:36 +00:00
2005-03-24 04:43:22 +00:00
/**
 * Recursive depth-first traversal starting at the record rid.
 *
 * The record is read as transClos_outdegree+1 ints: the first
 * transClos_outdegree entries are slot numbers of successor records, and the
 * final entry is a marker holding the number of the last traversal that
 * visited the node.
 *
 * @param xid  transaction id passed through to Tread() / Tset().
 * @param rid  record id of the node to visit.
 * @param num  number of the current traversal; nodes that already carry this
 *             marker are left alone, all others must carry num-1.
 */
void naiveTraverse(int xid, recordid rid, int num) {
  int * node = alloca((transClos_outdegree + 1) * sizeof(int));
  Tread(xid, rid, node);

  if(node[transClos_outdegree] != num) {
    /* Not yet visited in this pass: it must still carry the previous mark. */
    assert(node[transClos_outdegree] == (num-1));
    node[transClos_outdegree] = num;
    numTset += 1;
    Tset(xid, rid, node);

    /* Visit each successor; only the slot differs from the parent's rid. */
    int edge = 0;
    while(edge < transClos_outdegree) {
      recordid child = rid;
      child.slot = node[edge];
      naiveTraverse(xid, child, num);
      edge++;
    }
  }
}
2005-03-24 04:43:22 +00:00
/* Serializes the folding of multiTraverse's per-call counter deltas into the
   global statistics counters (numOut, numSkipped, numShortcutted, numPushed). */
pthread_mutex_t counters = PTHREAD_MUTEX_INITIALIZER;
/** @todo need to load the correct pages, since the local fifo doesn't refer to a single page!!! */
2005-03-24 04:43:22 +00:00
/**
 * Map a record's page number to a fifo index in [0, pool->fifoCount).
 *
 * Honors useCRC: non-zero hashes the page number with crc32, zero uses the
 * page number modulo the fifo count.  multiTraverse uses this for both its own
 * fifo and the fifo of each successor, so the two are always comparable.
 */
static int graphFifoForRid(recordid * rid, lladdFifoPool_t * pool) {
  if(useCRC) {
    return crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned int)-1) % pool->fifoCount;
  }
  return rid->page % pool->fifoCount;
}

/**
 * Drain the local fifo, marking each node it names and queueing the node's
 * successors.
 *
 * For every record id read from local->iterator the node is read as
 * transClos_outdegree+1 ints.  If its marker (the last int) is not yet num, it
 * is set to num and written back, and each successor slot is resolved through
 * dereferenceArrayListRid() on arrayList's page.  A successor that maps to the
 * same fifo as this call is pushed to local->consumer unless it is already
 * marked with num; any other successor is pushed to global->consumer.
 *
 * The fifo index of this call is derived from the first record id consumed and
 * is not recomputed for later records.
 *
 * Statistics are accumulated locally and folded into the global counters
 * under the counters mutex once the local fifo is drained.
 *
 * @param xid        transaction id passed through to the storage calls.
 * @param arrayList  record id of the array list holding the graph's nodes.
 * @param local      fifo this call consumes from and shortcuts into.
 * @param global     fifo receiving successors that map to a different fifo.
 * @param pool       fifo pool; only its fifoCount is read here.
 * @param num        number of the current traversal (the mark to apply).
 */
void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t * global, lladdFifoPool_t * pool, int num) {
  int * node        = alloca(sizeof(int) * (transClos_outdegree+1));
  int * nodeScratch = alloca(sizeof(int) * (transClos_outdegree+1));

  int myFifo = -1;

  int deltaNumOut = 0;
  int deltaNumSkipped = 0;
  int deltaNumShortcutted = 0;
  int deltaPushed = 0;
  int deltaNumTset = 0;

  while(Titerator_tryNext(xid, local->iterator)) { // @todo nextOrEmpty?
    byte * brid;
    size_t size = Titerator_value(xid, local->iterator, &brid);
    assert(size == sizeof(recordid));
    (void)size;  /* only read by the assert; keeps NDEBUG builds warning-free */

    /* Copy the record id out before Titerator_tupleDone() is called; brid is
       not used after that point. */
    recordid localRid = *(recordid*)brid;

    if(myFifo == -1) {
      myFifo = graphFifoForRid(&localRid, pool);
    }

    Titerator_tupleDone(xid, local->iterator);
    Tread(xid, localRid, node);

    if(node[transClos_outdegree] != num) {
      assert(node[transClos_outdegree] == (num-1));

      node[transClos_outdegree] = num;
      deltaNumTset++;
      Tset(xid, localRid, node); /// @todo TsetRange?

      int i;
      for(i = 0; i < transClos_outdegree; i++) {
        recordid nextRid = arrayList;
        nextRid.slot = node[i];

        /* The array list's page is loaded and released once per edge; the
           original note here suggested pinning it instead. */
        Page * p = loadPage(xid, arrayList.page);
        nextRid = dereferenceArrayListRid(xid, p, nextRid.slot);
        releasePage(p);

        /* Use the same mapping as myFifo so the comparison below is
           meaningful whether or not useCRC is set. */
        int thisFifo = graphFifoForRid(&nextRid, pool);

        if(thisFifo == myFifo) {
          deltaNumShortcutted++;
          Tread(xid, nextRid, nodeScratch);
          if(nodeScratch[transClos_outdegree] != num) {
            Tconsumer_push(xid, local->consumer, NULL, 0, (byte*)&nextRid, sizeof(recordid));
            deltaNumOut++;
          } else {
            deltaNumSkipped++;
          }
        } else {
          // @todo check nextRid to see if we're the worker that will consume it, or (easier) if it stays on the same page.
          Tconsumer_push(xid, global->consumer, NULL, 0, (byte*)&nextRid, sizeof(recordid));
          deltaPushed++;
          deltaNumOut++;
        }
      }
    }

    /* One record was consumed from the local fifo. */
    deltaNumOut--;
  }

  pthread_mutex_lock(&counters);
  numOut += deltaNumOut;
  numSkipped += deltaNumSkipped;
  numShortcutted += deltaNumShortcutted;
  numPushed += deltaPushed;
  numTset += deltaNumTset;
  pthread_mutex_unlock(&counters);
}