CHT cleanup, integrated CHT tests into 'make check'

This commit is contained in:
Sears Russell 2005-01-20 22:55:54 +00:00
parent b04e71360f
commit 3642716431
5 changed files with 88 additions and 267 deletions

View file

@ -45,6 +45,30 @@ AM_CONDITIONAL(HAVE_CHECK, test x$have_check = xtrue)
#AC_CHECK_LIB(efence,memalign)
#fi
# Linux has a broken O_DIRECT flag, but we allow people to override it from
# the command line.
test_host_prw=yes
AC_CACHE_CHECK([for open/O_DIRECT], db_cv_open_o_direct, [
AC_TRY_LINK([
#include <sys/types.h>
#include <fcntl.h>], [
open("a", O_RDONLY | O_DIRECT, 0);
], [db_cv_open_o_direct=yes; test_host_prw=no], [db_cv_open_o_direct=no])])
if test "$test_host_prw" = "no" -a "$db_cv_open_o_direct" = "yes"; then
case "$host_os" in
linux*)
db_cv_open_o_direct=no;
AC_MSG_WARN(
[O_DIRECT interface ignored on $host_os-$host_vendor.]);;
esac
fi
if test "$db_cv_open_o_direct" = "yes"; then
AC_DEFINE(HAVE_O_DIRECT)
AH_TEMPLATE(HAVE_O_DIRECT, [Define to 1 if you have the O_DIRECT flag.])
fi
AC_CHECK_LIB([m], [sqrt])
AC_CHECK_LIB([db], [db_open])
AC_CHECK_LIB([pthread], [pthread_create])

View file

@ -53,14 +53,7 @@ terms specified in this license.
/** TODO: Endianness. (ICK) */
//state_name request_type =*(requestType(m)); \sdsd
//state_name response_type =*(responseType(m)); \sd
//TwoPCMachineState * machine_state_2pc = (TwoPCMachineState*) &(stateMachine->app_state); \sd
/** TODO: Endianness for CHT (ICK) */
/**
* Hash function generator, taken directly from pblhash
@ -81,24 +74,26 @@ static int hash( const unsigned char * key, size_t keylen , int table_length) {
}
/** TODO: multiplex_interleaved needs to return a special group for
/** @todo multiplex_interleaved needs to return a special group for
begin/commit/abort requests. There will then be two types of
transactions. Explicit ones will create xids on all replicas, and
span multiple operations. Implcit ones will only span one replica
group, but may only contain a single operation.
*/
@todo bc group is one-indexed, which is really irritating! */
short multiplex_interleaved(DfaSet * dfaSet, Message * m) {
short table_length = dfaSet->networkSetup.broadcast_lists_count;
short bc_group;
if((*requestType(m) == CREATE) || (*requestType(m) == DELETE)) {
/* Special case: Send to all replicas...bc_group one should contain all replicas... */
/* Special case: Send to all replicas. */
bc_group = ALL_BUT_GROUP_ZERO;
} else {
/* Need to add one so that no requests are assigned to the coordinator (bc:0) */
bc_group = hash(getKeyAddr(m), getKeyLength(m), table_length) + 1;
}
// printf("request %d bc group: %d\n", *requestType(m), bc_group);
DEBUG("request %d bc group: %d\n", *requestType(m), bc_group);
return bc_group;
@ -119,27 +114,20 @@ int xid_exists(int ht_xid, jbHashTable_t * xid_ht, StateMachine * stateMachine)
}
}
DfaSet * cHtInit(int cht_type, /*char * localhost, */
DfaSet * cHtInit(int cht_type,
short (* get_broadcast_group)(DfaSet *, Message *),
/* short port,
char *** broadcast_lists,
int broadcast_lists_count,
int* broadcast_list_host_count */
NetworkSetup * ns) {
DfaSet * dfaSet;
int xid = Tbegin();
// printf("Init %s port %d\n", ns->localhost, ns->localport);
DEBUG("Init %s port %d\n", ns->localhost, ns->localport);
TwoPCAppState * twoPC_state;
CHTAppState * chtApp_state;
int error;
// dfaSet = dfa_malloc(DFA_MACHINE_COUNT, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count);
dfaSet = dfa_malloc(DFA_MACHINE_COUNT, ns);
/* srand(time(NULL)); */
twoPC_state = calloc(1, sizeof(TwoPCAppState));
chtApp_state = calloc(1, sizeof(CHTAppState));
@ -158,7 +146,7 @@ DfaSet * cHtInit(int cht_type, /*char * localhost, */
chtApp_state->xid_ht = jbHtCreate(xid, 79);
chtApp_state->ht_ht = jbHtCreate(xid, 79);
chtApp_state->ht_xid = Tbegin(); // !!!!
chtApp_state->next_hashTableId = 0; /* This gets incremented each time a new hashtable is allocated. */
chtApp_state->next_hashTableId = 0;
twoPC_state->is_coordinator = (cht_type == CHT_COORDINATOR);
twoPC_state->init_xact_2pc = init_xact_cht;

View file

@ -143,10 +143,12 @@ int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet);
/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet);
int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/
/** The server side state for a CHT. */
typedef struct {
int ht_xid;
jbHashTable_t * xid_ht;
jbHashTable_t * ht_ht;
/** This gets incremented by the coordinator each time a new hashtable is allocated. */
int next_hashTableId;
} CHTAppState;

View file

@ -43,14 +43,14 @@ static int _chtEval(DfaSet * dfaSet,
Message m;
if(ht != NULL) {
printf("_chtEval(request=%d, response=%d, xid=%ld, ht=%d ", request_type, response_type, *xid, ht->id);
DEBUG("_chtEval(request=%d, response=%d, xid=%ld, ht=%d ", request_type, response_type, *xid, ht->id);
} else {
printf("_chtEval(request=%d, response=%d, xid=%ld, ht=NULL ", request_type, response_type, *xid);
DEBUG("_chtEval(request=%d, response=%d, xid=%ld, ht=NULL ", request_type, response_type, *xid);
}
if(key == NULL) {
printf(")\n");
DEBUG(")\n");
} else {
printf("key=%d)\n", *(int*)key);
DEBUG("key=%d)\n", *(int*)key);
}
* requestType(&m) = request_type;
m.response_type = response_type;
@ -96,7 +96,7 @@ static int _chtEval(DfaSet * dfaSet,
*xid = m.to_machine_id;
printf("+chtEval returning %d\n", m.type);
DEBUG("+chtEval returning %d\n", m.type);
return m.type;
}
@ -140,12 +140,9 @@ DfaSet * cHtClientInit(char * configFile) {
assert(config->coordinator);
printf("config->localhost:%s config->broadcast_lists[0][0]:%s (localport %d)(port %d)\n",
config->localhost, config->broadcast_lists[0][0], config->localport, parse_port(config->broadcast_lists[0][0]));
/* DfaSet * ret = cHtInit(CHT_CLIENT, parse_addr(config->broadcast_lists[0][0]), NULL,
parse_port(config->broadcast_lists[0][0]), config->broadcast_lists+1, config->broadcast_lists_count-1,
config->broadcast_list_host_count+1); */
DfaSet * ret = cHtInit(CHT_CLIENT, NULL, config);
assert(config->coordinator);
// free (config);
free (config);
return ret;
}

View file

@ -39,6 +39,7 @@ authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#define _GNU_SOURCE // For asprintf
#include <libdfa/libdfa.h>
#include <unistd.h>
#include <sys/wait.h>
@ -64,7 +65,6 @@ typedef struct {
} main_wrap_args;
void dfa_initialize_new(DfaSet * dfaSet, unsigned short port, int count) {
/*init_MonoTree(&(dfaSet->monoTree), count);*/
Tinit();
dfaSet->smash = init_Smash(count);
dfaSet->networkSetup.localport = port;
@ -72,39 +72,16 @@ void dfa_initialize_new(DfaSet * dfaSet, unsigned short port, int count) {
void recover(DfaSet * dfaSet);
/*void nothing(int signo) {
/
This space left intentionally blank.
/
}*/
int dfa_reinitialize(DfaSet *dfaSet, char * localhost,
Transition transitions[], int transition_count,
State states[], state_name state_count) {
/* struct sigaction actions;
int rc;*/
dfaSet->lock = initlock();
dfaSet->states = states;
dfaSet->state_count = state_count;
dfaSet->transitions = transitions;
dfaSet->transition_count = transition_count;
/* memset(&actions, 0, sizeof(actions));
sigemptyset(&actions.sa_mask);
actions.sa_flags = 0;
actions.sa_handler = nothing;
rc = sigaction(SIGALRM,&actions,NULL);
if(rc < 0) {
perror("sigaction");
return -1;
}
*/
if(init_network_broadcast(&(dfaSet->networkSetup), dfaSet->networkSetup.localport, localhost,
dfaSet->networkSetup.broadcast_lists,
dfaSet->networkSetup.broadcast_lists_count,
@ -131,34 +108,23 @@ pthread_t spawn_worker_thread(DfaSet * dfaSet, state_machine_id machine_id);
pthread_t spawn_main_thread(DfaSet * dfaSet);
void recover(DfaSet * dfaSet) {
/* int machine_count;
StateMachine* machines = enumerateMachines(&(dfaSet->monoTree), &machine_count);
int i;
for(i = 0; i < machine_count; i++) {
machines[i].worker_thread = spawn_worker_thread(dfaSet, machines[i].machine_id);
} */
//StateMachine sm_stack;
StateMachine * sm;//= &sm_stack;
StateMachine * sm;
StateMachine * this;
// Need to write iterator...
// int ret = (jbHtFirst(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1);
int keySize = sizeof(state_machine_id);
state_machine_id * sm_id;
int valueSize = sizeof(StateMachine);
lladd_hash_iterator * it = ThashIterator(dfaSet->smash->xid, dfaSet->smash->hash, keySize, valueSize);
lladd_hash_iterator * it = ThashIterator(dfaSet->smash->xid,
dfaSet->smash->hash,
keySize, valueSize);
//assert(0); // need to call linear hash iterator here.
while(ThashNext(dfaSet->smash->xid, it, (byte**)&sm_id, &keySize, (byte**)&sm, &valueSize)) {
// while(ret) {
assert(*sm_id == sm->machine_id);
this = getSmash(dfaSet->smash, *sm_id);
DEBUG("StateMachine %ld\n", sm->machine_id);
this->worker_thread = spawn_worker_thread(dfaSet, sm->machine_id);
free(sm_id);
free(sm);
// ret = (jbHtNext(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1);
}
}
void* main_loop(DfaSet *dfaSet) {
@ -168,25 +134,23 @@ void* main_loop(DfaSet *dfaSet) {
NetworkSetup * networkSetup = malloc(sizeof(NetworkSetup));
int recieved_message = 0;
/* StateMachine stateMachine_stack; */
StateMachine * stateMachine; /* = &stateMachine_stack; */
StateMachine * stateMachine;
writelock(dfaSet->lock, 300);
memcpy(networkSetup, &(dfaSet->networkSetup), sizeof(NetworkSetup));
writeunlock(dfaSet->lock);
/** @todo the locking scheme for this loop could be improved. The loop assumes that
pointers to existing state machines could be invalidated by unrelated threads,
and this forces it to hold a global write lock during processing. */
while(1) {
int i;
state_name new_state, current_state;
/* int ret;*/
int new_machine = 0;
/*Listen on socket... */
if(recieved_message) {
/* if(stateMachine != NULL) {
setSmash(dfaSet->smash, stateMachine->machine_id);
} */
writeunlock(dfaSet->lock);
recieved_message = 0;
}
@ -197,80 +161,39 @@ void* main_loop(DfaSet *dfaSet) {
recieved_message = 1;
/* The commented out, more complex locking scheme does not work,
because freeMachine (called by worker threads) invalidates
stateMachine pointers, so this loop really needs a global
write lock. */
/* readlock(dfaSet->lock, 200); */
writelock(dfaSet->lock, 200);
stateMachine = getSmash(dfaSet->smash, message->to_machine_id);
/* printf("Lookup %ld, ret = %d\n", message->to_machine_id, ret); */
/* stateMachine = getMachine(&(dfaSet->monoTree), message->to_machine_id); */
/* if(stateMachine != NULL) {
/ * Grab the lock now. * /
pthread_mutex_lock(&(stateMachine->mutex));
} */
DEBUG("Lookup %ld, ret = %d\n", message->to_machine_id, ret);
/* This is the only thread that can write to the monoTree, so we
don't need to worry that someone else will create the
stateMachine, so we can safely release all locks if the
stateMachine was null, and not worry about race conditions. */
/* readunlock(dfaSet->lock); */
/* TODO: Check states to make sure they actually exist? */
/** @todo Check states to make sure they actually exist? */
if(stateMachine == NULL) {
/*writelock(dfaSet->lock, 600);*/
DEBUG("Allocate machine %ld->", message->to_machine_id); fflush(NULL);
if(message->to_machine_id == NULL_MACHINE) {
stateMachine = allocSmash(dfaSet->smash);
/*stateMachine = allocMachine(&(dfaSet->monoTree));*/
} else {
/* TODO: Check id. */
/*stateMachine = insertMachine(&(dfaSet->monoTree), message->to_machine_id); */
/* @todo: Check id. */
stateMachine = insertSmash(dfaSet->smash, message->to_machine_id);
}
if(stateMachine == NULL) {
/*writeunlock(dfaSet->lock);*/
fprintf(stderr, "Too many state machines. Dropping request for new one.\n");
continue;
} else {
/* printf("machine->id:%ld\n", stateMachine->machine_id); */
}
new_machine = 1;
/* Done with monotree for now. */
/* Grab a write lock on our machine before releasing the global
write lock, so that its worker thread doesn't get a hold of
it. */
/*pthread_mutex_lock(&(stateMachine->mutex));*/
/*writeunlock(dfaSet->lock);*/
stateMachine->worker_thread = (pid_t)NULL;
stateMachine->page = NULL;
stateMachine->page_id.page = 0; /* TODO: Is there such a thing as a null page?? */
/* @todo libdfa doesn't use the 'conventional' null recordid, {0,0,-1} */
stateMachine->page_id.page = 0;
stateMachine->page_id.slot = 0;
stateMachine->page_id.size = 0;
@ -279,8 +202,6 @@ void* main_loop(DfaSet *dfaSet) {
current_state = stateMachine->current_state;
}
/* At this point, we hold stateMachine->mutex and no other locks. */
new_state = current_state;
/* Find the appropriate transition */
@ -299,7 +220,6 @@ void* main_loop(DfaSet *dfaSet) {
fprintf(stderr, "%ld received: %ld-%d:%d->? (bad message)\n", stateMachine->machine_id, message->from_machine_id,
message->type, current_state);
/*pthread_mutex_unlock(&(stateMachine->mutex));*/
continue;
}
@ -340,16 +260,10 @@ void* main_loop(DfaSet *dfaSet) {
if(stateMachine->worker_thread == (pid_t)NULL) {
/* No worker thread, so just deallocate, and move on */
/* Note, that at this point, we hold the mutex on the state machine, and the global write lock. */
/*writelock(dfaSet->lock,500);
pthread_mutex_unlock(&(stateMachine->mutex));*/
/*freeMachine(&(dfaSet->monoTree),
stateMachine->machine_id); */
freeSmash(dfaSet->smash, stateMachine->machine_id);
/*writeunlock(dfaSet->lock);*/
continue;
} else {
/* NULL_STATE_TOMBSTONE tells the worker thread that it's
@ -371,7 +285,7 @@ void* main_loop(DfaSet *dfaSet) {
stateMachine->current_state = new_state;
stateMachine->last_transition = time(NULL);
/* TODO: Is this general enough? The transition function is
/* @todo Is this general enough? The transition function is
able to overwrite both variables, so it should be
good enough. */
@ -392,59 +306,38 @@ void* main_loop(DfaSet *dfaSet) {
stateMachine->message.from_machine_id = stateMachine->machine_id;
stateMachine->message.to_machine_id = message->from_machine_id;
/* TODO: Force if necessary.*/
if(dfaSet->transitions[i].force) {
setSmash(dfaSet->smash, stateMachine->machine_id);
forceSmash(dfaSet->smash);
}
/*stateMachine->pending++; */
/* setSmash(dfaSet->smash, stateMachine->machine_id); */
/* Fork or signal the process if there already is one. */
if(stateMachine->worker_thread == (pthread_t)NULL) {
/* assert (getMachine(&(dfaSet->monoTree), stateMachine->machine_id) == stateMachine); */
assert ((stateMachine->current_state != NULL_STATE) &&
(stateMachine->current_state != NULL_STATE_TOMBSTONE));
stateMachine->worker_thread = spawn_worker_thread(dfaSet, stateMachine->machine_id);
} else {
/* pthread_cond_t * cond;
pthread_cond_t cond_copy;
pthread_kill (stateMachine->worker_thread, SIGALRM);
pthread_cond_signal (&(stateMachine->sleepCond)); */
/* printf("Waking worker...\n"); */
/* memcpy(&cond_copy, &(stateMachine->sleepCond), sizeof(pthread_cond_t)); */
pthread_cond_broadcast (stateMachine->sleepCond); /* TODO: Signal should be adequate.. */
// This was a broadcast, but was recently changed to signal for
// performance reasons.
pthread_cond_signal(stateMachine->sleepCond);
}
}
/* setSmash(dfaSet->smash, stateMachine->machine_id); */
}
}
void * inner_worker_loop(void * arg_void) {
WorkerLoopArgs * arg = arg_void;
/* int ret; */
DfaSet * dfaSet = arg->dfaSet;
const state_machine_id machine_id = arg->machine_id;
/* pthread_cond_t cond_copy;
pthread_mutex_t mutex_copy; */
int timeout = 0; /* Run through the loop immediately the first time around. */
int state = 0;
/* int tries = 0; */
/* StateMachine stateMachine_stack; */
StateMachine* stateMachine; /* = &stateMachine_stack; */
StateMachine* stateMachine;
free(arg_void);
@ -455,14 +348,6 @@ void * inner_worker_loop(void * arg_void) {
stateMachine = getSmash(dfaSet->smash, machine_id);
/* stateMachine = getMachine(&(dfaSet->monoTree), machine_id); */
/* assert(stateMachine != NULL); */
/* assert(ret); */
/* memcpy(&mutex_copy, &(stateMachine->mutex), sizeof(pthread_mutex_t));
memcpy(&cond_copy, &(stateMachine->sleepCond), sizeof(pthread_cond_t)); */
pthread_mutex_lock(stateMachine->mutex);
while(1) {
@ -470,9 +355,6 @@ void * inner_worker_loop(void * arg_void) {
state_name i, state_idx;
/** SIGALRM will make sleep return immediately (I hope!)*/
/* printf("pending: %ld, %d\n", stateMachine->machine_id, stateMachine->pending); */
/** @todo inner worker loop doesn't seem to 'do the right thing' with respect to timing */
if(1|| !stateMachine->pending) { /* If no pending messages, go to sleep */
struct timeval now;
@ -486,8 +368,6 @@ void * inner_worker_loop(void * arg_void) {
cond = stateMachine->sleepCond;
mutex = stateMachine->mutex;
/* setSmash(dfaSet->smash, stateMachine); */ /* No longer write to stateMacine in this loop..Well
, we do, but we can safely lose that change. */
readunlock(dfaSet->lock);
/* A note on locking: This loop maintains a read lock everywhere
@ -496,32 +376,23 @@ void * inner_worker_loop(void * arg_void) {
gettimeofday(&now, NULL);
usec = now.tv_usec + timeout;
if(usec > 1000000) {
now.tv_sec++;
usec-=1000000;
}
/** @todo ridiculously long timeout in libdfa.c */
timeout_spec.tv_sec = now.tv_sec;/* + timeout; */
timeout_spec.tv_nsec = 1000 * usec; /*now.tv_usec * 1000; */
timeout_spec.tv_sec = now.tv_sec;
timeout_spec.tv_nsec = 1000 * usec;
rc = pthread_cond_timedwait (cond, mutex, &timeout_spec );
/* rc = sleep(timeout); */
if(rc == EINVAL) {
perror("pthread");
} /*else if (rc != ETIMEDOUT) {
printf("Worker signaled.\n");
} else {
printf("Timed out.\n");
}*/
}
readlock(dfaSet->lock, machine_id);
/* Some other thread may have invalidated our pointer while we
@ -529,90 +400,52 @@ void * inner_worker_loop(void * arg_void) {
our pointer is local to this thread, we still need to re-read
from the store.*/
/*stateMachine = getMachine(&(dfaSet->monoTree), machine_id);*/
assert(stateMachine == getSmash(dfaSet->smash, machine_id));
}
/* stateMachine = getMachine(&(dfaSet->monoTree), machine_id); */
/* This can't happen! */
/* assert(ret); */
/* assert(stateMachine != NULL); */
DEBUG("Current State: %d, %d\n", stateMachine->current_state, NULL_STATE_TOMBSTONE);
/* pthread_mutex_lock(&(stateMachine->mutex)); */
/* if(stateMachine->pending > 0) {
if(stateMachine->pending > 0) {
stateMachine->pending--;
}
assert (stateMachine->pending >= 0);
}*/
/* pthread_mutex_unlock(&(stateMachine->mutex)); */
/* printf("Current State: %d, %d\n", stateMachine->current_state, NULL_STATE_TOMBSTONE); */
if(stateMachine->current_state == NULL_STATE_TOMBSTONE) {
/* printf("Breaking\n"); */
DEBUG("Freeing statemachine\n");
break;
}
state = stateMachine->current_state;
stateMachine->message.type = stateMachine->current_state;
timeout = 690000 +(int) (300000.0*rand()/(RAND_MAX+1.0));
/* timeout = 1; */
for(i = 0; i < dfaSet->state_count; i++) {
if(dfaSet->states[i].name == stateMachine->current_state) {
state_idx = i;
}
}
/* if(dfaSet->states[stateMachine->current_state].abort_fcn && (time(NULL) - stateMachine->last_transition > 100)) {
if(dfaSet->states[state_idx].abort_fcn && (time(NULL) - stateMachine->last_transition > 100)) {
((callback_fcn*)dfaSet->states[state_idx].abort_fcn)(dfaSet, stateMachine, &(stateMachine->message), NULL);
freeMachine(&(dfaSet->monoTree), machine_id);
break;
}
/ * if((time(NULL) - stateMachine->last_transition > abort_timeout) && abort_fcn!=NULL) {
abort_fcn(dfaSet, stateMachine, &(stateMachine->message), NULL); * /
}*/
/* TODO: Copy this stuff into buffers w/ memcopy, unlock, then call send_message. */
/* printf("Worker loop for state machine: %ld still active\n", machine_id); */
DEBUG("Worker loop for state machine: %ld still active\n", machine_id);
send_message(&(dfaSet->networkSetup), &(stateMachine->message), stateMachine->message_recipient);
}
setSmash(dfaSet->smash, stateMachine->machine_id);
pthread_mutex_unlock(stateMachine->mutex);
readunlock(dfaSet->lock);
return 0;
}
void * worker_loop(void * arg_void) {
WorkerLoopArgs * arg = arg_void;
/* StateMachine stateMachine_stack; */
StateMachine * stateMachine; /* = &stateMachine_stack; */
StateMachine * stateMachine;
DfaSet * dfaSet = arg->dfaSet;
state_machine_id machine_id = arg->machine_id;
readlock(dfaSet->lock, machine_id);
/* printf("Worker loop: %ld\n", machine_id); */
DEBUG("Worker loop: %ld\n", machine_id);
stateMachine = getSmash(dfaSet->smash, machine_id);
/* stateMachine = getMachine(&(dfaSet->monoTree), machine_id); */
/* assert(stateMachine != NULL); */
assert(stateMachine->machine_id == machine_id);
/* pthread_detach gets angry if the current process recieves a
signal. How to handle this properly? Can the main thread
wait until this thread detaches successfully? (Do we need a lock somewhere?) */
if(pthread_detach(stateMachine->worker_thread) != 0) {
perror("pthread_detach");
}
@ -620,18 +453,12 @@ void * worker_loop(void * arg_void) {
readunlock(dfaSet->lock);
inner_worker_loop(arg_void);
/* fflush(NULL); */
writelock(dfaSet->lock, machine_id);
DEBUG("Freeing machine %ld\n", machine_id);
/* pthread_mutex_lock(&(stateMachine->mutex)); */
/*freeMachine(&(dfaSet->monoTree), machine_id); */
freeSmash(dfaSet->smash, machine_id);
/* pthread_mutex_unlock(&(stateMachine->mutex)); */
writeunlock(dfaSet->lock);
return 0;
}
@ -652,24 +479,20 @@ pthread_t spawn_main_thread(DfaSet * dfaSet) {
pthread_t spawn_worker_thread(DfaSet * dfaSet, state_machine_id machine_id) {
/* Do we need to malloc a new worker_loop_args for each thread?
TODO: Should this be freed? Is it already?
*/
pthread_t worker_thread;
int ret;
WorkerLoopArgs ** worker_loop_args = malloc(sizeof(WorkerLoopArgs*));
*worker_loop_args = malloc(sizeof(WorkerLoopArgs));
WorkerLoopArgs * worker_loop_args;
// Freed in inner_worker_loop.
worker_loop_args = malloc(sizeof(WorkerLoopArgs));
DEBUG("spawn_worker_thread(state_machine_id=%ld)\n", machine_id);
(*worker_loop_args)->dfaSet = dfaSet;
(*worker_loop_args)->machine_id = machine_id;
worker_loop_args->dfaSet = dfaSet;
worker_loop_args->machine_id = machine_id;
ret = pthread_create(&worker_thread, NULL, &worker_loop, *worker_loop_args);
ret = pthread_create(&worker_thread, NULL, &worker_loop, worker_loop_args);
if(ret != 0) {
perror("libdfa: pthread_create:");
@ -681,21 +504,16 @@ pthread_t spawn_worker_thread(DfaSet * dfaSet, state_machine_id machine_id) {
}
void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, state_machine_id recipient_machine_id, Message * message) {
/* StateMachine initial_sm_stack; */
StateMachine * initial_sm; /* = &initial_sm_stack; */
StateMachine * initial_sm;
state_machine_id machine_id;
int ret;
writelock(dfaSet->lock, 600);
/* initial_sm = allocMachine(&(dfaSet->monoTree)); */
initial_sm = allocSmash(dfaSet->smash);
if(initial_sm == NULL) {
return NULL;
}
/* if(!ret) {
return NULL;
} */
assert(start_state != NULL_STATE);
if(message != NULL) {
@ -708,17 +526,15 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s
initial_sm->message.to_machine_id = recipient_machine_id;
initial_sm->message.type = start_state;
//strcpy(initial_sm->message.initiator, dfaSet->networkSetup.localhost);
char * initiator;
asprintf(&initiator, "%s:%d", dfaSet->networkSetup.localhost, dfaSet->networkSetup.localport);
strcpy(initial_sm->message.initiator, initiator);
free(initiator);
printf("Set message initiator to %s", initial_sm->message.initiator);
DEBUG("Set message initiator to %s", initial_sm->message.initiator);
initial_sm->message.initiator_machine_id = initial_sm->machine_id;
strcpy(initial_sm->message_recipient, recipient_addr);
machine_id = initial_sm->machine_id;
/* setSmash(dfaSet->smash, initial_sm->machine_id); */
writeunlock(dfaSet->lock);
ret = (int)run_request(dfaSet, machine_id);
@ -731,7 +547,6 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s
memcpy(message, &(initial_sm->message), sizeof(Message));
}
/* freeMachine(&(dfaSet->monoTree), initial_sm->machine_id); */
freeSmash(dfaSet->smash, initial_sm->machine_id);
writeunlock(dfaSet->lock);
@ -741,22 +556,18 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s
void * run_request(DfaSet * dfaSet, state_machine_id machine_id) {
void * ret;
WorkerLoopArgs * worker_loop_args = malloc(sizeof(WorkerLoopArgs));
/* StateMachine machine_stack; */
StateMachine * machine; /* = &machine_stack; */
StateMachine * machine;
readlock(dfaSet->lock, 600);
machine = getSmash(dfaSet->smash, machine_id);
/* machine= getMachine(&(dfaSet->monoTree), machine_id); */
worker_loop_args->dfaSet = dfaSet;
worker_loop_args->machine_id = machine_id;
machine->worker_thread = pthread_self();
/* setSmash(dfaSet->smash, machine->machine_id); */
readunlock(dfaSet->lock);
ret = inner_worker_loop(worker_loop_args);
return (void*)ret;
@ -767,7 +578,6 @@ DfaSet * dfa_malloc_old(int count, short port,
int broadcast_lists_count,
int * broadcast_list_host_count) {
DfaSet * dfaSet = calloc(1, sizeof(DfaSet));
/* dfaSet->monoTree.buffer = calloc(count, sizeof(StateMachine)); */
dfa_initialize_new(dfaSet, port, count);
dfaSet->networkSetup.broadcast_lists = broadcast_lists;