diff --git a/src/2pc/2pc.c b/src/2pc/2pc.c index 8c682ff..bb3c693 100644 --- a/src/2pc/2pc.c +++ b/src/2pc/2pc.c @@ -64,7 +64,7 @@ callback_fcn coordinator_continue_xact_2pc; /* Remember to update transition_count_2pc if you add/remove transitions */ -const int transition_count_2pc = 26; +const int transition_count_2pc = 25; Transition transitions_2pc[] = { @@ -76,7 +76,6 @@ Transition transitions_2pc[] = { { XACT_ACK_ARRIVAL, XACT_ACTIVE, XACT_ACTION_RUNNING, coordinator_continue_xact_2pc, FALSE }, { XACT_ACK_RESULT, XACT_ACTIVE, XACT_ACTION_RUNNING, coordinator_continue_xact_2pc, FALSE }, { XACT_COMMIT, XACT_ACTIVE, COORDINATOR_START_2PC, coordinator_continue_xact_2pc, FALSE }, - { XACT_ABORT, XACT_ACTIVE, COORDINATOR_START_2PC, coordinator_continue_xact_2pc, FALSE }, { XACT_SUBORDINATE_ACK, XACT_ACTION_RUNNING, XACT_ACTIVE, &tally_2pc, FALSE }, /* Library user must provide callback that init_xact_2pc calls. */ @@ -112,7 +111,7 @@ Transition transitions_2pc[] = { }; -const int client_transition_count_2pc = 8; +const int client_transition_count_2pc = 7; Transition client_transitions_2pc[] = { @@ -127,13 +126,12 @@ Transition client_transitions_2pc[] = { { XACT_ACTION_RUNNING, XACT_ACK_ARRIVAL, NULL_STATE, NULL, FALSE}, { XACT_ACTIVE, XACT_ACK_RESULT, NULL_STATE, NULL, FALSE}, - { COORDINATOR_COMMITTING_2PC, XACT_COMMIT, NULL_STATE, NULL, FALSE}, - { COORDINATOR_ABORTING_2PC, XACT_ABORT, NULL_STATE, NULL, FALSE} + { COORDINATOR_COMMITTING_2PC, XACT_COMMIT, NULL_STATE, NULL, FALSE} }; -const int state_count_2pc = 16; +const int state_count_2pc = 15; State states_2pc[MAX_STATE_COUNT] = { @@ -167,7 +165,6 @@ State states_2pc[MAX_STATE_COUNT] = { { XACT_ACK_ARRIVAL, NULL, NULL}, { XACT_COMMIT, NULL, NULL}, - { XACT_ABORT, NULL, NULL} }; @@ -336,11 +333,12 @@ state_name commit_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, c int ret = app_state->commit_2pc(dfaSet, stateMachine, m, from); if(ret) { printf("COMMIT %ld\n", m->to_machine_id); } if(ret && m->response_type == AWAIT_RESULT) { + printf("responding w/ commit to %s\n", m->initiator); fflush(stdout); respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, m->initiator); } return ret; } - +/** @todo 2pc can't handle replica groups of size one. */ state_name check_veto_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { /* Clear subordinate_votes array, so that it can be used to tally acks after the votes are tallied. */ @@ -355,6 +353,12 @@ state_name check_veto_2pc(void * dfaSet, StateMachine * stateMachine, Message * memset(machine_state->subordinate_votes, 0, MAX_SUBORDINATES); // sprintf(from, "bc:%d", bc_group); + printf("Sending commit message to %s\n", m->initiator); + respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator); + + if(tally_2pc(dfaSet, stateMachine, m, from)) { + printf("YOU FOUND A BUG: 2pc doesn't support size one replication groups!\n"); + } return 1; } @@ -384,7 +388,7 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m, dfaSet->networkSetup.broadcast_list_host_count[bc_group-1], (char*)(machine_state->subordinate_votes), from); } -// fprintf(stderr, "Tally returned: %d", ret); + // fprintf(stderr, "Tally returned: %d", ret); if(ret) { /* Clear subordinate_votes array, so that it can be used to tally acks after the votes are tallied. */ @@ -405,14 +409,15 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m, sequence number before flushing... */ + // fprintf(stderr, " response type %d current state %d\n", m->response_type, stateMachine->current_state); if(ret && ( ( ( m->response_type == AWAIT_COMMIT_POINT || - m->response_type==XACT_COMMIT || - m->response_type==XACT_ABORT + m->response_type==XACT_COMMIT ) && ( - stateMachine->current_state == COORDINATOR_START_2PC + stateMachine->current_state == COORDINATOR_START_2PC || + stateMachine->current_state == COORDINATOR_ABORTING_2PC ) ) || ( m->response_type == XACT_ACK_RESULT && @@ -426,9 +431,10 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m, m->from_machine_id = m->initiator_machine_id; // printf("Coordinator responding: ? ht=? (key length %d) %d -> /*%d*/ to %s:%ld\n", 0/*getKeyLength(m),*/ , 0, 0, /**(int*)getKeyAddr(m), *//**(int*)getValAddr(m),*/ m->initiator, m->initiator_machine_id ); //debug_print_message(m); - if(m->response_type == AWAIT_COMMIT_POINT || m->response_type == XACT_COMMIT || m->response_type == XACT_ABORT) { + if((m->response_type == AWAIT_COMMIT_POINT) || (m->response_type == XACT_COMMIT)) { printf("COMMIT POINT %ld\n", m->to_machine_id); fflush(stdout); + printf("Sending commit point message to %s\n", m->initiator); fflush(stdout); respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator); } else { // printf("COMPLETED %ld\n", m->to_machine_id); diff --git a/src/2pc/2pc.h b/src/2pc/2pc.h index 5a66910..ed43834 100644 --- a/src/2pc/2pc.h +++ b/src/2pc/2pc.h @@ -101,9 +101,7 @@ gets to reuse the transaction id. #define XACT_ACTION_RUNNING 224 #define XACT_COMMIT 225 -#define XACT_ABORT 226 - -#define XACT_SUBORDINATE_ACK 227 +#define XACT_SUBORDINATE_ACK 226 /** The callbacks are called whenever the transition 'should' succeed. diff --git a/src/apps/cht/cht.c b/src/apps/cht/cht.c index 9d97d79..d130b48 100644 --- a/src/apps/cht/cht.c +++ b/src/apps/cht/cht.c @@ -91,6 +91,7 @@ short multiplex_interleaved(DfaSet * dfaSet, Message * m) { } else { /* Need to add one so that no requests are assigned to the coordinator (bc:0) */ bc_group = hash(getKeyAddr(m), getKeyLength(m), table_length) + 1; + // printf("group %d: %d (%d)\n", bc_group, *(int*)getKeyAddr(m), getKeyLength(m)); fflush(stdout); } DEBUG("request %d bc group: %d\n", *requestType(m), bc_group); diff --git a/src/apps/cht/cht_client.c b/src/apps/cht/cht_client.c index a016335..3834f04 100644 --- a/src/apps/cht/cht_client.c +++ b/src/apps/cht/cht_client.c @@ -97,12 +97,14 @@ static int _chtEval(DfaSet * dfaSet, if (*value_size != 0) { memcpy(key, getKeyAddr(&m), getKeyLength(&m)); } - + *value_size = getValLength(&m); + *key_size = getKeyLength(&m); *xid = m.to_machine_id; DEBUG("+chtEval returning %d\n", m.type); - return m.type; + // return m.type; + return 1; } int cHtCreate(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * new_ht) { @@ -116,11 +118,16 @@ int cHtInsert(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, vo } int cHtLookup(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) { - return _chtEval(dfaSet, LOOKUP, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; + assert(keylen != 0); + int ret = _chtEval(dfaSet, LOOKUP, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; + assert(ret); + return (*datlen != 0); } int cHtRemove(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) { - return _chtEval(dfaSet, REMOVE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; + int ret = _chtEval(dfaSet, REMOVE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; + if(!ret) { return -1; } + return *datlen; } int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht) { @@ -142,8 +149,7 @@ int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) { DfaSet * cHtClientInit(char * configFile) { NetworkSetup * config = readNetworkConfig(configFile, 0); assert(config->coordinator); - // printf("config->localhost:%s config->broadcast_lists[0][0]:%s (localport %d)(port %d)\n", - // config->localhost, config->broadcast_lists[0][0], config->localport, parse_port(config->broadcast_lists[0][0])); + DfaSet * ret = cHtInit(CHT_CLIENT, NULL, config); assert(config->coordinator); free (config); @@ -155,8 +161,9 @@ int cHtCommit(state_machine_id xid, DfaSet * dfaSet) { return _chtEval(dfaSet, COMMIT, XACT_COMMIT, &xid, NULL, NULL, &zero, NULL, &zero); } -/* + int cHtAbort(state_machine_id xid, DfaSet * dfaSet) { size_t zero = 0; - return _chtEval(dfaSet, ABORT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero); - }*/ + return _chtEval(dfaSet, ABORT, XACT_COMMIT, &xid, NULL, NULL, &zero, NULL, &zero); + abort(); +} diff --git a/src/apps/cht/cht_message.h b/src/apps/cht/cht_message.h index 9875349..d5ff987 100644 --- a/src/apps/cht/cht_message.h +++ b/src/apps/cht/cht_message.h @@ -10,7 +10,7 @@ #define TSTSET 6 #define GETXID 7 #define COMMIT 8 -/* #define ABORT 9 */ +#define ABORT 9 typedef struct { unsigned short key_length; diff --git a/src/apps/cht/cht_server.c b/src/apps/cht/cht_server.c index 599e070..209cd3a 100644 --- a/src/apps/cht/cht_server.c +++ b/src/apps/cht/cht_server.c @@ -96,10 +96,10 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * assert(valueLength <= getValLength(m)); memcpy(getValAddr(m), new, valueLength); free(new); + setValLength(m, valueLength); } else { - ret = 0; + setValLength(m, 0); } - // ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0); DEBUG("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m)); @@ -112,7 +112,12 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * printf ("Hash table doesn't exist!\n"); fflush(stdout); ret = 0; } else { /** @todo we no longer return old value on remove... */ - ret = ThashRemove(xid, ht, getKeyAddr(m), getKeyLength(m)); + int remove_ret = ThashRemove(xid, ht, getKeyAddr(m), getKeyLength(m)); + if(remove_ret == 0) { + setValLength(m, 0); + } else { + setValLength(m, 1); + } } } break; @@ -150,19 +155,15 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * ret = 0; } NOOP */ } break; - case COMMIT: // placeholder (2pc commits for us) - break; - /* - case COMMIT: + case COMMIT: { - ret = (Tcommit(xid) >= 0); + // placeholder (2pc commits for us unless there's an error) } break; - case ABORT: { - ret = (Tabort(xid) >= 0); + ret = 0; // Insert an 'error' to cause 2pc to abort the transaction. } break; - */ + default: { printf("Unknown request type: %d\n", *requestType(m)); diff --git a/src/libdfa/callbacks.c b/src/libdfa/callbacks.c index 28901a9..7cd3397 100644 --- a/src/libdfa/callbacks.c +++ b/src/libdfa/callbacks.c @@ -73,7 +73,7 @@ state_name tally(char ** broadcast_list, int host_count, char * bitSet, char * f index= get_index(broadcast_list, host_count, from); if(index < 0) { - printf("Received message from unknown recipient: %s\n", from); + printf("Received message from unknown recipient: %s\n", from); assert(0); return FALSE; diff --git a/src/libdfa/libdfa.c b/src/libdfa/libdfa.c index 34cd254..9be94e6 100644 --- a/src/libdfa/libdfa.c +++ b/src/libdfa/libdfa.c @@ -218,8 +218,8 @@ void* main_loop(DfaSet *dfaSet) { if(i == dfaSet->transition_count) { - fprintf(stderr, "%ld received: %ld-%d:%d->? (bad message)\n", stateMachine->machine_id, message->from_machine_id, - message->type, current_state); + fprintf(stderr, "%ld received: %ld-%d:%d->? (bad message from %s)\n", stateMachine->machine_id, message->from_machine_id, + message->type, current_state, from); continue; } diff --git a/test/cht/client.c b/test/cht/client.c index 040306c..5c45acd 100644 --- a/test/cht/client.c +++ b/test/cht/client.c @@ -21,16 +21,9 @@ int main(int argc, char ** argv) { clusterHashTable_t * new_ht; cHtCreate(xid, cht_client, new_ht); int i; - for(i = 0; i < 1000; i++) { + for(i = 0; i < 10000; i++) { int one = i; int two = i+1; cHtInsert(xid, cht_client, new_ht, &one, sizeof(int), &two, sizeof(int)); - // xid++; - /* int i =0; - for(i =0; i < 100; i++) { - printf("\n"); - } - fflush(NULL); */ - int newOne, newTwo; newOne = i; newTwo = 0; @@ -44,6 +37,46 @@ int main(int argc, char ** argv) { assert(newLen == sizeof(int)); } cHtCommit(xid, cht_client); + + for(i = 0; i < 10000; i+=10) { + int one = i; int two = -1; + unsigned int size = sizeof(int); + int removed = cHtRemove(xid, cht_client, new_ht, &one, sizeof(int), &two, &size); + assert(removed); + + size = sizeof(int); + two = -1; + removed = cHtRemove(xid, cht_client, new_ht, &one, sizeof(int), &two, &size); + assert(!removed); + + int newOne, newTwo; + newOne = i; + newTwo = 0; + unsigned int newLen = sizeof(int); + int ret = cHtLookup(xid, cht_client, new_ht, &newOne, sizeof(int), &newTwo, &newLen); + + assert(!ret); + + } + cHtAbort(xid, cht_client); + exit(0); // @todo this test case should continue on... + for(i = 0; i < 10000; i++) { + int one = i; int two = i+1; + // cHtInsert(xid, cht_client, new_ht, &one, sizeof(int), &two, sizeof(int)); + int newOne, newTwo; + newOne = i; + newTwo = 0; + unsigned int newLen = sizeof(int); + int ret = cHtLookup(xid, cht_client, new_ht, &newOne, sizeof(int), &newTwo, &newLen); + // xid++; + //printf("lookup returned %d (%d->%d)\n", ret, newOne, newTwo); + assert(ret); + assert(newOne == one); + assert(newTwo == two); + assert(newLen == sizeof(int)); + } + cHtCommit(xid, cht_client); + /** @todo devise a way to cleanly shut a CHT down. */ // dfa_free(cht_client); diff --git a/test/cht/simple.c b/test/cht/simple.c index a5bae7c..65c23a5 100644 --- a/test/cht/simple.c +++ b/test/cht/simple.c @@ -39,6 +39,7 @@ authors grant the U.S. Government and others acting in its behalf permission to use and distribute the software in accordance with the terms specified in this license. ---*/ +#define _GNU_SOURCE #include #include "../../src/apps/cht/cht.h" #include