diff --git a/libdfa/libdfa.h b/libdfa/libdfa.h index b98a601..bbd3760 100644 --- a/libdfa/libdfa.h +++ b/libdfa/libdfa.h @@ -151,8 +151,9 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s */ void* main_loop(DfaSet *dfaSet); -DfaSet * dfa_malloc(int count, short port, +DfaSet * dfa_malloc_old(int count, short port, char *** broadcast_lists, int broadcast_lists_count, int * broadcast_list_host_count); +DfaSet * dfa_malloc(int count, NetworkSetup * ns); #endif diff --git a/libdfa/messages.h b/libdfa/messages.h index 513a48c..da4392a 100644 --- a/libdfa/messages.h +++ b/libdfa/messages.h @@ -63,7 +63,7 @@ longest valid ip:port string we will ever encounter or produce */ #define MAX_ADDRESS_LENGTH 21 - +#define ALL_BUT_GROUP_ZERO -1 /** Message structs are the in-memory representation of network diff --git a/libdfa/networksetup.h b/libdfa/networksetup.h index df9edaf..a9c0394 100644 --- a/libdfa/networksetup.h +++ b/libdfa/networksetup.h @@ -5,6 +5,7 @@ Currently, everything here can be derived at startup, so this won't need to be in transactional storage, with some luck. */ typedef struct networkSetup { + char * coordinator; unsigned short localport; char * localhost; int socket; @@ -33,4 +34,7 @@ typedef struct networkSetup { @return an initialized NetworkSetup struct. */ NetworkSetup * readNetworkConfig(char * name, int hostnumber); + +int consolidate_bc_groups(char *** list, NetworkSetup * ns) ; + #endif // __NETWORKSETUP_H diff --git a/src/2pc/2pc.c b/src/2pc/2pc.c index 34c1b61..13bac25 100644 --- a/src/2pc/2pc.c +++ b/src/2pc/2pc.c @@ -93,9 +93,6 @@ Transition transitions_2pc[] = { { COORDINATOR_COMMITTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE}, { COORDINATOR_ABORTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE}, - - - }; Transition client_transitions_2pc[] = { @@ -108,8 +105,6 @@ Transition client_transitions_2pc[] = { { COORDINATOR_ABORTING_2PC, AWAIT_COMMIT_POINT, NULL_STATE, NULL, FALSE}, { SUBORDINATE_ACKING_2PC, AWAIT_RESULT, NULL_STATE, NULL, FALSE}, - - }; @@ -136,9 +131,6 @@ State states_2pc[MAX_STATE_COUNT] = { }; - - - /* - add broadcast to messages.h (Done) @@ -164,17 +156,18 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine, return 0; } - printf("bc_group %d\n", bc_group); - /* Need to check for this somewhere... */ assert(sizeof(TwoPCAppState) <= MAX_APP_STATE_SIZE); memset(state->subordinate_votes, 0, MAX_SUBORDINATES); - /* state->xid = m->from_machine_id; */ state->xid = stateMachine->machine_id; - printf("From: %s", from); + // handled by the client. /*strncpy(state->initiator, from, MAX_ADDRESS_LENGTH);*/ + if(strncmp(m->initiator, from, MAX_ADDRESS_LENGTH)) { + printf("WARNING: Mismatch between request source (%s) and initiator field (%s). Proceeding. (Trusting the client)\n", m->initiator, from); + } + sprintf(from, "bc:%d\n", bc_group); /* TODO: (n)ack the client. (Implies yes / no / already pending return values for callback on last line) @@ -187,12 +180,15 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine, ret = 1; } + printf("INIT %ld\n", m->initiator_machine_id); if(m->type==AWAIT_ARRIVAL && ret) { // need to (n)ack the client: // Respond using the machine id expected by the client. m->from_machine_id = m->initiator_machine_id; - printf("Responding\n"); + // printf("Responding\n"); + + printf("ACK %ld (to %s)\n", m->initiator_machine_id, m->initiator); respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_START_2PC, m, m->initiator); @@ -205,6 +201,7 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine, } state_name send_ack_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + printf("ACK %ld\n", m->to_machine_id); respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, from); return OVERRIDDEN_STATE; } @@ -216,7 +213,13 @@ state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Messa TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); int ret = app_state->veto_or_prepare_2pc(dfaSet, stateMachine, m, from); - + // printf("veto_or_prepare returned: %d", ret); + if(ret == SUBORDINATE_VETO_2PC) { + printf("VETO %ld\n", m->to_machine_id); + } else { + assert(ret == SUBORDINATE_PREPARED_2PC); + printf("PREPARE %ld\n", m->to_machine_id); + } return ret; } @@ -236,9 +239,11 @@ state_name abort_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, ch /* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */ m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/ - printf("Response being sent to: %s:%ld\n", m->initiator, m->to_machine_id); + printf("ABORT SUBORDINATE_VETO being sent to: %s:%ld\n", m->initiator, m->to_machine_id); respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_VETO_2PC, m, m->initiator); m->from_machine_id = tmp; + } else { + printf("ABORT %ld\n", m->to_machine_id); } return ret; } @@ -247,23 +252,28 @@ state_name commit_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, c TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); send_ack_2pc(dfaSet, stateMachine, m, from); - return app_state->commit_2pc(dfaSet, stateMachine, m, from); + int ret = app_state->commit_2pc(dfaSet, stateMachine, m, from); + if(ret) { printf("COMMIT %ld\n", m->to_machine_id); } + if(ret && m->response_type == AWAIT_RESULT) { + respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, m->initiator); + } + return ret; } state_name check_veto_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { /* Clear subordinate_votes array, so that it can be used to tally acks after the votes are tallied. */ - TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); +// TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); TwoPCMachineState * machine_state = (TwoPCMachineState*)&(stateMachine->app_state); /* if (!check_from()) { return 0; } */ - short bc_group = app_state->get_broadcast_group(dfaSet, m); +// short bc_group = app_state->get_broadcast_group(dfaSet, m); - printf("bc_group:veto %d\n", bc_group); +// printf("bc_group:veto %d\n", bc_group); memset(machine_state->subordinate_votes, 0, MAX_SUBORDINATES); - sprintf(from, "bc:%d", bc_group); + // sprintf(from, "bc:%d", bc_group); return 1; } @@ -274,14 +284,27 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m, DfaSet * dfaSet = (DfaSet*) dfaSetPtr; TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); - + /* if (!check_from()) { return 0; } */ short bc_group = app_state->get_broadcast_group(dfaSet, m); - - if(bc_group < dfaSet->networkSetup.broadcast_lists_count) { - state_name ret = tally(dfaSet->networkSetup.broadcast_lists[bc_group], - dfaSet->networkSetup.broadcast_list_host_count[bc_group], - (char*)(machine_state->subordinate_votes), from); + // fprintf(stderr, "tally: %s, broadcast group: %d\n", from, bc_group); + if(bc_group < dfaSet->networkSetup.broadcast_lists_count+1) { + state_name ret; + if(bc_group == ALL_BUT_GROUP_ZERO) { + char ** list; + int count = consolidate_bc_groups(&list, &dfaSet->networkSetup); +/* int i; + for(i = 0; i < count; i++) { + fprintf(stderr, "count = %d tallyhost %d: %s\n", count, i, list[i]); + } */ + ret = tally(list, count, (char*)(machine_state->subordinate_votes), from); + free(list); + } else { + ret = tally(dfaSet->networkSetup.broadcast_lists[bc_group-1], + dfaSet->networkSetup.broadcast_list_host_count[bc_group-1], + (char*)(machine_state->subordinate_votes), from); + } +// fprintf(stderr, "Tally returned: %d", ret); if(ret) { /* Clear subordinate_votes array, so that it can be used to tally acks after the votes are tallied. */ @@ -300,15 +323,21 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m, /* TODO: CORRECTNESS BUG Make sure this is after tally forces the log. Also, need to make sure that it increments the (currently unimplemented) sequence number before flushing... */ - - if(ret && (m->response_type == AWAIT_COMMIT_POINT && m->response_type == COORDINATOR_START_2PC)) { +// printf("committed ret = %d response_type = %d "); + if(ret) { + printf("COMMIT POINT %ld\n", m->to_machine_id); + } + if(ret && (m->response_type == AWAIT_COMMIT_POINT && stateMachine->current_state == COORDINATOR_START_2PC)) { +// printf("sending ack to %s", m->initiator); // if(ret && (*responseType(m) == AWAIT_COMMIT_POINT && stateMachine->current_state==COORDINATOR_START_2PC)) { state_machine_id tmp = m->from_machine_id; m->from_machine_id = m->initiator_machine_id; - //printf("Coordinator responding: ? ht=? (key length %d) %d -> to %s:%ld\n", getKeyLength(m), *(int*)getKeyAddr(m), /*getValAddr(m),*/ m->initiator, m->initiator_machine_id ); +// printf("Coordinator responding: ? ht=? (key length %d) %d -> /*%d*/ to %s:%ld\n", 0/*getKeyLength(m),*/ , 0, 0, /**(int*)getKeyAddr(m), *//**(int*)getValAddr(m),*/ m->initiator, m->initiator_machine_id ); + //debug_print_message(m); respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator); m->from_machine_id = tmp; } + // printf("\n"); return ret; } else { sprintf(from, "bc:%d", bc_group); diff --git a/src/apps/cht/cht.c b/src/apps/cht/cht.c index 1f792df..8050b57 100644 --- a/src/apps/cht/cht.c +++ b/src/apps/cht/cht.c @@ -88,17 +88,17 @@ static int hash( const unsigned char * key, size_t keylen , int table_length) { group, but may only contain a single operation. */ short multiplex_interleaved(DfaSet * dfaSet, Message * m) { - short table_length = dfaSet->networkSetup.broadcast_lists_count-2; + short table_length = dfaSet->networkSetup.broadcast_lists_count; short bc_group; if((*requestType(m) == CREATE) || (*requestType(m) == DELETE)) { /* Special case: Send to all replicas...bc_group one should contain all replicas... */ - bc_group = 1; + bc_group = ALL_BUT_GROUP_ZERO; } else { /* Need to add one so that no requests are assigned to the coordinator (bc:0) */ - bc_group = hash(getKeyAddr(m), getKeyLength(m), table_length) + 2; + bc_group = hash(getKeyAddr(m), getKeyLength(m), table_length) + 1; } - printf("request %d bc group: %d\n", *requestType(m), bc_group); +// printf("request %d bc group: %d\n", *requestType(m), bc_group); return bc_group; @@ -119,21 +119,24 @@ int xid_exists(int ht_xid, jbHashTable_t * xid_ht, StateMachine * stateMachine) } } -DfaSet * cHtInit(int cht_type, char * localhost, +DfaSet * cHtInit(int cht_type, /*char * localhost, */ short (* get_broadcast_group)(DfaSet *, Message *), - short port, + /* short port, char *** broadcast_lists, int broadcast_lists_count, - int* broadcast_list_host_count) { + int* broadcast_list_host_count */ + NetworkSetup * ns) { DfaSet * dfaSet; int xid = Tbegin(); +// printf("Init %s port %d\n", ns->localhost, ns->localport); TwoPCAppState * twoPC_state; CHTAppState * chtApp_state; int error; - dfaSet = dfa_malloc(DFA_MACHINE_COUNT, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count); + // dfaSet = dfa_malloc(DFA_MACHINE_COUNT, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count); + dfaSet = dfa_malloc(DFA_MACHINE_COUNT, ns); /* srand(time(NULL)); */ @@ -141,9 +144,9 @@ DfaSet * cHtInit(int cht_type, char * localhost, chtApp_state = calloc(1, sizeof(CHTAppState)); if(cht_type == CHT_CLIENT) { - error = dfa_reinitialize(dfaSet, localhost, client_transitions_2pc, client_transition_count_2pc, states_2pc, state_count_2pc); + error = dfa_reinitialize(dfaSet, ns->localhost, client_transitions_2pc, client_transition_count_2pc, states_2pc, state_count_2pc); } else { - error = dfa_reinitialize(dfaSet, localhost, transitions_2pc, transition_count_2pc, states_2pc, state_count_2pc); + error = dfa_reinitialize(dfaSet, ns->localhost, transitions_2pc, transition_count_2pc, states_2pc, state_count_2pc); } if(error < 0) { diff --git a/src/apps/cht/cht.h b/src/apps/cht/cht.h index 91a3508..965944c 100644 --- a/src/apps/cht/cht.h +++ b/src/apps/cht/cht.h @@ -129,13 +129,16 @@ int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht); * * */ -DfaSet * cHtInit(int cht_type, char * localhost, +DfaSet * cHtInit(int cht_type,/* char * localhost,*/ short (* get_broadcast_group)(DfaSet *, Message *), - short port, + /*short port, char *** broadcast_lists, int broadcast_lists_count, - int* broadcast_list_host_count); - + int* broadcast_list_host_count*/ + NetworkSetup * ns); +DfaSet * cHtClientInit(char * config_file); +DfaSet * cHtCoordinatorInit(char * config_file, short(*get_broadcast_group)(DfaSet *, Message *)); +DfaSet * cHtSubordinateInit(char * config_file, short(*get_broadcast_group)(DfaSet *, Message *), int subordinate_number); int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet); /*int cHtCommit(state_machine_id xid, DfaSet * dfaSet); int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/ @@ -153,3 +156,5 @@ callback_fcn commit_cht; callback_fcn tally_cht; callback_fcn abort_cht; callback_fcn init_xact_cht; + +short multiplex_interleaved(DfaSet * dfaSet, Message * m); diff --git a/src/apps/cht/cht_client.c b/src/apps/cht/cht_client.c index 770398e..c39b12d 100644 --- a/src/apps/cht/cht_client.c +++ b/src/apps/cht/cht_client.c @@ -135,6 +135,20 @@ int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) { return _chtEval(dfaSet, GETXID, AWAIT_ARRIVAL, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; } +DfaSet * cHtClientInit(char * configFile) { + NetworkSetup * config = readNetworkConfig(configFile, 0); + assert(config->coordinator); + printf("config->localhost:%s config->broadcast_lists[0][0]:%s (localport %d)(port %d)\n", + config->localhost, config->broadcast_lists[0][0], config->localport, parse_port(config->broadcast_lists[0][0])); + /* DfaSet * ret = cHtInit(CHT_CLIENT, parse_addr(config->broadcast_lists[0][0]), NULL, + parse_port(config->broadcast_lists[0][0]), config->broadcast_lists+1, config->broadcast_lists_count-1, + config->broadcast_list_host_count+1); */ + DfaSet * ret = cHtInit(CHT_CLIENT, NULL, config); + assert(config->coordinator); + // free (config); + return ret; +} + /*int cHtCommit(state_machine_id xid, DfaSet * dfaSet) { size_t zero = 0; return _chtEval(dfaSet, COMMIT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero); diff --git a/src/apps/cht/cht_message.o b/src/apps/cht/cht_message.o index a159f75..7b2ab5b 100644 Binary files a/src/apps/cht/cht_message.o and b/src/apps/cht/cht_message.o differ diff --git a/src/apps/cht/cht_server.c b/src/apps/cht/cht_server.c index 2398775..63c6fbb 100644 --- a/src/apps/cht/cht_server.c +++ b/src/apps/cht/cht_server.c @@ -45,7 +45,8 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * { if(!ht_exists) { printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable)); fflush(NULL); ret = 0; } else { ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0); - printf("Insert: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), (char*)getValAddr(m)); + printf("Insert: %d ht=%d (key length %d) %d -> %d\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m)); + fflush(NULL); (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); } @@ -55,7 +56,8 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * { if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else { ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0); - printf("Lookup: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), (char*)getValAddr(m)); + printf("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m)); + fflush(NULL); } } break; @@ -223,8 +225,8 @@ state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, c /** @todo why was there an assert(0) in commit_cht?? */ /** @todo On a commit, should responses of type AWAIT_RESULT ever be done by subordinates? */ // if(m->response_type == AWAIT_RESULT) { -// printf("commit_cht responding on an AWAIT_RESULT request.\n"); -// assert(0); + // printf("commit_cht responding on an AWAIT_RESULT request.\n"); + // assert(0); /* respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, __header_ptr(m)->initiator); */ // } /* TODO: Check error codes, and return accordingly... */ @@ -234,3 +236,28 @@ state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, c state_name tally_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { return 1; } + +DfaSet * cHtCoordinatorInit(char * configFile, short (*partition_function)(DfaSet *, Message *)) { + NetworkSetup * config = readNetworkConfig(configFile, COORDINATOR); + /* DfaSet * ret = cHtInit(CHT_COORDINATOR, config->localhost, partition_function, + config->localport, config->broadcast_lists, config->broadcast_lists_count, + config->broadcast_list_host_count); + free (config);*/ + DfaSet * ret = cHtInit(CHT_COORDINATOR, partition_function, config); + free(config); + return ret; +} + +DfaSet * cHtSubordinateInit(char * configFile, short (*partition_function)(DfaSet *, Message *), int subordinate_number) { + NetworkSetup * config = readNetworkConfig(configFile, subordinate_number); +/* DfaSet * ret = cHtInit(CHT_SERVER, config->localhost, partition_function, + config->localport, config->broadcast_lists, config->broadcast_lists_count, + config->broadcast_list_host_count);*/ + DfaSet * ret = cHtInit(CHT_SERVER, partition_function, config); + free (config); + return ret; +} +void debug_print_message(Message * m) { + printf("debug: (key length %d) %d -> %d\n", getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m)); + fflush(NULL); +} diff --git a/src/libdfa/callbacks.c b/src/libdfa/callbacks.c index ea319e8..28901a9 100644 --- a/src/libdfa/callbacks.c +++ b/src/libdfa/callbacks.c @@ -40,8 +40,8 @@ permission to use and distribute the software in accordance with the terms specified in this license. ---*/ #include "callbacks.h" -#include "stdio.h" - +#include +#include #define TRUE 1 #define FALSE 0 @@ -74,6 +74,8 @@ state_name tally(char ** broadcast_list, int host_count, char * bitSet, char * f if(index < 0) { printf("Received message from unknown recipient: %s\n", from); + + assert(0); return FALSE; } bitSet[index] = TRUE; @@ -85,4 +87,3 @@ state_name tally(char ** broadcast_list, int host_count, char * bitSet, char * f } return TRUE; } - diff --git a/src/libdfa/libdfa.c b/src/libdfa/libdfa.c index 994f6da..aa61d29 100644 --- a/src/libdfa/libdfa.c +++ b/src/libdfa/libdfa.c @@ -473,7 +473,7 @@ void * inner_worker_loop(void * arg_void) { /** SIGALRM will make sleep return immediately (I hope!)*/ /* printf("pending: %ld, %d\n", stateMachine->machine_id, stateMachine->pending); */ - + /** @todo inner worker loop doesn't seem to 'do the right thing' with respect to timing */ if(1|| !stateMachine->pending) { /* If no pending messages, go to sleep */ struct timeval now; struct timespec timeout_spec; @@ -504,6 +504,8 @@ void * inner_worker_loop(void * arg_void) { now.tv_sec++; usec-=1000000; } + /** @todo ridiculously long timeout in libdfa.c */ + timeout_spec.tv_sec = now.tv_sec;/* + timeout; */ timeout_spec.tv_nsec = 1000 * usec; /*now.tv_usec * 1000; */ @@ -706,7 +708,12 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s initial_sm->message.to_machine_id = recipient_machine_id; initial_sm->message.type = start_state; - strcpy(initial_sm->message.initiator, dfaSet->networkSetup.localhost); + //strcpy(initial_sm->message.initiator, dfaSet->networkSetup.localhost); + char * initiator; + asprintf(&initiator, "%s:%d", dfaSet->networkSetup.localhost, dfaSet->networkSetup.localport); + strcpy(initial_sm->message.initiator, initiator); + free(initiator); + printf("Set message initiator to %s", initial_sm->message.initiator); initial_sm->message.initiator_machine_id = initial_sm->machine_id; strcpy(initial_sm->message_recipient, recipient_addr); @@ -755,8 +762,7 @@ void * run_request(DfaSet * dfaSet, state_machine_id machine_id) { return (void*)ret; } - -DfaSet * dfa_malloc(int count, short port, +DfaSet * dfa_malloc_old(int count, short port, char *** broadcast_lists, int broadcast_lists_count, int * broadcast_list_host_count) { @@ -770,3 +776,11 @@ DfaSet * dfa_malloc(int count, short port, return dfaSet; } + +DfaSet * dfa_malloc(int count, NetworkSetup * ns) { + DfaSet * dfaSet = calloc(1, sizeof(DfaSet)); + dfa_initialize_new(dfaSet, ns->localport, count); + + memcpy(&dfaSet->networkSetup, ns, sizeof(NetworkSetup)); + return dfaSet; +} diff --git a/src/libdfa/messages.c b/src/libdfa/messages.c index de5ee5e..1922d95 100644 --- a/src/libdfa/messages.c +++ b/src/libdfa/messages.c @@ -172,27 +172,46 @@ int _send_message(const NetworkSetup *ns, Message *message, const char *to); int __send_message(const NetworkSetup *ns, Message *message, const char *to) { - DEBUG("Sending %ld-%d: to %s:%ld\n", message->from_machine_id, message->type ,to, message->to_machine_id); if(strncmp(to, "bc:", 3)==0) { int i; int list_number = parse_port(to); - - if(list_number < 0 || list_number >= ns->broadcast_lists_count) { + if(list_number == ALL_BUT_GROUP_ZERO) { + // fprintf(stderr, "Broadcasting to all groups (except group 0).\n"); + for(int i = 1; i < ns->broadcast_lists_count+1; i++) { + char * new_to; + asprintf(&new_to, "bc:%d", i); + int ret = __send_message(ns, message, new_to); + free(new_to); + if(ret < 0) { + return ret; + } + } + return 0; + } else if(list_number < 0 || list_number >= ns->broadcast_lists_count+1) { fprintf(stderr, "Invalid list number %d passed into send_message: %s\n", list_number, to); return -1; } + if(list_number == 0) { + // send to coordinator + // fprintf(stderr, "Sending message to coordinator: %s", ns->coordinator); + return __send_message(ns, message, ns->coordinator); + } if(ns->broadcast_list_host_count[list_number] == 0) { fprintf(stderr, "Sending to empty broadcast list! Address was %s\n", to); } - for(i =0; i < ns->broadcast_list_host_count[list_number]; i++) { + + for(i =0; i < ns->broadcast_list_host_count[list_number-1]; i++) { + // fprintf(stderr, "sending to member %d of list %d\n", i, list_number); int ret; - if((ret = __send_message(ns, message, ns->broadcast_lists[list_number][i])) < 0) { + if((ret = __send_message(ns, message, ns->broadcast_lists[list_number-1][i])) < 0) { return ret; } } } else { + DEBUG("Sending %ld-%d: to %s:%ld\n", message->from_machine_id, message->type ,to, message->to_machine_id); + return _send_message(ns, message, to); } return 0; diff --git a/src/libdfa/networksetup.c b/src/libdfa/networksetup.c index b01c592..739159d 100644 --- a/src/libdfa/networksetup.c +++ b/src/libdfa/networksetup.c @@ -87,7 +87,9 @@ NetworkSetup * readNetworkConfig(char * name, int hostnumber) { : parse_addr(cfg_getnstr(cfg, "subordinates", hostnumber)); ret->socket = -1; /// @todo where should the socket field be initialized? ret->broadcast_lists_count = cfg_size(cfg, "group"); - printf("broadcast list count = %d\n", ret->broadcast_lists_count); + DEBUG("broadcast list count = %d\n", ret->broadcast_lists_count); + ret->coordinator = strdup(cfg_getstr(cfg, "coordinator")); + DEBUG("Coordinator: %s", ret->coordinator); ret->broadcast_list_host_count = malloc(sizeof(int *) * ret->broadcast_lists_count); ret->broadcast_lists = malloc(sizeof(int**) * ret->broadcast_lists_count); for(i = 0; i < ret->broadcast_lists_count; i++) { @@ -108,3 +110,20 @@ NetworkSetup * readNetworkConfig(char * name, int hostnumber) { cfg_free(cfg); return ret; } + +int consolidate_bc_groups(char *** list, NetworkSetup * ns) { + int count = 0; + int i, j; + for(i = 0; i < ns->broadcast_lists_count; i++) { + count += ns->broadcast_list_host_count[i]; + } + *list = malloc(sizeof(char *) * count); + int k = 0; + for(i = 0; i < ns->broadcast_lists_count; i++) { + for(j = 0; j < ns->broadcast_list_host_count[i]; j++) { + (*list)[k] = ns->broadcast_lists[i][j]; + k++; + } + } + return count; +} diff --git a/src/lladd/operations/linearHashNTA.c b/src/lladd/operations/linearHashNTA.c index fe51db7..3631482 100644 --- a/src/lladd/operations/linearHashNTA.c +++ b/src/lladd/operations/linearHashNTA.c @@ -40,7 +40,7 @@ recordid ThashCreate(int xid, int keySize, int valueSize) { lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(lladd_linkedList_entry) + keySize + valueSize); TarrayListExtend(xid, lhh.buckets, HASH_INIT_ARRAY_LIST_COUNT); int i; - byte * entry = calloc(1, sizeof(lhh.buckets)); + byte * entry = calloc(1, lhh.buckets.size); recordid bucket = lhh.buckets; for(i = 0; i < HASH_INIT_ARRAY_LIST_COUNT; i++) { bucket.slot = i; diff --git a/test/2pc/Makefile.am b/test/2pc/Makefile.am index e15e1d0..752f466 100644 --- a/test/2pc/Makefile.am +++ b/test/2pc/Makefile.am @@ -1,4 +1,4 @@ LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \ - $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a + $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a -lconfuse bin_PROGRAMS=always_commit AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/test/2pc/always_commit.c b/test/2pc/always_commit.c index 6cfe6c3..e36902d 100644 --- a/test/2pc/always_commit.c +++ b/test/2pc/always_commit.c @@ -92,7 +92,7 @@ int main (int argc, char ** argv) { assert(argc == 2); port = parse_port(broadcast_lists[0][0]); app_state->is_coordinator = TRUE; - dfaSet = dfa_malloc(DFA_MACHINE_COUNT, port, broadcast_lists, + dfaSet = dfa_malloc_old(DFA_MACHINE_COUNT, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count); localhost = broadcast_lists[0][0]; @@ -103,7 +103,7 @@ int main (int argc, char ** argv) { replica = atoi(argv[2]); port = parse_port(broadcast_lists[1][replica]); app_state->is_coordinator = FALSE; - dfaSet = dfa_malloc(DFA_MACHINE_COUNT * 10, port, broadcast_lists, + dfaSet = dfa_malloc_old(DFA_MACHINE_COUNT * 10, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count); localhost = broadcast_lists[1][replica]; }else { diff --git a/test/cht/Makefile.am b/test/cht/Makefile.am index 5cff260..2c88e42 100644 --- a/test/cht/Makefile.am +++ b/test/cht/Makefile.am @@ -1,4 +1,4 @@ -LDADD=$(top_builddir)/src/apps/cht/libcht.a $(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \ +LDADD=-lconfuse $(top_builddir)/src/apps/cht/libcht.a $(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a -bin_PROGRAMS=simple cht_server +bin_PROGRAMS=simple cht_server client coordinator subordinate AM_CFLAGS= -g -Wall -pedantic -std=c99 diff --git a/test/cht/cht_server.c b/test/cht/cht_server.c index 15efa4b..f0e1beb 100644 --- a/test/cht/cht_server.c +++ b/test/cht/cht_server.c @@ -117,8 +117,8 @@ int main (int argc, char**argv) { } port = parse_port(localhost); Tinit(); - dfaSet = cHtInit(server_type, localhost, NULL, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count); - + // dfaSet = cHtInit(server_type, localhost, NULL, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count); + assert(0); main_loop(dfaSet); return -1; diff --git a/test/cht/simple.c b/test/cht/simple.c index 0734647..808eee9 100644 --- a/test/cht/simple.c +++ b/test/cht/simple.c @@ -39,10 +39,10 @@ authors grant the U.S. Government and others acting in its behalf permission to use and distribute the software in accordance with the terms specified in this license. ---*/ -#include #include "../../src/apps/cht/cht.h" #include +#include /** Thanks, jbhtsimple.c!! */ @@ -102,7 +102,8 @@ int main (int argc, char**argv) { broadcast_lists[0] = star_nodes; broadcast_lists[1] = point_nodes; - dfaSet = cHtInit(CHT_CLIENT, argv[2], NULL, atoi(argv[1]), broadcast_lists, broadcast_lists_count, broadcast_list_host_count); +/* dfaSet = cHtInit(CHT_CLIENT, argv[2], NULL, atoi(argv[1]), broadcast_lists, broadcast_lists_count, broadcast_list_host_count); */ + assert(0); spawn_main_thread(dfaSet); @@ -164,6 +165,3 @@ int main (int argc, char**argv) { return 0; } - - - diff --git a/test/dfa/star.c b/test/dfa/star.c index 8700e7d..7b0de7a 100644 --- a/test/dfa/star.c +++ b/test/dfa/star.c @@ -123,7 +123,7 @@ int main (int argc, char ** argv) { port = parse_port(broadcast_lists[list_number][node_number]); - dfaSet = dfa_malloc(DFA_MACHINE_COUNT, port, broadcast_lists, + dfaSet = dfa_malloc_old(DFA_MACHINE_COUNT, port, broadcast_lists, broadcast_lists_count, broadcast_list_host_count); if(list_number == 0) {