diff --git a/libdfa/libdfa.h b/libdfa/libdfa.h index bbd3760..119b284 100644 --- a/libdfa/libdfa.h +++ b/libdfa/libdfa.h @@ -54,6 +54,10 @@ terms specified in this license. */ #define DFA_MACHINE_COUNT 100 +/** Defined in libdfa.c */ +callback_fcn callback_false; +callback_fcn callback_true; + typedef struct dfaSet { /* MonoTree monoTree; */ smash_t * smash; diff --git a/libdfa/statemachine.h b/libdfa/statemachine.h index 642fd4d..3a7be76 100644 --- a/libdfa/statemachine.h +++ b/libdfa/statemachine.h @@ -112,7 +112,6 @@ typedef struct stateMachine { typedef state_name(callback_fcn)(void * dfaSet, StateMachine * stateMachine, Message * m, char * from); - /* All function pointers follow this prototype: TODO @@ -130,7 +129,8 @@ typedef struct state { */ callback_fcn* retry_fcn; /** NULL unless the machine can be aborted while in this state. If - not-null, then it should point to a function that performs a + not-null, then +it should point to a function that performs a No-op or does any house-keeping that should be performed before the machine gets nuked. diff --git a/src/2pc/2pc.c b/src/2pc/2pc.c index 13bac25..8c682ff 100644 --- a/src/2pc/2pc.c +++ b/src/2pc/2pc.c @@ -48,10 +48,6 @@ terms specified in this license. #define FALSE 0 /* #define _TWO_PC 1 */ -const int transition_count_2pc = 14; -const int client_transition_count_2pc = 4; -const int state_count_2pc = 9; - callback_fcn check_veto_2pc; @@ -61,13 +57,28 @@ callback_fcn coordinator_init_xact_2pc; callback_fcn veto_or_prepare_2pc; callback_fcn abort_2pc; callback_fcn commit_2pc; +callback_fcn eval_action_2pc; +/* new fcns */ +callback_fcn coordinator_continue_xact_2pc; /* Remember to update transition_count_2pc if you add/remove transitions */ + +const int transition_count_2pc = 26; + Transition transitions_2pc[] = { /* Coordinator transitions */ + { XACT_ACK_ARRIVAL, NULL_STATE, XACT_ACTION_RUNNING, coordinator_init_xact_2pc, FALSE }, + { XACT_ACK_RESULT, NULL_STATE, XACT_ACTION_RUNNING, coordinator_init_xact_2pc, FALSE }, + + { XACT_ACK_ARRIVAL, XACT_ACTIVE, XACT_ACTION_RUNNING, coordinator_continue_xact_2pc, FALSE }, + { XACT_ACK_RESULT, XACT_ACTIVE, XACT_ACTION_RUNNING, coordinator_continue_xact_2pc, FALSE }, + { XACT_COMMIT, XACT_ACTIVE, COORDINATOR_START_2PC, coordinator_continue_xact_2pc, FALSE }, + { XACT_ABORT, XACT_ACTIVE, COORDINATOR_START_2PC, coordinator_continue_xact_2pc, FALSE }, + { XACT_SUBORDINATE_ACK, XACT_ACTION_RUNNING, XACT_ACTIVE, &tally_2pc, FALSE }, + /* Library user must provide callback that init_xact_2pc calls. */ { AWAIT_ARRIVAL, NULL_STATE, COORDINATOR_START_2PC, coordinator_init_xact_2pc, FALSE }, { AWAIT_COMMIT_POINT, NULL_STATE, COORDINATOR_START_2PC, coordinator_init_xact_2pc, FALSE }, @@ -83,8 +94,12 @@ Transition transitions_2pc[] = { /* Subordinate transitions */ /* veto_or_prepare overrides target state. */ + { XACT_ACTION_RUNNING, NULL_STATE, XACT_ACTIVE, &eval_action_2pc, FALSE}, + { XACT_ACTION_RUNNING, XACT_ACTIVE, XACT_ACTIVE, &eval_action_2pc, FALSE}, + /* Library user must provide the subordinate function pointers for these transitions */ { COORDINATOR_START_2PC, NULL_STATE, OVERRIDDEN_STATE, &veto_or_prepare_2pc, TRUE }, + { COORDINATOR_START_2PC, XACT_ACTIVE, OVERRIDDEN_STATE, &veto_or_prepare_2pc, TRUE }, { COORDINATOR_ABORTING_2PC, SUBORDINATE_PREPARED_2PC, NULL_STATE, &abort_2pc, FALSE}, { COORDINATOR_COMMITTING_2PC, SUBORDINATE_PREPARED_2PC, NULL_STATE, &commit_2pc, FALSE}, { COORDINATOR_ABORTING_2PC, SUBORDINATE_VETO_2PC, NULL_STATE, NULL, FALSE}, @@ -92,9 +107,13 @@ Transition transitions_2pc[] = { /* transition fcn always fails, but sends ack to coordinator */ { COORDINATOR_COMMITTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE}, { COORDINATOR_ABORTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE}, + { COORDINATOR_COMMITTING_2PC, XACT_ACTIVE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE}, + { COORDINATOR_ABORTING_2PC, XACT_ACTIVE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE}, }; +const int client_transition_count_2pc = 8; + Transition client_transitions_2pc[] = { /* Caller transitions */ @@ -106,9 +125,16 @@ Transition client_transitions_2pc[] = { { SUBORDINATE_ACKING_2PC, AWAIT_RESULT, NULL_STATE, NULL, FALSE}, + { XACT_ACTION_RUNNING, XACT_ACK_ARRIVAL, NULL_STATE, NULL, FALSE}, + { XACT_ACTIVE, XACT_ACK_RESULT, NULL_STATE, NULL, FALSE}, + { COORDINATOR_COMMITTING_2PC, XACT_COMMIT, NULL_STATE, NULL, FALSE}, + { COORDINATOR_ABORTING_2PC, XACT_ABORT, NULL_STATE, NULL, FALSE} + }; +const int state_count_2pc = 16; + State states_2pc[MAX_STATE_COUNT] = { /* Coordinator states */ @@ -117,18 +143,32 @@ State states_2pc[MAX_STATE_COUNT] = { { COORDINATOR_COMMITTING_2PC, NULL, NULL }, { COORDINATOR_ABORTING_2PC, NULL, NULL }, + { XACT_ACTION_RUNNING, NULL, NULL }, + /* Subordinate states */ { SUBORDINATE_VETO_2PC, NULL, NULL }, /* Need to think about callback fcns */ { SUBORDINATE_PREPARED_2PC, NULL, NULL }, { SUBORDINATE_ACKING_2PC, NULL, NULL }, + { XACT_SUBORDINATE_ACK, NULL, NULL }, /* Just used for one-time acks. */ + + /* mixed client/subordinate states */ + + { XACT_ACTIVE, callback_false, NULL}, /* Never send a message indicating the current state. */ + /* Client states */ { AWAIT_ARRIVAL, NULL, NULL}, { AWAIT_RESULT, NULL, NULL}, { AWAIT_COMMIT_POINT, NULL, NULL}, + { XACT_ACK_RESULT, NULL, NULL}, + { XACT_ACK_ARRIVAL, NULL, NULL}, + + { XACT_COMMIT, NULL, NULL}, + { XACT_ABORT, NULL, NULL} + }; /* @@ -141,6 +181,40 @@ State states_2pc[MAX_STATE_COUNT] = { */ +state_name coordinator_continue_xact_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + int ret; + TwoPCMachineState * state = (TwoPCMachineState*) &(stateMachine->app_state); + TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); + + /** Overwrite the from parameter, with the appropriate broadcast group. */ + short bc_group = app_state->get_broadcast_group(dfaSet, m); + sprintf(from, "bc:%d\n", bc_group); + + memset(state->subordinate_votes, 0, MAX_SUBORDINATES); + + if(app_state->continue_xact_2pc != NULL) { + ret = app_state->continue_xact_2pc(dfaSet, stateMachine, m, from); + } else { + ret = 1; + } + + // printf("CONTINUE %ld\n", m->initiator_machine_id); + if(m->type==AWAIT_ARRIVAL && ret) { + // need to (n)ack the client: + // Respond using the machine id expected by the client. + m->from_machine_id = m->initiator_machine_id; + + /** @todo What if the message to the client is dropped? Is there an easy way to just ACK if it resends? */ + printf("ACK %ld (to %s)\n", m->initiator_machine_id, m->initiator); + + respond_once(&((DfaSet*)dfaSet)->networkSetup, + COORDINATOR_START_2PC, m, m->initiator); + } + + m->from_machine_id = stateMachine->machine_id; + + return ret; +} /* Probably should ack the client that called commit(), instead of hoping that UDP got it here for them... ;) So, they should have a @@ -161,8 +235,6 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine, memset(state->subordinate_votes, 0, MAX_SUBORDINATES); state->xid = stateMachine->machine_id; - // handled by the client. - /*strncpy(state->initiator, from, MAX_ADDRESS_LENGTH);*/ if(strncmp(m->initiator, from, MAX_ADDRESS_LENGTH)) { printf("WARNING: Mismatch between request source (%s) and initiator field (%s). Proceeding. (Trusting the client)\n", m->initiator, from); @@ -170,23 +242,16 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine, sprintf(from, "bc:%d\n", bc_group); - /* TODO: (n)ack the client. (Implies yes / no / already pending return values for callback on last line) - Currently, this is handled by the library user. It could be moved back into here. - - */ if(app_state->init_xact_2pc != NULL) { ret = app_state->init_xact_2pc(dfaSet, stateMachine, m, from); } else { ret = 1; } - printf("INIT %ld\n", m->initiator_machine_id); + printf("INIT %ld\n", m->initiator_machine_id); fflush(stdout); if(m->type==AWAIT_ARRIVAL && ret) { - // need to (n)ack the client: - // Respond using the machine id expected by the client. + // (n)ack the client: Respond using the machine id expected by the client. m->from_machine_id = m->initiator_machine_id; - - // printf("Responding\n"); printf("ACK %ld (to %s)\n", m->initiator_machine_id, m->initiator); @@ -213,7 +278,7 @@ state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Messa TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); int ret = app_state->veto_or_prepare_2pc(dfaSet, stateMachine, m, from); - // printf("veto_or_prepare returned: %d", ret); + if(ret == SUBORDINATE_VETO_2PC) { printf("VETO %ld\n", m->to_machine_id); } else { @@ -223,6 +288,22 @@ state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Messa return ret; } +state_name eval_action_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); + int ret = app_state->eval_action_2pc(dfaSet, stateMachine, m, from); + if(ret) { + + respond_once(&((DfaSet*)dfaSet)->networkSetup, + XACT_SUBORDINATE_ACK, m, from); + } + /* Just ack with the respond_once. Don't need to change states. */ + if(stateMachine->current_state == XACT_ACTIVE) { + return 0; + } else { + return 1; + } +} + /** TODO: The next two functions should fork and immediately return true. */ @@ -285,18 +366,17 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m, TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); - /* if (!check_from()) { return 0; } */ short bc_group = app_state->get_broadcast_group(dfaSet, m); - // fprintf(stderr, "tally: %s, broadcast group: %d\n", from, bc_group); + // fprintf(stderr, "tally: %s, broadcast group: %d\n", from, bc_group); if(bc_group < dfaSet->networkSetup.broadcast_lists_count+1) { state_name ret; if(bc_group == ALL_BUT_GROUP_ZERO) { char ** list; int count = consolidate_bc_groups(&list, &dfaSet->networkSetup); -/* int i; - for(i = 0; i < count; i++) { + // int i; + /* for(i = 0; i < count; i++) { fprintf(stderr, "count = %d tallyhost %d: %s\n", count, i, list[i]); - } */ + } */ ret = tally(list, count, (char*)(machine_state->subordinate_votes), from); free(list); } else { @@ -323,18 +403,37 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m, /* TODO: CORRECTNESS BUG Make sure this is after tally forces the log. Also, need to make sure that it increments the (currently unimplemented) sequence number before flushing... */ -// printf("committed ret = %d response_type = %d "); - if(ret) { - printf("COMMIT POINT %ld\n", m->to_machine_id); - } - if(ret && (m->response_type == AWAIT_COMMIT_POINT && stateMachine->current_state == COORDINATOR_START_2PC)) { -// printf("sending ack to %s", m->initiator); -// if(ret && (*responseType(m) == AWAIT_COMMIT_POINT && stateMachine->current_state==COORDINATOR_START_2PC)) { + + + if(ret && ( + ( + ( + m->response_type == AWAIT_COMMIT_POINT || + m->response_type==XACT_COMMIT || + m->response_type==XACT_ABORT + ) && ( + stateMachine->current_state == COORDINATOR_START_2PC + ) + ) || ( + m->response_type == XACT_ACK_RESULT && + stateMachine->current_state == XACT_ACTION_RUNNING + ) + ) + ) { + // printf("sending ack to %s (%d)\n", m->initiator, m->response_type); fflush(stdout); + state_machine_id tmp = m->from_machine_id; m->from_machine_id = m->initiator_machine_id; // printf("Coordinator responding: ? ht=? (key length %d) %d -> /*%d*/ to %s:%ld\n", 0/*getKeyLength(m),*/ , 0, 0, /**(int*)getKeyAddr(m), *//**(int*)getValAddr(m),*/ m->initiator, m->initiator_machine_id ); //debug_print_message(m); - respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator); + if(m->response_type == AWAIT_COMMIT_POINT || m->response_type == XACT_COMMIT || m->response_type == XACT_ABORT) { + printf("COMMIT POINT %ld\n", m->to_machine_id); + fflush(stdout); + respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator); + } else { + // printf("COMPLETED %ld\n", m->to_machine_id); + respond_once(&((DfaSet*)dfaSet)->networkSetup, XACT_ACTIVE, m, m->initiator); + } m->from_machine_id = tmp; } // printf("\n"); diff --git a/src/2pc/2pc.h b/src/2pc/2pc.h index f97de05..5a66910 100644 --- a/src/2pc/2pc.h +++ b/src/2pc/2pc.h @@ -95,6 +95,16 @@ gets to reuse the transaction id. #define AWAIT_COMMIT_POINT 212 #define AWAIT_RESULT 213 +#define XACT_ACK_RESULT 221 +#define XACT_ACK_ARRIVAL 222 +#define XACT_ACTIVE 223 +#define XACT_ACTION_RUNNING 224 + +#define XACT_COMMIT 225 +#define XACT_ABORT 226 + +#define XACT_SUBORDINATE_ACK 227 + /** The callbacks are called whenever the transition 'should' succeed. Other than tally_2pc, they are always called when a @@ -132,6 +142,8 @@ typedef struct { two broadcast groups.) */ char is_coordinator; callback_fcn *init_xact_2pc; + callback_fcn *continue_xact_2pc; + callback_fcn *eval_action_2pc; callback_fcn *veto_or_prepare_2pc; callback_fcn *abort_2pc; callback_fcn *commit_2pc; diff --git a/src/apps/cht/cht.c b/src/apps/cht/cht.c index 63e2b37..9d97d79 100644 --- a/src/apps/cht/cht.c +++ b/src/apps/cht/cht.c @@ -101,17 +101,9 @@ short multiplex_interleaved(DfaSet * dfaSet, Message * m) { state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from); - - - int xid_exists(int ht_xid, recordid xid_ht, StateMachine * stateMachine) { int * xid = 0; - /* if (-1 == jbHtLookup(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)) { - return 0; - } else { - assert(xid); - return xid; - } */ + int size = ThashLookup(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte**)&xid); @@ -155,15 +147,15 @@ DfaSet * cHtInit(int cht_type, } if(cht_type != CHT_CLIENT) { - /* chtApp_state->xid_ht = jbHtCreate(xid, 79); - chtApp_state->ht_ht = jbHtCreate(xid, 79); */ chtApp_state->xid_ht = ThashCreate(xid, sizeof(state_machine_id), sizeof(int)); - chtApp_state->ht_ht = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); + chtApp_state->ht_ht = ThashCreate(xid, sizeof(clusterHashTable_t), sizeof(recordid)); chtApp_state->ht_xid = Tbegin(); // !!!! chtApp_state->next_hashTableId = 0; twoPC_state->is_coordinator = (cht_type == CHT_COORDINATOR); twoPC_state->init_xact_2pc = init_xact_cht; + twoPC_state->continue_xact_2pc = NULL; + twoPC_state->eval_action_2pc = eval_action_cht; twoPC_state->veto_or_prepare_2pc = veto_or_prepare_cht; twoPC_state->abort_2pc = abort_cht; twoPC_state->commit_2pc = commit_cht; diff --git a/src/apps/cht/cht.h b/src/apps/cht/cht.h index 6bedeb0..aacf122 100644 --- a/src/apps/cht/cht.h +++ b/src/apps/cht/cht.h @@ -140,8 +140,8 @@ DfaSet * cHtClientInit(char * config_file); DfaSet * cHtCoordinatorInit(char * config_file, short(*get_broadcast_group)(DfaSet *, Message *)); DfaSet * cHtSubordinateInit(char * config_file, short(*get_broadcast_group)(DfaSet *, Message *), int subordinate_number); int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet); -/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet); - int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/ +int cHtCommit(state_machine_id xid, DfaSet * dfaSet); +/* int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/ /** The server side state for a CHT. */ typedef struct { @@ -155,6 +155,7 @@ typedef struct { } CHTAppState; +callback_fcn eval_action_cht; callback_fcn veto_or_prepare_cht; callback_fcn commit_cht; callback_fcn tally_cht; diff --git a/src/apps/cht/cht_client.c b/src/apps/cht/cht_client.c index 5fc3aef..a016335 100644 --- a/src/apps/cht/cht_client.c +++ b/src/apps/cht/cht_client.c @@ -4,6 +4,10 @@ #include #include #include + + +#define REQUEST_TYPE XACT_ACK_RESULT +//#define REQUEST_TYPE AWAIT_COMMIT_POINT /** The client side function that 'does everything' @@ -103,25 +107,25 @@ static int _chtEval(DfaSet * dfaSet, int cHtCreate(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * new_ht) { size_t zero = 0; - return _chtEval(dfaSet, CREATE, AWAIT_COMMIT_POINT, &xid, new_ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; + return _chtEval(dfaSet, CREATE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, new_ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; } int cHtInsert(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t datlen) { - return _chtEval(dfaSet, INSERT, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, &datlen) != SUBORDINATE_VETO_2PC; + return _chtEval(dfaSet, INSERT, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, &datlen) != SUBORDINATE_VETO_2PC; } int cHtLookup(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) { - return _chtEval(dfaSet, LOOKUP, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; + return _chtEval(dfaSet, LOOKUP, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; } int cHtRemove(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) { - return _chtEval(dfaSet, REMOVE, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; + return _chtEval(dfaSet, REMOVE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; } int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht) { size_t zero = 0; - return _chtEval(dfaSet, DELETE, AWAIT_COMMIT_POINT, &xid, ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; + return _chtEval(dfaSet, DELETE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; } int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) { @@ -132,26 +136,26 @@ int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) { be serviced exactly once, but will not conflict with real transactions or other begins.*/ - return _chtEval(dfaSet, GETXID, AWAIT_ARRIVAL, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; + return _chtEval(dfaSet, GETXID, /*AWAIT_ARRIVAL,*/ REQUEST_TYPE, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; } DfaSet * cHtClientInit(char * configFile) { NetworkSetup * config = readNetworkConfig(configFile, 0); assert(config->coordinator); - printf("config->localhost:%s config->broadcast_lists[0][0]:%s (localport %d)(port %d)\n", - config->localhost, config->broadcast_lists[0][0], config->localport, parse_port(config->broadcast_lists[0][0])); + // printf("config->localhost:%s config->broadcast_lists[0][0]:%s (localport %d)(port %d)\n", + // config->localhost, config->broadcast_lists[0][0], config->localport, parse_port(config->broadcast_lists[0][0])); DfaSet * ret = cHtInit(CHT_CLIENT, NULL, config); assert(config->coordinator); free (config); return ret; } -/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet) { +int cHtCommit(state_machine_id xid, DfaSet * dfaSet) { size_t zero = 0; - return _chtEval(dfaSet, COMMIT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero); + return _chtEval(dfaSet, COMMIT, XACT_COMMIT, &xid, NULL, NULL, &zero, NULL, &zero); } - +/* int cHtAbort(state_machine_id xid, DfaSet * dfaSet) { size_t zero = 0; return _chtEval(dfaSet, ABORT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero); diff --git a/src/apps/cht/cht_message.c b/src/apps/cht/cht_message.c index fdbc26e..a966e6a 100644 --- a/src/apps/cht/cht_message.c +++ b/src/apps/cht/cht_message.c @@ -1,3 +1,4 @@ +#include "cht.h" #include "cht_message.h" #include void * getKeyAddr(Message *m) { diff --git a/src/apps/cht/cht_message.h b/src/apps/cht/cht_message.h index 0fda337..9875349 100644 --- a/src/apps/cht/cht_message.h +++ b/src/apps/cht/cht_message.h @@ -1,3 +1,4 @@ + #include #define CREATE 1 @@ -8,15 +9,14 @@ /** Unimplemented: Evaluate a function call from a table provided by the library user. */ #define TSTSET 6 #define GETXID 7 -/* #define COMMIT 8 - #define ABORT 9 */ +#define COMMIT 8 +/* #define ABORT 9 */ typedef struct { unsigned short key_length; unsigned short value_length; unsigned char request_type; - // unsigned char response_type; - int hashTable; + clusterHashTable_t hashTable; } payload_header; #define __header_ptr(m) ((payload_header*)(&((m)->payload))) diff --git a/src/apps/cht/cht_server.c b/src/apps/cht/cht_server.c index 8f8e6ed..599e070 100644 --- a/src/apps/cht/cht_server.c +++ b/src/apps/cht/cht_server.c @@ -3,75 +3,84 @@ #include #include #include -/*#define setup_vars \ -TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \ -CHTAppState * app_state_cht = app_state_2pc->app_state; \ -jbHashTable_t * xid_ht = app_state_cht->xid_ht; \ -jbHashTable_t * ht_ht = app_state_cht->ht_ht; \ -int ht_xid = app_state_cht->ht_xid; \ -int xid; \ -int xid_exists = (-1 != jbHtLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)); \ -jbHashTable_t ht; \ -int ht_exists = (-1 != jbHtLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte*) &ht))*/ + #define setup_vars \ TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \ CHTAppState * app_state_cht = app_state_2pc->app_state; \ recordid xid_ht = app_state_cht->xid_ht; \ -recordid ht_ht = app_state_cht->ht_ht; \ -int ht_xid = app_state_cht->ht_xid; \ -int xid; \ -int * xid_ptr = 0; \ -int xid_exists = (-1 != ThashLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte**)&xid_ptr)); \ -if(xid_ptr) { xid=*xid_ptr; free(xid_ptr); }\ -recordid ht; \ -recordid * ht_ptr = 0; \ -int ht_exists = (-1 != ThashLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte**) &ht_ptr)); \ -if(ht_ptr) { ht=*ht_ptr; free(ht_ptr) ;} +int ht_xid = app_state_cht->ht_xid; -/** TODO For now, we ignore the possiblity that jbHashTable's functions +int getXid(int ht_xid, recordid xid_ht, state_machine_id id) { + int * xid; + int ret; + int size = ThashLookup(ht_xid, xid_ht, (byte*)&id, sizeof(state_machine_id), (byte**)&xid); + if(size == sizeof(int)) { + ret = *xid; + free(xid); + } else { + assert(size == -1); + ret = -1; + } + return ret; +} + + +recordid getHashTable (int ht_xid, recordid ht_ht, clusterHashTable_t hashTable) { + recordid * ht; + recordid ret; + int size = ThashLookup(ht_xid, ht_ht, (byte*)&(hashTable), sizeof(clusterHashTable_t), (byte**)&ht); + if(size == sizeof(recordid)) { + ret = *ht; + free(ht); + } else { + assert(size == -1); + ret.page = 0; + ret.slot = 0; + ret.size = -1; + } + return ret; +} + + +/** TODO For now, we ignore the possiblity that LLADD's functions return error codes. Instead, we assume that they always succeed. */ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { - + setup_vars; + + recordid ht = getHashTable(ht_xid, app_state_cht->ht_ht, __header_ptr(m)->hashTable); + int xid = getXid(ht_xid, xid_ht, stateMachine->machine_id); + int ht_exists = (ht.size != -1); + int ret = 1; switch(*requestType(m)) { case CREATE: { recordid new = ThashCreate(ht_xid, VARIABLE_LENGTH, VARIABLE_LENGTH); - // jbHashTable_t * new = jbHtCreate(ht_xid, 79); - // if(new != NULL) { - ThashInsert(ht_xid, ht_ht, + + ThashInsert(ht_xid, app_state_cht->ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&new, sizeof(recordid)); - // ret = (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)new, sizeof(jbHashTable_t)) >= 0); - // } else { - // ret = 0; - // } - // if(ret) { - printf("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable)); fflush(stdout); - Tcommit(app_state_cht->ht_xid); - app_state_cht->ht_xid = Tbegin(); - // } else { - // printf("Failed to insert new hash table slice!"); - // } - + + DEBUG("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable)); + //Tcommit(app_state_cht->ht_xid); + //app_state_cht->ht_xid = Tbegin(); + } break; case INSERT: { if(!ht_exists) { - printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable)); fflush(stdout); ret = 0; + printf ("Hash table %d doesn't exist!\n", __header_ptr(m)->hashTable.id); fflush(stdout); ret = 0; } else { - //ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0); + ThashInsert(xid, ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)); - printf("Insert: %d ht=%d (key length %d) %d -> %d\n", ret, + + DEBUG("Insert: %d ht=%d (key length %d) %d -> %d\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m)); - fflush(stdout); - - // (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); } } break; @@ -91,10 +100,9 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * ret = 0; } // ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0); - printf("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret, + DEBUG("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m)); - fflush(stdout); } } break; @@ -103,22 +111,17 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(stdout); ret = 0; } else { - /** @todo no longer return old value on remove... */ + /** @todo we no longer return old value on remove... */ ret = ThashRemove(xid, ht, getKeyAddr(m), getKeyLength(m)); - // ret = (jbHtRemove(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0); - // (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); } } break; case DELETE: { - if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else { - ThashRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m)); - // jbHtRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m), NULL); - // (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); - /* ret = (jbHtDelete(xid, &ht) >= 0); */ /* Don't need this--jbHtDelete just frees the (stack!) pointer. */ - Tcommit(app_state_cht->ht_xid); - app_state_cht->ht_xid = Tbegin(); + if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(stdout); ret = 0; } else { + ThashRemove(xid, app_state_cht->ht_ht, getKeyAddr(m), getKeyLength(m)); + //Tcommit(app_state_cht->ht_xid); + //app_state_cht->ht_xid = Tbegin(); } } break; @@ -147,14 +150,16 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * ret = 0; } NOOP */ } break; - - /* case COMMIT: + case COMMIT: // placeholder (2pc commits for us) + break; + /* + case COMMIT: { ret = (Tcommit(xid) >= 0); } break; case ABORT: - { + { ret = (Tabort(xid) >= 0); } break; */ @@ -184,70 +189,98 @@ state_name init_xact_cht(void * dfaSet, StateMachine * stateMachine, Message * m memcpy(&(__header_ptr(m)->hashTable), &new_cht, sizeof(clusterHashTable_t)); - printf("Allocated hashtable %d\n", new_cht.id); + DEBUG("Allocated hashtable %d\n", new_cht.id); } - printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), m->response_type, *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id); + // printf("requestType: %d, responseType: %d from %s:%ld\n", *requestType(m), m->response_type, /**(int*)getKeyAddr(m),*/ m->initiator, m->initiator_machine_id); return 1; } /* Begins new transaction, does the work the transaction requests, stores the message, and returns the corresponding error code. */ state_name veto_or_prepare_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { - state_name ret; setup_vars; -// int xid_exists = (-1 != jbHtLookup(ht_xid, app_state_cht->xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id, &xid)); - - if(xid_exists) { printf("Warning: Stale xid found!\n"); } - assert(!xid_exists); - printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), m->response_type, *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id); - + int xid = getXid(ht_xid, app_state_cht->xid_ht, stateMachine->machine_id); + int xid_exists = (-1 != xid); + if(stateMachine->current_state != XACT_ACTIVE) { + if(xid_exists) { printf("Warning: Stale xid found!\n"); } + assert(!xid_exists); + + // printf("requestType: %d, responseType: %d from %s:%ld\n", *requestType(m), m->response_type, /**(int*)getKeyAddr(m),*/ m->initiator, m->initiator_machine_id); + /* This is the start of a new transaction */ - xid = Tbegin(); // !!!! - if(xid < 0) { - - printf("Tbegin failed; %d\n", xid); - - // } else if(jbHtInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)) == -1) { - } else { - ThashInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)); - xid_exists = 1; + xid = Tbegin(); + if(xid < 0) { + printf("Tbegin failed; %d\n", xid); + } else { + ThashInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)); + xid_exists = 1; + } } - Tcommit(app_state_cht->ht_xid); - app_state_cht->ht_xid = Tbegin(); + + ret = do_work(dfaSet, stateMachine, m, from); + + ret = ret ? SUBORDINATE_PREPARED_2PC : SUBORDINATE_VETO_2PC; + + if(ret == SUBORDINATE_VETO_2PC) { + abort_cht(dfaSet, stateMachine, m, from); + } + return ret; +} +state_name eval_action_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + state_name ret; + setup_vars; + + int xid = getXid(ht_xid, app_state_cht->xid_ht, stateMachine->machine_id); + int xid_exists = (-1 != xid); + if(xid_exists) { + assert(stateMachine->current_state == XACT_ACTIVE); + } else { + + /* This is the start of a new transaction */ + xid = Tbegin(); + if(xid < 0) { + printf("Tbegin failed; %d\n", xid); + } else { + ThashInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)); + xid_exists = 1; + } + + } + + // printf("requestType: %d, responseType: %d from %s:%ld\n", *requestType(m), m->response_type, /**(int*)getKeyAddr(m),*/ m->initiator, m->initiator_machine_id); if(xid_exists) { ret = do_work(dfaSet, stateMachine, m, from); - ret = ret ? SUBORDINATE_PREPARED_2PC : SUBORDINATE_VETO_2PC; - - } else { - - ret = SUBORDINATE_VETO_2PC; - + assert(ret); // for now! + } - if(ret == SUBORDINATE_VETO_2PC) { - abort_cht(dfaSet, stateMachine, m, from); - } + // if(ret == SUBORDINATE_VETO_2PC) { + // abort_cht(dfaSet, stateMachine, m, from); + // } return ret; } state_name abort_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { setup_vars; + int xid = getXid(ht_xid, app_state_cht->xid_ht, stateMachine->machine_id); + int xid_exists = (-1 != xid); + printf("Aborting!!\n"); assert(xid_exists); Tabort(xid); // !!!! - // jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid); + ThashRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id)); - Tcommit(app_state_cht->ht_xid); - app_state_cht->ht_xid = Tbegin(); + // Tcommit(app_state_cht->ht_xid); + // app_state_cht->ht_xid = Tbegin(); return 1; } @@ -255,21 +288,17 @@ state_name abort_cht(void * dfaSet, StateMachine * stateMachine, Message * m, ch state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { setup_vars; - + + int xid = getXid(ht_xid, app_state_cht->xid_ht, stateMachine->machine_id); + int xid_exists = (-1 != xid); + assert(xid_exists); Tcommit(xid); - // jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid); + ThashRemove(app_state_cht->ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id)); Tcommit(app_state_cht->ht_xid); app_state_cht->ht_xid = Tbegin(); - /* }*/ - /** @todo why was there an assert(0) in commit_cht?? */ - /** @todo On a commit, should responses of type AWAIT_RESULT ever be done by subordinates? */ -// if(m->response_type == AWAIT_RESULT) { - // printf("commit_cht responding on an AWAIT_RESULT request.\n"); - // assert(0); - /* respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, __header_ptr(m)->initiator); */ -// } + /* TODO: Check error codes, and return accordingly... */ return 1; } @@ -280,10 +309,6 @@ state_name tally_cht(void * dfaSet, StateMachine * stateMachine, Message * m, ch DfaSet * cHtCoordinatorInit(char * configFile, short (*partition_function)(DfaSet *, Message *)) { NetworkSetup * config = readNetworkConfig(configFile, COORDINATOR); - /* DfaSet * ret = cHtInit(CHT_COORDINATOR, config->localhost, partition_function, - config->localport, config->broadcast_lists, config->broadcast_lists_count, - config->broadcast_list_host_count); - free (config);*/ DfaSet * ret = cHtInit(CHT_COORDINATOR, partition_function, config); free(config); return ret; @@ -291,14 +316,10 @@ DfaSet * cHtCoordinatorInit(char * configFile, short (*partition_function)(DfaSe DfaSet * cHtSubordinateInit(char * configFile, short (*partition_function)(DfaSet *, Message *), int subordinate_number) { NetworkSetup * config = readNetworkConfig(configFile, subordinate_number); -/* DfaSet * ret = cHtInit(CHT_SERVER, config->localhost, partition_function, - config->localport, config->broadcast_lists, config->broadcast_lists_count, - config->broadcast_list_host_count);*/ DfaSet * ret = cHtInit(CHT_SERVER, partition_function, config); free (config); return ret; } void debug_print_message(Message * m) { printf("debug: (key length %d) %d -> %d\n", getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m)); - fflush(NULL); } diff --git a/src/libdfa/libdfa.c b/src/libdfa/libdfa.c index 46484f5..34cd254 100644 --- a/src/libdfa/libdfa.c +++ b/src/libdfa/libdfa.c @@ -328,6 +328,15 @@ void* main_loop(DfaSet *dfaSet) { } } +state_name callback_false(void * dfaSet, StateMachine * stateMachine, + Message * message, char * message_recipient) { + return 0; +} +state_name callback_true(void * dfaSet, StateMachine * stateMachine, + Message * message, char * message_recipient) { + return 1; +} + void * inner_worker_loop(void * arg_void) { WorkerLoopArgs * arg = arg_void; @@ -336,7 +345,7 @@ void * inner_worker_loop(void * arg_void) { int timeout = 0; /* Run through the loop immediately the first time around. */ int state = 0; - + // int first = 1; StateMachine* stateMachine; @@ -416,11 +425,22 @@ void * inner_worker_loop(void * arg_void) { if(dfaSet->states[i].name == stateMachine->current_state) { state_idx = i; } - } + } DEBUG("Worker loop for state machine: %ld still active\n", machine_id); - send_message(&(dfaSet->networkSetup), &(stateMachine->message), stateMachine->message_recipient); + int send = 1; + if(dfaSet->states[state_idx].retry_fcn != NULL) { + send = dfaSet->states[state_idx].retry_fcn(dfaSet, stateMachine, &(stateMachine->message), stateMachine->message_recipient); + } + if(send) { + /* if(first) { + first = 0; + } else { + printf("Resending message. Machine # %ld State # %d\n", stateMachine->machine_id, stateMachine->current_state); + } */ + send_message(&(dfaSet->networkSetup), &(stateMachine->message), stateMachine->message_recipient); + } } @@ -530,7 +550,7 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s asprintf(&initiator, "%s:%d", dfaSet->networkSetup.localhost, dfaSet->networkSetup.localport); strcpy(initial_sm->message.initiator, initiator); free(initiator); - DEBUG("Set message initiator to %s", initial_sm->message.initiator); + // printf("Set message initiator to %s\n", initial_sm->message.initiator); fflush(stdout); initial_sm->message.initiator_machine_id = initial_sm->machine_id; strcpy(initial_sm->message_recipient, recipient_addr); @@ -539,7 +559,6 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s ret = (int)run_request(dfaSet, machine_id); - writelock(dfaSet->lock, machine_id); assert(initial_sm == getSmash(dfaSet->smash, machine_id)); if(message != NULL) { diff --git a/test/cht/client.c b/test/cht/client.c index 7c3670f..040306c 100644 --- a/test/cht/client.c +++ b/test/cht/client.c @@ -21,10 +21,10 @@ int main(int argc, char ** argv) { clusterHashTable_t * new_ht; cHtCreate(xid, cht_client, new_ht); int i; - for(i = 0; i < 100; i++) { + for(i = 0; i < 1000; i++) { int one = i; int two = i+1; cHtInsert(xid, cht_client, new_ht, &one, sizeof(int), &two, sizeof(int)); - xid++; + // xid++; /* int i =0; for(i =0; i < 100; i++) { printf("\n"); @@ -36,14 +36,14 @@ int main(int argc, char ** argv) { newTwo = 0; unsigned int newLen = sizeof(int); int ret = cHtLookup(xid, cht_client, new_ht, &newOne, sizeof(int), &newTwo, &newLen); - xid++; + // xid++; //printf("lookup returned %d (%d->%d)\n", ret, newOne, newTwo); assert(ret); assert(newOne == one); assert(newTwo == two); assert(newLen == sizeof(int)); } - + cHtCommit(xid, cht_client); /** @todo devise a way to cleanly shut a CHT down. */ // dfa_free(cht_client);