stasis-aries-wal/src/stasis/graph.c
Sears Russell 6d17442380 Major refactoring:
- Changed operations to take only two arguments
     - No more hacks regarding log argument sizes

     - Set pageid = INVALID_PAGE if you want a logical operation

  - Ported operation implementations to new api; exposed + fixed a number of concurrency bugs

  - More fixes to prepare

  - Fixed to nested top actions

  - More coherent operations api / recovery implmentation

  - TnaiveHash* and Tinstant* are (and were already) broken, and are set for removal

  - Removed some instances of fail_unless

  - Fixed design flaws in blob implementation.

  - New naming convention for operation callback functions.
2008-09-28 03:11:24 +00:00

124 lines
3.5 KiB
C

#include <stasis/graph.h>
#include <alloca.h>
#include <assert.h>
#include <stdlib.h>
#include <stasis/page.h>
#include <stasis/crc32.h>
int numOut = 0;
int numTset = 0;
int numShortcutted = 0;
int numSkipped = 0;
int numPushed = 0;
int useCRC = 0;
int transClos_outdegree = 0;
void naiveTraverse(int xid, recordid rid, int num) {
int * node = alloca(sizeof(int) * (transClos_outdegree+1));
Tread(xid, rid, node);
if(node[transClos_outdegree] == num) { return; }
assert(node[transClos_outdegree] == (num-1));
node[transClos_outdegree] = num;
numTset++;
Tset(xid, rid, (const byte*)node);
int i = 0;
for(i = 0; i < transClos_outdegree; i++) {
rid.slot = node[i];
naiveTraverse(xid, rid, num);
}
}
pthread_mutex_t counters = PTHREAD_MUTEX_INITIALIZER;
/** @todo need to load the correct pages, since the local fifo doesn't refer to a single page!!! */
void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t * global, lladdFifoPool_t * pool, int num) {
int * node = alloca(sizeof(int) * (transClos_outdegree+1));
int * nodeScratch = alloca(sizeof(int) * (transClos_outdegree+1));
int myFifo = -1;
int deltaNumOut = 0;
int deltaNumSkipped = 0;
int deltaNumShortcutted = 0;
int deltaPushed = 0;
while(Titerator_tryNext(xid, local->iterator)) { // @nextOrEmprty?
byte * brid;
recordid localRid;
size_t size = Titerator_value(xid, local->iterator, &brid);
assert(size == sizeof(recordid));
recordid * rid = (recordid*)brid;
localRid = *rid;
if(myFifo == -1) {
if(useCRC) {
myFifo = stasis_crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned int)-1) % pool->fifoCount;
} else {
myFifo = rid->page % pool->fifoCount;
}
// printf("Switched locality sets... %d\n", myFifo);
} else {
// assert(myFifo == crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned long)-1L) % pool->fifoCount);
}
Titerator_tupleDone(xid, local->iterator);
Tread(xid, localRid, node);
if(node[transClos_outdegree] != num) {
assert(node[transClos_outdegree] == (num-1));
node[transClos_outdegree] = num;
numTset++;
Tset(xid, localRid, (const byte*)node); /// @todo TsetRange?
int i;
for(i =0 ; i < transClos_outdegree; i++) {
recordid nextRid = arrayList;
nextRid.slot = node[i];
Page * p = loadPage(xid, arrayList.page); // just pin it forever and ever
nextRid = dereferenceArrayListRid(xid, p, nextRid.slot);
releasePage(p);
int thisFifo = stasis_crc32((byte*)&(nextRid.page), sizeof(nextRid.page), (unsigned int)-1) % pool->fifoCount;
/* if(nextRid.page == rid->page) {
assert(thisFifo == myFifo);
}*/
// if(nextRid.page == localRid.page) {
if(thisFifo == myFifo) {
deltaNumShortcutted++;
Tread(xid, nextRid, nodeScratch);
if(nodeScratch[transClos_outdegree] != num) {
Tconsumer_push(xid, local->consumer, NULL, 0, (byte*)&nextRid, sizeof(recordid));
deltaNumOut++;
} else {
deltaNumSkipped++;
}
} else {
// @todo check nextRid to see if we're the worker that will consume it, or (easier) if it stays on the same page.
Tconsumer_push(xid, global->consumer, NULL, 0, (byte*)&nextRid, sizeof(recordid));
deltaPushed++;
deltaNumOut++;
}
}
}
deltaNumOut--;
}
pthread_mutex_lock(&counters);
numOut += deltaNumOut;
numSkipped += deltaNumSkipped;
numShortcutted += deltaNumShortcutted;
numPushed += deltaPushed;
pthread_mutex_unlock(&counters);
}