CHT is starting to work.

This commit is contained in:
Sears Russell 2005-01-19 00:47:55 +00:00
parent 7dacf21069
commit d7b846d370
20 changed files with 210 additions and 76 deletions

View file

@ -151,8 +151,9 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s
*/ */
void* main_loop(DfaSet *dfaSet); void* main_loop(DfaSet *dfaSet);
DfaSet * dfa_malloc(int count, short port, DfaSet * dfa_malloc_old(int count, short port,
char *** broadcast_lists, char *** broadcast_lists,
int broadcast_lists_count, int broadcast_lists_count,
int * broadcast_list_host_count); int * broadcast_list_host_count);
DfaSet * dfa_malloc(int count, NetworkSetup * ns);
#endif #endif

View file

@ -63,7 +63,7 @@ longest valid ip:port string we will ever encounter or produce
*/ */
#define MAX_ADDRESS_LENGTH 21 #define MAX_ADDRESS_LENGTH 21
#define ALL_BUT_GROUP_ZERO -1
/** /**
Message structs are the in-memory representation of network Message structs are the in-memory representation of network

View file

@ -5,6 +5,7 @@
Currently, everything here can be derived at startup, so this won't Currently, everything here can be derived at startup, so this won't
need to be in transactional storage, with some luck. */ need to be in transactional storage, with some luck. */
typedef struct networkSetup { typedef struct networkSetup {
char * coordinator;
unsigned short localport; unsigned short localport;
char * localhost; char * localhost;
int socket; int socket;
@ -33,4 +34,7 @@ typedef struct networkSetup {
@return an initialized NetworkSetup struct. @return an initialized NetworkSetup struct.
*/ */
NetworkSetup * readNetworkConfig(char * name, int hostnumber); NetworkSetup * readNetworkConfig(char * name, int hostnumber);
int consolidate_bc_groups(char *** list, NetworkSetup * ns) ;
#endif // __NETWORKSETUP_H #endif // __NETWORKSETUP_H

View file

@ -93,9 +93,6 @@ Transition transitions_2pc[] = {
{ COORDINATOR_COMMITTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE}, { COORDINATOR_COMMITTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE},
{ COORDINATOR_ABORTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE}, { COORDINATOR_ABORTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE},
}; };
Transition client_transitions_2pc[] = { Transition client_transitions_2pc[] = {
@ -108,8 +105,6 @@ Transition client_transitions_2pc[] = {
{ COORDINATOR_ABORTING_2PC, AWAIT_COMMIT_POINT, NULL_STATE, NULL, FALSE}, { COORDINATOR_ABORTING_2PC, AWAIT_COMMIT_POINT, NULL_STATE, NULL, FALSE},
{ SUBORDINATE_ACKING_2PC, AWAIT_RESULT, NULL_STATE, NULL, FALSE}, { SUBORDINATE_ACKING_2PC, AWAIT_RESULT, NULL_STATE, NULL, FALSE},
}; };
@ -136,9 +131,6 @@ State states_2pc[MAX_STATE_COUNT] = {
}; };
/* /*
- add broadcast to messages.h (Done) - add broadcast to messages.h (Done)
@ -164,17 +156,18 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine,
return 0; return 0;
} }
printf("bc_group %d\n", bc_group);
/* Need to check for this somewhere... */ /* Need to check for this somewhere... */
assert(sizeof(TwoPCAppState) <= MAX_APP_STATE_SIZE); assert(sizeof(TwoPCAppState) <= MAX_APP_STATE_SIZE);
memset(state->subordinate_votes, 0, MAX_SUBORDINATES); memset(state->subordinate_votes, 0, MAX_SUBORDINATES);
/* state->xid = m->from_machine_id; */
state->xid = stateMachine->machine_id; state->xid = stateMachine->machine_id;
printf("From: %s", from); // handled by the client.
/*strncpy(state->initiator, from, MAX_ADDRESS_LENGTH);*/ /*strncpy(state->initiator, from, MAX_ADDRESS_LENGTH);*/
if(strncmp(m->initiator, from, MAX_ADDRESS_LENGTH)) {
printf("WARNING: Mismatch between request source (%s) and initiator field (%s). Proceeding. (Trusting the client)\n", m->initiator, from);
}
sprintf(from, "bc:%d\n", bc_group); sprintf(from, "bc:%d\n", bc_group);
/* TODO: (n)ack the client. (Implies yes / no / already pending return values for callback on last line) /* TODO: (n)ack the client. (Implies yes / no / already pending return values for callback on last line)
@ -187,12 +180,15 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine,
ret = 1; ret = 1;
} }
printf("INIT %ld\n", m->initiator_machine_id);
if(m->type==AWAIT_ARRIVAL && ret) { if(m->type==AWAIT_ARRIVAL && ret) {
// need to (n)ack the client: // need to (n)ack the client:
// Respond using the machine id expected by the client. // Respond using the machine id expected by the client.
m->from_machine_id = m->initiator_machine_id; m->from_machine_id = m->initiator_machine_id;
printf("Responding\n"); // printf("Responding\n");
printf("ACK %ld (to %s)\n", m->initiator_machine_id, m->initiator);
respond_once(&((DfaSet*)dfaSet)->networkSetup, respond_once(&((DfaSet*)dfaSet)->networkSetup,
COORDINATOR_START_2PC, m, m->initiator); COORDINATOR_START_2PC, m, m->initiator);
@ -205,6 +201,7 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine,
} }
state_name send_ack_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { state_name send_ack_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
printf("ACK %ld\n", m->to_machine_id);
respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, from); respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, from);
return OVERRIDDEN_STATE; return OVERRIDDEN_STATE;
} }
@ -216,7 +213,13 @@ state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Messa
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
int ret = app_state->veto_or_prepare_2pc(dfaSet, stateMachine, m, from); int ret = app_state->veto_or_prepare_2pc(dfaSet, stateMachine, m, from);
// printf("veto_or_prepare returned: %d", ret);
if(ret == SUBORDINATE_VETO_2PC) {
printf("VETO %ld\n", m->to_machine_id);
} else {
assert(ret == SUBORDINATE_PREPARED_2PC);
printf("PREPARE %ld\n", m->to_machine_id);
}
return ret; return ret;
} }
@ -236,9 +239,11 @@ state_name abort_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, ch
/* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */ /* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */
m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/ m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/
printf("Response being sent to: %s:%ld\n", m->initiator, m->to_machine_id); printf("ABORT SUBORDINATE_VETO being sent to: %s:%ld\n", m->initiator, m->to_machine_id);
respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_VETO_2PC, m, m->initiator); respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_VETO_2PC, m, m->initiator);
m->from_machine_id = tmp; m->from_machine_id = tmp;
} else {
printf("ABORT %ld\n", m->to_machine_id);
} }
return ret; return ret;
} }
@ -247,23 +252,28 @@ state_name commit_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, c
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
send_ack_2pc(dfaSet, stateMachine, m, from); send_ack_2pc(dfaSet, stateMachine, m, from);
return app_state->commit_2pc(dfaSet, stateMachine, m, from); int ret = app_state->commit_2pc(dfaSet, stateMachine, m, from);
if(ret) { printf("COMMIT %ld\n", m->to_machine_id); }
if(ret && m->response_type == AWAIT_RESULT) {
respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, m->initiator);
}
return ret;
} }
state_name check_veto_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { state_name check_veto_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
/* Clear subordinate_votes array, so that it can be used to /* Clear subordinate_votes array, so that it can be used to
tally acks after the votes are tallied. */ tally acks after the votes are tallied. */
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); // TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
TwoPCMachineState * machine_state = (TwoPCMachineState*)&(stateMachine->app_state); TwoPCMachineState * machine_state = (TwoPCMachineState*)&(stateMachine->app_state);
/* if (!check_from()) { return 0; } */ /* if (!check_from()) { return 0; } */
short bc_group = app_state->get_broadcast_group(dfaSet, m); // short bc_group = app_state->get_broadcast_group(dfaSet, m);
printf("bc_group:veto %d\n", bc_group); // printf("bc_group:veto %d\n", bc_group);
memset(machine_state->subordinate_votes, 0, MAX_SUBORDINATES); memset(machine_state->subordinate_votes, 0, MAX_SUBORDINATES);
sprintf(from, "bc:%d", bc_group); // sprintf(from, "bc:%d", bc_group);
return 1; return 1;
} }
@ -274,14 +284,27 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m,
DfaSet * dfaSet = (DfaSet*) dfaSetPtr; DfaSet * dfaSet = (DfaSet*) dfaSetPtr;
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
/* if (!check_from()) { return 0; } */ /* if (!check_from()) { return 0; } */
short bc_group = app_state->get_broadcast_group(dfaSet, m); short bc_group = app_state->get_broadcast_group(dfaSet, m);
// fprintf(stderr, "tally: %s, broadcast group: %d\n", from, bc_group);
if(bc_group < dfaSet->networkSetup.broadcast_lists_count) { if(bc_group < dfaSet->networkSetup.broadcast_lists_count+1) {
state_name ret = tally(dfaSet->networkSetup.broadcast_lists[bc_group], state_name ret;
dfaSet->networkSetup.broadcast_list_host_count[bc_group], if(bc_group == ALL_BUT_GROUP_ZERO) {
(char*)(machine_state->subordinate_votes), from); char ** list;
int count = consolidate_bc_groups(&list, &dfaSet->networkSetup);
/* int i;
for(i = 0; i < count; i++) {
fprintf(stderr, "count = %d tallyhost %d: %s\n", count, i, list[i]);
} */
ret = tally(list, count, (char*)(machine_state->subordinate_votes), from);
free(list);
} else {
ret = tally(dfaSet->networkSetup.broadcast_lists[bc_group-1],
dfaSet->networkSetup.broadcast_list_host_count[bc_group-1],
(char*)(machine_state->subordinate_votes), from);
}
// fprintf(stderr, "Tally returned: %d", ret);
if(ret) { if(ret) {
/* Clear subordinate_votes array, so that it can be used to /* Clear subordinate_votes array, so that it can be used to
tally acks after the votes are tallied. */ tally acks after the votes are tallied. */
@ -300,15 +323,21 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m,
/* TODO: CORRECTNESS BUG Make sure this is after tally forces the log. Also, need to /* TODO: CORRECTNESS BUG Make sure this is after tally forces the log. Also, need to
make sure that it increments the (currently unimplemented) make sure that it increments the (currently unimplemented)
sequence number before flushing... */ sequence number before flushing... */
// printf("committed ret = %d response_type = %d ");
if(ret && (m->response_type == AWAIT_COMMIT_POINT && m->response_type == COORDINATOR_START_2PC)) { if(ret) {
printf("COMMIT POINT %ld\n", m->to_machine_id);
}
if(ret && (m->response_type == AWAIT_COMMIT_POINT && stateMachine->current_state == COORDINATOR_START_2PC)) {
// printf("sending ack to %s", m->initiator);
// if(ret && (*responseType(m) == AWAIT_COMMIT_POINT && stateMachine->current_state==COORDINATOR_START_2PC)) { // if(ret && (*responseType(m) == AWAIT_COMMIT_POINT && stateMachine->current_state==COORDINATOR_START_2PC)) {
state_machine_id tmp = m->from_machine_id; state_machine_id tmp = m->from_machine_id;
m->from_machine_id = m->initiator_machine_id; m->from_machine_id = m->initiator_machine_id;
//printf("Coordinator responding: ? ht=? (key length %d) %d -> to %s:%ld\n", getKeyLength(m), *(int*)getKeyAddr(m), /*getValAddr(m),*/ m->initiator, m->initiator_machine_id ); // printf("Coordinator responding: ? ht=? (key length %d) %d -> /*%d*/ to %s:%ld\n", 0/*getKeyLength(m),*/ , 0, 0, /**(int*)getKeyAddr(m), *//**(int*)getValAddr(m),*/ m->initiator, m->initiator_machine_id );
//debug_print_message(m);
respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator); respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator);
m->from_machine_id = tmp; m->from_machine_id = tmp;
} }
// printf("\n");
return ret; return ret;
} else { } else {
sprintf(from, "bc:%d", bc_group); sprintf(from, "bc:%d", bc_group);

View file

@ -88,17 +88,17 @@ static int hash( const unsigned char * key, size_t keylen , int table_length) {
group, but may only contain a single operation. group, but may only contain a single operation.
*/ */
short multiplex_interleaved(DfaSet * dfaSet, Message * m) { short multiplex_interleaved(DfaSet * dfaSet, Message * m) {
short table_length = dfaSet->networkSetup.broadcast_lists_count-2; short table_length = dfaSet->networkSetup.broadcast_lists_count;
short bc_group; short bc_group;
if((*requestType(m) == CREATE) || (*requestType(m) == DELETE)) { if((*requestType(m) == CREATE) || (*requestType(m) == DELETE)) {
/* Special case: Send to all replicas...bc_group one should contain all replicas... */ /* Special case: Send to all replicas...bc_group one should contain all replicas... */
bc_group = 1; bc_group = ALL_BUT_GROUP_ZERO;
} else { } else {
/* Need to add one so that no requests are assigned to the coordinator (bc:0) */ /* Need to add one so that no requests are assigned to the coordinator (bc:0) */
bc_group = hash(getKeyAddr(m), getKeyLength(m), table_length) + 2; bc_group = hash(getKeyAddr(m), getKeyLength(m), table_length) + 1;
} }
printf("request %d bc group: %d\n", *requestType(m), bc_group); // printf("request %d bc group: %d\n", *requestType(m), bc_group);
return bc_group; return bc_group;
@ -119,21 +119,24 @@ int xid_exists(int ht_xid, jbHashTable_t * xid_ht, StateMachine * stateMachine)
} }
} }
DfaSet * cHtInit(int cht_type, char * localhost, DfaSet * cHtInit(int cht_type, /*char * localhost, */
short (* get_broadcast_group)(DfaSet *, Message *), short (* get_broadcast_group)(DfaSet *, Message *),
short port, /* short port,
char *** broadcast_lists, char *** broadcast_lists,
int broadcast_lists_count, int broadcast_lists_count,
int* broadcast_list_host_count) { int* broadcast_list_host_count */
NetworkSetup * ns) {
DfaSet * dfaSet; DfaSet * dfaSet;
int xid = Tbegin(); int xid = Tbegin();
// printf("Init %s port %d\n", ns->localhost, ns->localport);
TwoPCAppState * twoPC_state; TwoPCAppState * twoPC_state;
CHTAppState * chtApp_state; CHTAppState * chtApp_state;
int error; int error;
dfaSet = dfa_malloc(DFA_MACHINE_COUNT, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count); // dfaSet = dfa_malloc(DFA_MACHINE_COUNT, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count);
dfaSet = dfa_malloc(DFA_MACHINE_COUNT, ns);
/* srand(time(NULL)); */ /* srand(time(NULL)); */
@ -141,9 +144,9 @@ DfaSet * cHtInit(int cht_type, char * localhost,
chtApp_state = calloc(1, sizeof(CHTAppState)); chtApp_state = calloc(1, sizeof(CHTAppState));
if(cht_type == CHT_CLIENT) { if(cht_type == CHT_CLIENT) {
error = dfa_reinitialize(dfaSet, localhost, client_transitions_2pc, client_transition_count_2pc, states_2pc, state_count_2pc); error = dfa_reinitialize(dfaSet, ns->localhost, client_transitions_2pc, client_transition_count_2pc, states_2pc, state_count_2pc);
} else { } else {
error = dfa_reinitialize(dfaSet, localhost, transitions_2pc, transition_count_2pc, states_2pc, state_count_2pc); error = dfa_reinitialize(dfaSet, ns->localhost, transitions_2pc, transition_count_2pc, states_2pc, state_count_2pc);
} }
if(error < 0) { if(error < 0) {

View file

@ -129,13 +129,16 @@ int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht);
* *
* *
*/ */
DfaSet * cHtInit(int cht_type, char * localhost, DfaSet * cHtInit(int cht_type,/* char * localhost,*/
short (* get_broadcast_group)(DfaSet *, Message *), short (* get_broadcast_group)(DfaSet *, Message *),
short port, /*short port,
char *** broadcast_lists, char *** broadcast_lists,
int broadcast_lists_count, int broadcast_lists_count,
int* broadcast_list_host_count); int* broadcast_list_host_count*/
NetworkSetup * ns);
DfaSet * cHtClientInit(char * config_file);
DfaSet * cHtCoordinatorInit(char * config_file, short(*get_broadcast_group)(DfaSet *, Message *));
DfaSet * cHtSubordinateInit(char * config_file, short(*get_broadcast_group)(DfaSet *, Message *), int subordinate_number);
int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet); int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet);
/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet); /*int cHtCommit(state_machine_id xid, DfaSet * dfaSet);
int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/ int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/
@ -153,3 +156,5 @@ callback_fcn commit_cht;
callback_fcn tally_cht; callback_fcn tally_cht;
callback_fcn abort_cht; callback_fcn abort_cht;
callback_fcn init_xact_cht; callback_fcn init_xact_cht;
short multiplex_interleaved(DfaSet * dfaSet, Message * m);

View file

@ -135,6 +135,20 @@ int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) {
return _chtEval(dfaSet, GETXID, AWAIT_ARRIVAL, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; return _chtEval(dfaSet, GETXID, AWAIT_ARRIVAL, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
} }
DfaSet * cHtClientInit(char * configFile) {
NetworkSetup * config = readNetworkConfig(configFile, 0);
assert(config->coordinator);
printf("config->localhost:%s config->broadcast_lists[0][0]:%s (localport %d)(port %d)\n",
config->localhost, config->broadcast_lists[0][0], config->localport, parse_port(config->broadcast_lists[0][0]));
/* DfaSet * ret = cHtInit(CHT_CLIENT, parse_addr(config->broadcast_lists[0][0]), NULL,
parse_port(config->broadcast_lists[0][0]), config->broadcast_lists+1, config->broadcast_lists_count-1,
config->broadcast_list_host_count+1); */
DfaSet * ret = cHtInit(CHT_CLIENT, NULL, config);
assert(config->coordinator);
// free (config);
return ret;
}
/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet) { /*int cHtCommit(state_machine_id xid, DfaSet * dfaSet) {
size_t zero = 0; size_t zero = 0;
return _chtEval(dfaSet, COMMIT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero); return _chtEval(dfaSet, COMMIT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero);

Binary file not shown.

View file

@ -45,7 +45,8 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message *
{ {
if(!ht_exists) { printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable)); fflush(NULL); ret = 0; } else { if(!ht_exists) { printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable)); fflush(NULL); ret = 0; } else {
ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0); ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0);
printf("Insert: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), (char*)getValAddr(m)); printf("Insert: %d ht=%d (key length %d) %d -> %d\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(NULL);
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
} }
@ -55,7 +56,8 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message *
{ {
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else { if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0); ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
printf("Lookup: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), (char*)getValAddr(m)); printf("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(NULL);
} }
} break; } break;
@ -223,8 +225,8 @@ state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, c
/** @todo why was there an assert(0) in commit_cht?? */ /** @todo why was there an assert(0) in commit_cht?? */
/** @todo On a commit, should responses of type AWAIT_RESULT ever be done by subordinates? */ /** @todo On a commit, should responses of type AWAIT_RESULT ever be done by subordinates? */
// if(m->response_type == AWAIT_RESULT) { // if(m->response_type == AWAIT_RESULT) {
// printf("commit_cht responding on an AWAIT_RESULT request.\n"); // printf("commit_cht responding on an AWAIT_RESULT request.\n");
// assert(0); // assert(0);
/* respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, __header_ptr(m)->initiator); */ /* respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, __header_ptr(m)->initiator); */
// } // }
/* TODO: Check error codes, and return accordingly... */ /* TODO: Check error codes, and return accordingly... */
@ -234,3 +236,28 @@ state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, c
state_name tally_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { state_name tally_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
return 1; return 1;
} }
DfaSet * cHtCoordinatorInit(char * configFile, short (*partition_function)(DfaSet *, Message *)) {
NetworkSetup * config = readNetworkConfig(configFile, COORDINATOR);
/* DfaSet * ret = cHtInit(CHT_COORDINATOR, config->localhost, partition_function,
config->localport, config->broadcast_lists, config->broadcast_lists_count,
config->broadcast_list_host_count);
free (config);*/
DfaSet * ret = cHtInit(CHT_COORDINATOR, partition_function, config);
free(config);
return ret;
}
DfaSet * cHtSubordinateInit(char * configFile, short (*partition_function)(DfaSet *, Message *), int subordinate_number) {
NetworkSetup * config = readNetworkConfig(configFile, subordinate_number);
/* DfaSet * ret = cHtInit(CHT_SERVER, config->localhost, partition_function,
config->localport, config->broadcast_lists, config->broadcast_lists_count,
config->broadcast_list_host_count);*/
DfaSet * ret = cHtInit(CHT_SERVER, partition_function, config);
free (config);
return ret;
}
void debug_print_message(Message * m) {
printf("debug: (key length %d) %d -> %d\n", getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(NULL);
}

View file

@ -40,8 +40,8 @@ permission to use and distribute the software in accordance with the
terms specified in this license. terms specified in this license.
---*/ ---*/
#include "callbacks.h" #include "callbacks.h"
#include "stdio.h" #include <stdio.h>
#include <assert.h>
#define TRUE 1 #define TRUE 1
#define FALSE 0 #define FALSE 0
@ -74,6 +74,8 @@ state_name tally(char ** broadcast_list, int host_count, char * bitSet, char * f
if(index < 0) { if(index < 0) {
printf("Received message from unknown recipient: %s\n", from); printf("Received message from unknown recipient: %s\n", from);
assert(0);
return FALSE; return FALSE;
} }
bitSet[index] = TRUE; bitSet[index] = TRUE;
@ -85,4 +87,3 @@ state_name tally(char ** broadcast_list, int host_count, char * bitSet, char * f
} }
return TRUE; return TRUE;
} }

View file

@ -473,7 +473,7 @@ void * inner_worker_loop(void * arg_void) {
/** SIGALRM will make sleep return immediately (I hope!)*/ /** SIGALRM will make sleep return immediately (I hope!)*/
/* printf("pending: %ld, %d\n", stateMachine->machine_id, stateMachine->pending); */ /* printf("pending: %ld, %d\n", stateMachine->machine_id, stateMachine->pending); */
/** @todo inner worker loop doesn't seem to 'do the right thing' with respect to timing */
if(1|| !stateMachine->pending) { /* If no pending messages, go to sleep */ if(1|| !stateMachine->pending) { /* If no pending messages, go to sleep */
struct timeval now; struct timeval now;
struct timespec timeout_spec; struct timespec timeout_spec;
@ -504,6 +504,8 @@ void * inner_worker_loop(void * arg_void) {
now.tv_sec++; now.tv_sec++;
usec-=1000000; usec-=1000000;
} }
/** @todo ridiculously long timeout in libdfa.c */
timeout_spec.tv_sec = now.tv_sec;/* + timeout; */ timeout_spec.tv_sec = now.tv_sec;/* + timeout; */
timeout_spec.tv_nsec = 1000 * usec; /*now.tv_usec * 1000; */ timeout_spec.tv_nsec = 1000 * usec; /*now.tv_usec * 1000; */
@ -706,7 +708,12 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s
initial_sm->message.to_machine_id = recipient_machine_id; initial_sm->message.to_machine_id = recipient_machine_id;
initial_sm->message.type = start_state; initial_sm->message.type = start_state;
strcpy(initial_sm->message.initiator, dfaSet->networkSetup.localhost); //strcpy(initial_sm->message.initiator, dfaSet->networkSetup.localhost);
char * initiator;
asprintf(&initiator, "%s:%d", dfaSet->networkSetup.localhost, dfaSet->networkSetup.localport);
strcpy(initial_sm->message.initiator, initiator);
free(initiator);
printf("Set message initiator to %s", initial_sm->message.initiator);
initial_sm->message.initiator_machine_id = initial_sm->machine_id; initial_sm->message.initiator_machine_id = initial_sm->machine_id;
strcpy(initial_sm->message_recipient, recipient_addr); strcpy(initial_sm->message_recipient, recipient_addr);
@ -755,8 +762,7 @@ void * run_request(DfaSet * dfaSet, state_machine_id machine_id) {
return (void*)ret; return (void*)ret;
} }
DfaSet * dfa_malloc_old(int count, short port,
DfaSet * dfa_malloc(int count, short port,
char *** broadcast_lists, char *** broadcast_lists,
int broadcast_lists_count, int broadcast_lists_count,
int * broadcast_list_host_count) { int * broadcast_list_host_count) {
@ -770,3 +776,11 @@ DfaSet * dfa_malloc(int count, short port,
return dfaSet; return dfaSet;
} }
DfaSet * dfa_malloc(int count, NetworkSetup * ns) {
DfaSet * dfaSet = calloc(1, sizeof(DfaSet));
dfa_initialize_new(dfaSet, ns->localport, count);
memcpy(&dfaSet->networkSetup, ns, sizeof(NetworkSetup));
return dfaSet;
}

View file

@ -172,27 +172,46 @@ int _send_message(const NetworkSetup *ns, Message *message, const char *to);
int __send_message(const NetworkSetup *ns, Message *message, const char *to) { int __send_message(const NetworkSetup *ns, Message *message, const char *to) {
DEBUG("Sending %ld-%d: to %s:%ld\n", message->from_machine_id, message->type ,to, message->to_machine_id);
if(strncmp(to, "bc:", 3)==0) { if(strncmp(to, "bc:", 3)==0) {
int i; int i;
int list_number = parse_port(to); int list_number = parse_port(to);
if(list_number == ALL_BUT_GROUP_ZERO) {
if(list_number < 0 || list_number >= ns->broadcast_lists_count) { // fprintf(stderr, "Broadcasting to all groups (except group 0).\n");
for(int i = 1; i < ns->broadcast_lists_count+1; i++) {
char * new_to;
asprintf(&new_to, "bc:%d", i);
int ret = __send_message(ns, message, new_to);
free(new_to);
if(ret < 0) {
return ret;
}
}
return 0;
} else if(list_number < 0 || list_number >= ns->broadcast_lists_count+1) {
fprintf(stderr, "Invalid list number %d passed into send_message: %s\n", list_number, to); fprintf(stderr, "Invalid list number %d passed into send_message: %s\n", list_number, to);
return -1; return -1;
} }
if(list_number == 0) {
// send to coordinator
// fprintf(stderr, "Sending message to coordinator: %s", ns->coordinator);
return __send_message(ns, message, ns->coordinator);
}
if(ns->broadcast_list_host_count[list_number] == 0) { if(ns->broadcast_list_host_count[list_number] == 0) {
fprintf(stderr, "Sending to empty broadcast list! Address was %s\n", to); fprintf(stderr, "Sending to empty broadcast list! Address was %s\n", to);
} }
for(i =0; i < ns->broadcast_list_host_count[list_number]; i++) {
for(i =0; i < ns->broadcast_list_host_count[list_number-1]; i++) {
// fprintf(stderr, "sending to member %d of list %d\n", i, list_number);
int ret; int ret;
if((ret = __send_message(ns, message, ns->broadcast_lists[list_number][i])) < 0) { if((ret = __send_message(ns, message, ns->broadcast_lists[list_number-1][i])) < 0) {
return ret; return ret;
} }
} }
} else { } else {
DEBUG("Sending %ld-%d: to %s:%ld\n", message->from_machine_id, message->type ,to, message->to_machine_id);
return _send_message(ns, message, to); return _send_message(ns, message, to);
} }
return 0; return 0;

View file

@ -87,7 +87,9 @@ NetworkSetup * readNetworkConfig(char * name, int hostnumber) {
: parse_addr(cfg_getnstr(cfg, "subordinates", hostnumber)); : parse_addr(cfg_getnstr(cfg, "subordinates", hostnumber));
ret->socket = -1; /// @todo where should the socket field be initialized? ret->socket = -1; /// @todo where should the socket field be initialized?
ret->broadcast_lists_count = cfg_size(cfg, "group"); ret->broadcast_lists_count = cfg_size(cfg, "group");
printf("broadcast list count = %d\n", ret->broadcast_lists_count); DEBUG("broadcast list count = %d\n", ret->broadcast_lists_count);
ret->coordinator = strdup(cfg_getstr(cfg, "coordinator"));
DEBUG("Coordinator: %s", ret->coordinator);
ret->broadcast_list_host_count = malloc(sizeof(int *) * ret->broadcast_lists_count); ret->broadcast_list_host_count = malloc(sizeof(int *) * ret->broadcast_lists_count);
ret->broadcast_lists = malloc(sizeof(int**) * ret->broadcast_lists_count); ret->broadcast_lists = malloc(sizeof(int**) * ret->broadcast_lists_count);
for(i = 0; i < ret->broadcast_lists_count; i++) { for(i = 0; i < ret->broadcast_lists_count; i++) {
@ -108,3 +110,20 @@ NetworkSetup * readNetworkConfig(char * name, int hostnumber) {
cfg_free(cfg); cfg_free(cfg);
return ret; return ret;
} }
int consolidate_bc_groups(char *** list, NetworkSetup * ns) {
int count = 0;
int i, j;
for(i = 0; i < ns->broadcast_lists_count; i++) {
count += ns->broadcast_list_host_count[i];
}
*list = malloc(sizeof(char *) * count);
int k = 0;
for(i = 0; i < ns->broadcast_lists_count; i++) {
for(j = 0; j < ns->broadcast_list_host_count[i]; j++) {
(*list)[k] = ns->broadcast_lists[i][j];
k++;
}
}
return count;
}

View file

@ -40,7 +40,7 @@ recordid ThashCreate(int xid, int keySize, int valueSize) {
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(lladd_linkedList_entry) + keySize + valueSize); lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(lladd_linkedList_entry) + keySize + valueSize);
TarrayListExtend(xid, lhh.buckets, HASH_INIT_ARRAY_LIST_COUNT); TarrayListExtend(xid, lhh.buckets, HASH_INIT_ARRAY_LIST_COUNT);
int i; int i;
byte * entry = calloc(1, sizeof(lhh.buckets)); byte * entry = calloc(1, lhh.buckets.size);
recordid bucket = lhh.buckets; recordid bucket = lhh.buckets;
for(i = 0; i < HASH_INIT_ARRAY_LIST_COUNT; i++) { for(i = 0; i < HASH_INIT_ARRAY_LIST_COUNT; i++) {
bucket.slot = i; bucket.slot = i;

View file

@ -1,4 +1,4 @@
LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \ LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \
$(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a -lconfuse
bin_PROGRAMS=always_commit bin_PROGRAMS=always_commit
AM_CFLAGS= -g -Wall -pedantic -std=gnu99 AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -92,7 +92,7 @@ int main (int argc, char ** argv) {
assert(argc == 2); assert(argc == 2);
port = parse_port(broadcast_lists[0][0]); port = parse_port(broadcast_lists[0][0]);
app_state->is_coordinator = TRUE; app_state->is_coordinator = TRUE;
dfaSet = dfa_malloc(DFA_MACHINE_COUNT, port, broadcast_lists, dfaSet = dfa_malloc_old(DFA_MACHINE_COUNT, port, broadcast_lists,
broadcast_lists_count, broadcast_list_host_count); broadcast_lists_count, broadcast_list_host_count);
localhost = broadcast_lists[0][0]; localhost = broadcast_lists[0][0];
@ -103,7 +103,7 @@ int main (int argc, char ** argv) {
replica = atoi(argv[2]); replica = atoi(argv[2]);
port = parse_port(broadcast_lists[1][replica]); port = parse_port(broadcast_lists[1][replica]);
app_state->is_coordinator = FALSE; app_state->is_coordinator = FALSE;
dfaSet = dfa_malloc(DFA_MACHINE_COUNT * 10, port, broadcast_lists, dfaSet = dfa_malloc_old(DFA_MACHINE_COUNT * 10, port, broadcast_lists,
broadcast_lists_count, broadcast_list_host_count); broadcast_lists_count, broadcast_list_host_count);
localhost = broadcast_lists[1][replica]; localhost = broadcast_lists[1][replica];
}else { }else {

View file

@ -1,4 +1,4 @@
LDADD=$(top_builddir)/src/apps/cht/libcht.a $(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \ LDADD=-lconfuse $(top_builddir)/src/apps/cht/libcht.a $(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \
$(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a
bin_PROGRAMS=simple cht_server bin_PROGRAMS=simple cht_server client coordinator subordinate
AM_CFLAGS= -g -Wall -pedantic -std=c99 AM_CFLAGS= -g -Wall -pedantic -std=c99

View file

@ -117,8 +117,8 @@ int main (int argc, char**argv) {
} }
port = parse_port(localhost); port = parse_port(localhost);
Tinit(); Tinit();
dfaSet = cHtInit(server_type, localhost, NULL, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count); // dfaSet = cHtInit(server_type, localhost, NULL, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count);
assert(0);
main_loop(dfaSet); main_loop(dfaSet);
return -1; return -1;

View file

@ -39,10 +39,10 @@ authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the permission to use and distribute the software in accordance with the
terms specified in this license. terms specified in this license.
---*/ ---*/
#include <string.h>
#include "../../src/apps/cht/cht.h" #include "../../src/apps/cht/cht.h"
#include <assert.h> #include <assert.h>
#include <string.h>
/** Thanks, jbhtsimple.c!! */ /** Thanks, jbhtsimple.c!! */
@ -102,7 +102,8 @@ int main (int argc, char**argv) {
broadcast_lists[0] = star_nodes; broadcast_lists[0] = star_nodes;
broadcast_lists[1] = point_nodes; broadcast_lists[1] = point_nodes;
dfaSet = cHtInit(CHT_CLIENT, argv[2], NULL, atoi(argv[1]), broadcast_lists, broadcast_lists_count, broadcast_list_host_count); /* dfaSet = cHtInit(CHT_CLIENT, argv[2], NULL, atoi(argv[1]), broadcast_lists, broadcast_lists_count, broadcast_list_host_count); */
assert(0);
spawn_main_thread(dfaSet); spawn_main_thread(dfaSet);
@ -164,6 +165,3 @@ int main (int argc, char**argv) {
return 0; return 0;
} }

View file

@ -123,7 +123,7 @@ int main (int argc, char ** argv) {
port = parse_port(broadcast_lists[list_number][node_number]); port = parse_port(broadcast_lists[list_number][node_number]);
dfaSet = dfa_malloc(DFA_MACHINE_COUNT, port, broadcast_lists, dfaSet = dfa_malloc_old(DFA_MACHINE_COUNT, port, broadcast_lists,
broadcast_lists_count, broadcast_list_host_count); broadcast_lists_count, broadcast_list_host_count);
if(list_number == 0) { if(list_number == 0) {