Transitive closure working!

This commit is contained in:
Sears Russell 2005-03-24 04:43:22 +00:00
parent 8f2aa6a550
commit cf58e1fb72
10 changed files with 160 additions and 57 deletions

View file

@ -3,7 +3,7 @@ LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \
$(top_builddir)/src/libdfa/librw.a
bin_PROGRAMS=naiveHash logicalHash readLogicalHash naiveMultiThreaded logicalMultThreaded rawSet \
arrayListSet logicalMultiReaders linearHashNTA linkedListNTA pageOrientedListNTA \
linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests
linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests transitiveClosure
AM_CFLAGS= -g -Wall -pedantic -std=gnu99
SUBDIRS=berkeleyDB

View file

@ -84,7 +84,7 @@ static void * go (void * arg_ptr) {
}
Tcommit(xid);
commitCount++;
/*
for(j = k * count; j < (k+1) *(count) ; j++) {
int tmp = -100;

View file

@ -5,17 +5,17 @@
#ifndef __LLADD_GRAPH_H
#define __LLADD_GRAPH_H
typedef struct {
/*typedef struct {
int outPage; ///<= number of links that leave this page.
short inPage; ///<= number of links that stay within this page
/// (represented as an array of shorts after the
/// array of links leaving the page.)
short flags; ///<= information about this node. (algorithm specific...)
} nodeHeader_t;
} nodeHeader_t; */
void naiveTraverse(int xid, recordid rid);
void multiTraverse(int xid, int page, lladdFifo_t * local, lladdFifo_t * global);
void naiveTraverse(int xid, recordid rid, int num);
void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t * global, lladdFifoPool_t * pool, int num);
void pushRequest(lladdConsumer_t * cons);

View file

@ -67,3 +67,5 @@ int lladdMultiplexer_start(lladdMultiplexer_t * multiplexer, pthread_attr_t * th
*/
int lladdMultiplexer_join(lladdMultiplexer_t * multiplexer);
void multiplexByValue(byte * key, size_t keySize, byte * value, size_t valueSize, byte **multiplexKey, size_t * multiplexSize);
void multiplexByRidPage(byte * key, size_t keySize, byte * value, size_t valueSize, byte **multiplexKey, size_t * multiplexSize);

View file

@ -53,7 +53,7 @@ terms specified in this license.
#ifndef __ARRAY_LIST_H
#define __ARRAY_LIST_H
compensated_function recordid TarrayListAlloc(int xid, int count, int multiplier, int size);
compensated_function recordid TarrayListAlloc(int xid, int numPages, int multiplier, int recordSize);
Operation getArrayListAlloc();
Operation getInitFixed();

View file

@ -16,6 +16,8 @@ lladdFifo_t * lladdFifoPool_getFifoCRC32( lladdFifoPool_t * pool, byte * multipl
}
void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) {
if(pool->dirtyPoolFifo) {
assert(fifo->consumer->type < 10);
assert(fifo->iterator->type < 10);
Tconsumer_push(xid, pool->dirtyPoolFifo->consumer, NULL, 0, (byte*)&fifo, sizeof(lladdFifo_t*));
}
}

View file

@ -3,72 +3,120 @@
#include <assert.h>
#include <stdlib.h>
#include "page.h"
void naiveTraverse(int xid, recordid rid) {
nodeHeader_t * node = alloca(rid.size);
#include <lladd/crc32.h>
extern int transClos_outdegree;
extern int numOut;
extern int numTset;
extern int numShortcutted;
extern int numSkipped;
extern int numPushed;
void naiveTraverse(int xid, recordid rid, int num) {
int * node = alloca(sizeof(int) * (transClos_outdegree+1));
Tread(xid, rid, node);
if(node->flags) { return; }
node->flags = 1;
if(node[transClos_outdegree] == num) { return; }
assert(node[transClos_outdegree] == (num-1));
node[transClos_outdegree] = num;
numTset++;
Tset(xid, rid, node);
int i = 0;
// do 'local' nodes first.
for(i = 0; i < node->inPage; i++) {
short next = *(((short*)(((recordid*)(node+1))+node->outPage))+i);
rid.slot = next;
rid.size = TrecordSize(xid, rid);
naiveTraverse(xid, rid);
}
for(i = 0; i < node->outPage; i++) {
recordid next = ((recordid*)(node+1))[i];
next.size = TrecordSize(xid, next);
naiveTraverse(xid, next);
for(i = 0; i < transClos_outdegree; i++) {
rid.slot = node[i];
naiveTraverse(xid, rid, num);
}
}
/** @todo need to load the correct pages, since the local fifo doesn't refer to a single page!!! */
void multiTraverse(int xid, int page, lladdFifo_t * local, lladdFifo_t * global) {
Page * p = loadPage(xid, page);
while(Titerator_next(xid, local->iterator)) {
recordid * rid_p;
int rid_len = Titerator_value(xid, local->iterator, (byte**)&rid_p);
assert(rid_len == sizeof(recordid));
recordid rid = *rid_p;
pthread_mutex_t counters = PTHREAD_MUTEX_INITIALIZER;
/** @todo need to load the correct pages, since the local fifo doesn't refer to a single page!!! */
void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t * global, lladdFifoPool_t * pool, int num) {
int * node = alloca(sizeof(int) * (transClos_outdegree+1));
int * nodeScratch = alloca(sizeof(int) * (transClos_outdegree+1));
int myFifo = -1;
int deltaNumOut = 0;
int deltaNumSkipped = 0;
int deltaNumShortcutted = 0;
int deltaPushed = 0;
while(Titerator_tryNext(xid, local->iterator)) { // @nextOrEmprty?
recordid * rid;
recordid localRid;
size_t size = Titerator_value(xid, local->iterator, (byte**)&rid);
assert(size == sizeof(recordid));
localRid = *rid;
if(myFifo == -1) {
myFifo = crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned long)-1L) % pool->fifoCount;
// printf("Switched locality sets... %d\n", myFifo);
} else {
// assert(myFifo == crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned long)-1L) % pool->fifoCount);
}
Titerator_tupleDone(xid, local->iterator);
nodeHeader_t * node = malloc(rid.size);
Tread(xid, localRid, node);
readRecord(xid, p, rid, node);
if(node[transClos_outdegree] != num) {
assert(node[transClos_outdegree] == (num-1));
if(!node->flags) {
node->flags = 1;
// @todo logical operation here.
Tset(xid, rid, node);
// do 'local' nodes first.
node[transClos_outdegree] = num;
numTset++;
Tset(xid, localRid, node); /// @todo TsetRange?
int i;
for(i = 0; i < node->inPage; i++) {
short slot = *(((short*)(((recordid*)(node+1))+node->outPage))+i);
rid.slot = slot;
rid.size = getRecordSize(xid, p, rid);
Tconsumer_push(xid, local->consumer, (byte*)&(rid.page), sizeof(rid.page), (byte*)&rid, sizeof(recordid));
}
// now, do non-local nodes
for(i = 0; i < node->outPage; i++) {
recordid next = ((recordid*)(node+1))[i];
Tconsumer_push(xid, global->consumer, (byte*)&(rid.page), sizeof(rid.page), (byte*)&next, sizeof(recordid));
for(i =0 ; i < transClos_outdegree; i++) {
recordid nextRid = arrayList;
nextRid.slot = node[i];
Page * p = loadPage(xid, arrayList.page); // just pin it forever and ever
nextRid = dereferenceArrayListRid(p, nextRid.slot);
releasePage(p);
int thisFifo = crc32((byte*)&(nextRid.page), sizeof(nextRid.page), (unsigned long)-1L) % pool->fifoCount;
/* if(nextRid.page == rid->page) {
assert(thisFifo == myFifo);
}*/
// if(nextRid.page == localRid.page) {
if(thisFifo == myFifo) {
deltaNumShortcutted++;
Tread(xid, nextRid, nodeScratch);
if(nodeScratch[transClos_outdegree] != num) {
Tconsumer_push(xid, local->consumer, NULL, 0, (byte*)&nextRid, sizeof(recordid));
deltaNumOut++;
} else {
deltaNumSkipped++;
}
} else {
// @todo check nextRid to see if we're the worker that will consume it, or (easier) if it stays on the same page.
Tconsumer_push(xid, global->consumer, NULL, 0, (byte*)&nextRid, sizeof(recordid));
deltaPushed++;
deltaNumOut++;
}
}
}
free(node);
Titerator_tupleDone(xid, local->iterator);
deltaNumOut--;
}
releasePage(p);
pthread_mutex_lock(&counters);
numOut += deltaNumOut;
numSkipped += deltaNumSkipped;
numShortcutted += deltaNumShortcutted;
numPushed += deltaPushed;
pthread_mutex_unlock(&counters);
}

View file

@ -37,6 +37,45 @@ int lladdMultiplexer_join(lladdMultiplexer_t * multiplexer) {
return pthread_join(multiplexer->worker,NULL);
}
void * lladdMultiplexer_flush(lladdMultiplexer_t * m) {
// lladdMultiplexer_t * m = arg;
lladdConsumer_t * consumer;
while(Titerator_tryNext(m->xid, m->it)) {
byte * mkey, * key, * value;
size_t mkeySize, keySize, valueSize;
keySize = Titerator_key (m->xid, m->it, &key);
valueSize = Titerator_value(m->xid, m->it, &value);
m->multiplexer(key, keySize, value, valueSize, &mkey, &mkeySize);
lladdFifo_t * fifo = m->fifoPool->getFifo(m->fifoPool, mkey, mkeySize);
consumer = fifo->consumer;
Tconsumer_push(m->xid, consumer, key, keySize, value, valueSize);
Titerator_tupleDone(m->xid, m->it);
lladdFifoPool_markDirty(m->xid, m->fifoPool, fifo);
}
// iterate over pblhash, closing consumers.
/* Titerator_close(m->xid, m->it);
// @todo Does this belong in its own function in fifo.c?
lladdFifoPool_t * pool = m->fifoPool;
int i;
for(i = 0; i < pool->fifoCount; i++) {
Tconsumer_close(m->xid, pool->pool[i]->consumer);
}
if(m->fifoPool->dirtyPoolFifo) {
Tconsumer_close(m->xid, m->fifoPool->dirtyPoolFifo->consumer);
}
*/
return (void*)compensation_error();
}
void * multiplexer_worker(void * arg) {
lladdMultiplexer_t * m = arg;
@ -120,3 +159,13 @@ void multiplexHashLogByKey(byte * key,
}
}
void multiplexByValue(byte * key, size_t keySize, byte * value, size_t valueSize, byte **multiplexKey, size_t * multiplexSize) {
*multiplexKey = value;
*multiplexSize = valueSize;
}
void multiplexByRidPage(byte * key, size_t keySize, byte * value, size_t valueSize, byte **multiplexKey, size_t * multiplexSize) {
*multiplexKey = (byte*)&(((recordid*)value)->page);
*multiplexSize = sizeof(((recordid*)value)->page);
}

View file

@ -216,6 +216,7 @@ compensated_function recordid TallocFromPage(int xid, long page, long size) {
if(rid.size == size) {
Tupdate(xid,rid, NULL, OPERATION_ALLOC);
} else {
// slottedCompact(p);
assert(rid.size < 0);
}
if(p) {

View file

@ -131,7 +131,8 @@ void openPageFile() {
#if HAVE_POSIX_MEMALIGN
// O_DIRECT is broken under linux 2.4..
stable = open (STORE_FILE, O_CREAT | O_RDWR /*| O_DIRECT*/, S_IRWXU | S_IRWXG | S_IRWXO);
stable = open (STORE_FILE, O_CREAT | O_RDWR/* | O_DIRECT*/, S_IRWXU | S_IRWXG | S_IRWXO);
// printf("WARNING: LLADD IS USING O_DIRECT!!!\n");
#else
//#warn Not using O_DIRECT
// If we don't have posix_memalign(), then we aren't aligning our pages in memory, and can't use O_DIRECT.