libdfa ported to new hash table implementation. (Uses nested top actions.)

This commit is contained in:
Sears Russell 2005-01-15 23:55:49 +00:00
parent ac0d413e58
commit 894e30085c
11 changed files with 48 additions and 32 deletions

View file

@ -137,19 +137,26 @@ void recover(DfaSet * dfaSet) {
for(i = 0; i < machine_count; i++) {
machines[i].worker_thread = spawn_worker_thread(dfaSet, machines[i].machine_id);
} */
StateMachine sm_stack;
StateMachine * sm = &sm_stack;
//StateMachine sm_stack;
StateMachine * sm;//= &sm_stack;
StateMachine * this;
// Need to write iterator...
int ret = (jbHtFirst(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1);
// int ret = (jbHtFirst(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1);
int keySize = sizeof(state_machine_id);
state_machine_id * sm_id;
int valueSize = sizeof(StateMachine);
lladd_hash_iterator * it = ThashIterator(dfaSet->smash->xid, dfaSet->smash->hash, keySize, valueSize);
assert(0); // need to call linear hash iterator here.
while(ret) {
this = getSmash(dfaSet->smash, sm->machine_id);
//assert(0); // need to call linear hash iterator here.
while(ThashNext(dfaSet->smash->xid, it, (byte**)&sm_id, &keySize, (byte**)&sm, &valueSize)) {
// while(ret) {
assert(*sm_id == sm->machine_id);
this = getSmash(dfaSet->smash, *sm_id);
DEBUG("StateMachine %ld\n", sm->machine_id);
this->worker_thread = spawn_worker_thread(dfaSet, sm->machine_id);
ret = (jbHtNext(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1);
free(sm_id);
free(sm);
// ret = (jbHtNext(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1);
}
}
@ -591,7 +598,6 @@ void * worker_loop(void * arg_void) {
StateMachine * stateMachine; /* = &stateMachine_stack; */
DfaSet * dfaSet = arg->dfaSet;
state_machine_id machine_id = arg->machine_id;
readlock(dfaSet->lock, machine_id);
/* printf("Worker loop: %ld\n", machine_id); */

View file

@ -215,11 +215,13 @@ int _send_message(const NetworkSetup *ns, Message *message, const char *to) {
if(addr == NULL) {
fprintf(stderr, "Send failed. Could not parse addr.\n");
free(to_sa);
return -1;
}
if(port == -1) {
fprintf(stderr, "Send failed. Could not parse port.\n");
free(to_sa);
free(addr);
return -1;
}
@ -228,6 +230,7 @@ int _send_message(const NetworkSetup *ns, Message *message, const char *to) {
if(err == 0) {
perror("inet_aton");
free(to_sa);
free(addr);
return -1;
}
@ -244,6 +247,7 @@ int _send_message(const NetworkSetup *ns, Message *message, const char *to) {
if(ret < 0) {
perror("send_message");
}
free(to_sa);
if(ret != sizeof(Message)) {
fprintf(stderr, "send_message sent partial message!\n");

View file

@ -53,7 +53,7 @@ smash_t * init_Smash(int size) {
ret->next_sm_id = 0;
ret->store = store;
// ret->hash = jbHtCreate(xid, 3499);
ret->hash = lHtCreate(xid, 7);
ret->hash = ThashCreate(xid, sizeof(state_machine_id), sizeof(StateMachine));//lHtCreate(xid, 7);
ret->xid = xid;
ret->lock = malloc(sizeof(pthread_mutex_t));
ret->memHash = pblHtCreate();
@ -92,7 +92,7 @@ StateMachine * _insertSmash(smash_t * smash, state_machine_id id) {
new->current_state = START_STATE;
/* printf("Insert %ld\n", id); */
TlogicalHashInsert(smash->xid, smash->hash, (byte*)&id, sizeof(state_machine_id), (byte*)new, sizeof(StateMachine));
ThashInsert(smash->xid, smash->hash, (byte*)&id, sizeof(state_machine_id), (byte*)new, sizeof(StateMachine));
pblHtInsert(smash->memHash, &id, sizeof(state_machine_id), new);
/* Tcommit(smash->xid);
smash->xid = Tbegin(); */
@ -130,11 +130,12 @@ StateMachine * allocSmash (smash_t * smash) {
/** @return -1 on error, 0 if there isn't any more room, and 1 on success. */
StateMachine * insertSmash(smash_t * smash, state_machine_id id) {
void * ret;
StateMachine junk;
StateMachine * junk;
pthread_mutex_lock(smash->lock);
if(TlogicalHashLookup(smash->xid, smash->hash, (byte*)&(smash->next_sm_id), sizeof(state_machine_id), (byte*)&junk, sizeof(state_machine_id)) != -1) {
if(ThashLookup(smash->xid, smash->hash, (byte*)&(smash->next_sm_id), sizeof(state_machine_id), (byte**)&junk) != -1) {
free(junk);
pthread_mutex_unlock(smash->lock);
return NULL;
}
@ -165,8 +166,8 @@ int freeSmash (smash_t * smash, state_machine_id id) {
free(old->sleepCond);
pblHtRemove(smash->memHash, &(id), sizeof(state_machine_id));
ret = TlogicalHashDelete(smash->xid, smash->hash, (byte*)&(id), sizeof(state_machine_id), NULL, sizeof(state_machine_id)) != -1;
//ret = TlogicalHashDelete(smash->xid, smash->hash, (byte*)&(id), sizeof(state_machine_id), NULL, sizeof(state_machine_id)) != -1;
ret = ThashRemove(smash->xid, smash->hash, (byte*)&(id), sizeof(state_machine_id));
free(old);
/* Tcommit(smash->xid);
@ -192,8 +193,10 @@ void _setSmash(smash_t * smash, state_machine_id id) {
StateMachine * machine;
machine = _getSmash(smash, id);
TlogicalHashInsert(smash->xid, smash->hash, (byte*)&id, sizeof(state_machine_id),(byte*) machine, sizeof(StateMachine));
//TlogicalHashInsert(smash->xid, smash->hash, (byte*)&id, sizeof(state_machine_id),(byte*) machine, sizeof(StateMachine));
ThashInsert(smash->xid, smash->hash,
(byte*)&id, sizeof(state_machine_id),
(byte*) machine, sizeof(StateMachine));
}

View file

@ -137,12 +137,13 @@ pthread_mutex_t truncateLog_mutex;
static int sought = 1;
static char * buffer;
int openLogWriter() {
#define BUFSIZE (1024*96)
//#define BUFSIZE (512)
char * buffer ;/*= malloc(BUFSIZE);*/
assert(!posix_memalign((void*)&(buffer), PAGE_SIZE, BUFSIZE));
//char * buffer ;/*= malloc(BUFSIZE);*/
int ret = posix_memalign((void*)&(buffer), PAGE_SIZE, BUFSIZE);
assert(!ret);
int logFD = open (LOG_FILE, O_CREAT | O_RDWR | O_APPEND /*| O_SYNC*/, S_IRWXU | S_IRWXG | S_IRWXO);
if(logFD == -1) {
@ -380,7 +381,7 @@ void closeLogWriter() {
deletelock(log_read_lock);
pthread_mutex_destroy(&log_write_mutex);
pthread_mutex_destroy(&truncateLog_mutex);
free (buffer); // breaks efence. :(
}
void deleteLogWriter() {

View file

@ -840,7 +840,7 @@ linearHash_iteratorPair TlogicalHashIteratorNext(int xid, recordid hashRid, line
//next.size == 0 -> empty bucket. == -1 -> end of list.
int inBucket = 0;
//while(!memcmp(&(it->current_rid), &(NULLRID), sizeof(recordid))
printf("--- %d %d %d\n", it->current_rid.size, it->current_hashBucket, max_bucket(headerHashBits, headerNextSplit)); fflush(NULL);
// printf("--- %d %d %d\n", it->current_rid.size, it->current_hashBucket, max_bucket(headerHashBits, headerNextSplit)); fflush(NULL);
int found = 0;
while(/*it->current_rid.size == -1
&& */it->current_hashBucket <= max_bucket(headerHashBits, headerNextSplit)) {
@ -853,7 +853,7 @@ linearHash_iteratorPair TlogicalHashIteratorNext(int xid, recordid hashRid, line
inBucket = 1;
} else {
found = 1;
printf("bbb {%d, %d, %d} {%d, %d, %d} %d %d\n", e->next.page, e->next.slot, e->next.size, it->current_rid.page, it->current_rid.slot, it->current_rid.size, it->current_hashBucket, max_bucket(headerHashBits, headerNextSplit)); fflush(NULL);
// printf("bbb {%d, %d, %d} {%d, %d, %d} %d %d\n", e->next.page, e->next.slot, e->next.size, it->current_rid.page, it->current_rid.slot, it->current_rid.size, it->current_hashBucket, max_bucket(headerHashBits, headerNextSplit)); fflush(NULL);
break;
} // else, it stays NULLRID.
}

View file

@ -282,6 +282,7 @@ int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte
} else {
TlinkedListRemove(xid, start_list, key, keySize);
TlinkedListInsert(xid, end_list, key, keySize, value, valueSize);
free(value);
pthread_mutex_unlock(&linked_list_mutex);
return 1;
}

View file

@ -160,7 +160,7 @@ void pageDeInit() {
for(int i = 0; i < MAX_BUFFER_SIZE+1; i++) {
deletelock(pool[i].rwlatch);
deletelock(pool[i].loadlatch);
free(pool[i].memAddr);
free(pool[i].memAddr); // breaks efence
}
pthread_mutex_destroy(&lastAllocedPage_mutex);
}

View file

@ -88,7 +88,7 @@ int main (int argc, char** argv) {
/* dfaSet->monoTree.buffer = calloc(DFA_MACHINE_COUNT, sizeof(StateMachine));
dfa_initialize_new (dfaSet, 10001, DFA_MACHINE_COUNT); */
/* dfaSet->monoTree.buffer = calloc(100, sizeof(StateMachine)); */
dfa_initialize_new (dfaSet, 10001, 1000);
dfa_initialize_new (dfaSet, 10001, 100);
/* initial_sm1 = allocMachine(&(dfaSet->monoTree)); */
assert(NULL != (initial_sm1 = allocSmash(dfaSet->smash)));
@ -190,4 +190,3 @@ state_name recv_ack(void * dfaSet, StateMachine * stateMachine, Message * m, cha
}

View file

@ -161,6 +161,7 @@ START_TEST (pingpong_check) {
}
main_loop(dfaSet);
/* Can't get here. */
// return 0;
}
@ -169,6 +170,7 @@ END_TEST
state_name recv_ping(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
if((global_kill--) <= 0) {
printf("Done: %d\n", stateMachine->machine_id);
// while(1) { int i; i++; }
exit(0);
} else if(stateMachine != NULL) {
DEBUG("%ld(%d): Got a ping from %s Machine %ld\n", stateMachine->machine_id, stateMachine->current_state, from, m->from_machine_id);

View file

@ -111,7 +111,7 @@ START_TEST(simpleLinearHashTest)
if(! (i % 1000)) {
printf("%d\n", i);
// printf("%d\n", i);
fflush(NULL);
}
@ -210,11 +210,11 @@ START_TEST(transactionalLinearHashTest)
recordid foo = Talloc(xid, 1);
printf("%d %d %ld\n", foo.page, foo.slot, foo.size);
// printf("%d %d %ld\n", foo.page, foo.slot, foo.size);
recordid hashRoot = ThashAlloc(xid, sizeof(int), sizeof(recordid));
printf("%d %d %ld", hashRoot.page, hashRoot.slot, hashRoot.size);
// printf("%d %d %ld", hashRoot.page, hashRoot.slot, hashRoot.size);
// Insert some entries, see if they stick around.

View file

@ -213,7 +213,7 @@ START_TEST(check_linearHashIterator) {
int key;
int val;
int * keySeen = calloc(NUM_ITERATOR_ENTRIES, sizeof(int));
// int * keySeen = calloc(NUM_ITERATOR_ENTRIES, sizeof(int));
for(int i = 0; i < NUM_ITERATOR_ENTRIES; i++) {
key = i;
val = NUM_ITERATOR_ENTRIES * key;
@ -235,7 +235,7 @@ START_TEST(check_linearHashIterator) {
linearHash_iteratorPair next = TlogicalHashIteratorNext(xid,rid, it, sizeof(int), sizeof(int));
assert(next.key );
while(next.key != NULL) {
printf("%d -> %d\n", *(next.key), *(next.value));
// printf("%d -> %d\n", *(next.key), *(next.value));
next = TlogicalHashIteratorNext(xid, rid, it, sizeof(int), sizeof(int));
}
TlogicalHashIteratorFree(it);