diff --git a/libdfa/messages.h b/libdfa/messages.h index bef9377..513a48c 100644 --- a/libdfa/messages.h +++ b/libdfa/messages.h @@ -81,6 +81,7 @@ typedef struct message { state_machine_id from_machine_id; state_machine_id initiator_machine_id; char initiator[MAX_ADDRESS_LENGTH]; + unsigned char response_type; message_name type; /** Payload is a byte array of arbitrary length. **/ char payload[MAX_PAYLOAD]; diff --git a/src/2pc/2pc.c b/src/2pc/2pc.c index 4133f77..34c1b61 100644 --- a/src/2pc/2pc.c +++ b/src/2pc/2pc.c @@ -186,7 +186,18 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine, } else { ret = 1; } - /* Where was this before?? */ + + if(m->type==AWAIT_ARRIVAL && ret) { + // need to (n)ack the client: + // Respond using the machine id expected by the client. + m->from_machine_id = m->initiator_machine_id; + + printf("Responding\n"); + + respond_once(&((DfaSet*)dfaSet)->networkSetup, + COORDINATOR_START_2PC, m, m->initiator); + } + m->from_machine_id = stateMachine->machine_id; return ret; @@ -204,7 +215,9 @@ state_name send_ack_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); - return app_state->veto_or_prepare_2pc(dfaSet, stateMachine, m, from); + int ret = app_state->veto_or_prepare_2pc(dfaSet, stateMachine, m, from); + + return ret; } /** @@ -213,10 +226,21 @@ state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Messa state_name abort_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); - send_ack_2pc(dfaSet, stateMachine, m, from); - - return app_state->abort_2pc(dfaSet, stateMachine, m, from); + int ret = app_state->abort_2pc(dfaSet, stateMachine, m, from); + + //if((*responseType(m) == AWAIT_COMMIT_POINT || *responseType(m) == AWAIT_RESULT)) { + if(m->response_type == AWAIT_COMMIT_POINT || m->response_type == AWAIT_RESULT) { + state_machine_id tmp = m->from_machine_id; + + /* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */ + m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/ + + printf("Response being sent to: %s:%ld\n", m->initiator, m->to_machine_id); + respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_VETO_2PC, m, m->initiator); + m->from_machine_id = tmp; + } + return ret; } state_name commit_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { @@ -270,11 +294,22 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m, sprintf(from, "bc:%d", bc_group); - if(ret && app_state->tally_2pc != NULL) { - return app_state->tally_2pc(dfaSet, stateMachine, m, from); - } else { - return ret; + if(ret && app_state->tally_2pc != NULL) { + ret = app_state->tally_2pc(dfaSet, stateMachine, m, from); } + /* TODO: CORRECTNESS BUG Make sure this is after tally forces the log. Also, need to + make sure that it increments the (currently unimplemented) + sequence number before flushing... */ + + if(ret && (m->response_type == AWAIT_COMMIT_POINT && m->response_type == COORDINATOR_START_2PC)) { +// if(ret && (*responseType(m) == AWAIT_COMMIT_POINT && stateMachine->current_state==COORDINATOR_START_2PC)) { + state_machine_id tmp = m->from_machine_id; + m->from_machine_id = m->initiator_machine_id; + //printf("Coordinator responding: ? ht=? (key length %d) %d -> to %s:%ld\n", getKeyLength(m), *(int*)getKeyAddr(m), /*getValAddr(m),*/ m->initiator, m->initiator_machine_id ); + respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator); + m->from_machine_id = tmp; + } + return ret; } else { sprintf(from, "bc:%d", bc_group); @@ -282,4 +317,3 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m, } } - diff --git a/src/2pc/2pc.h b/src/2pc/2pc.h index 8d13c8a..f97de05 100644 --- a/src/2pc/2pc.h +++ b/src/2pc/2pc.h @@ -41,6 +41,8 @@ terms specified in this license. ---*/ #include +#ifndef __TWOPC_H +#define __TWOPC_H #define MAX_SUBORDINATES 10 /** To use this library, you need to implement: @@ -69,7 +71,9 @@ the current implementation, but no scheduling is done. The first request that retries after a transaction completes is the one that gets to reuse the transaction id. - +@todo Users of 2pc should not have to deal with dfaSets, so they shouldn't + be passed into the callbacks. Instead, only pass in the app-specific + payload. */ /* These will generally be defined by the user of the library. @@ -151,3 +155,4 @@ extern Transition client_transitions_2pc[]; extern Transition transitions_2pc[]; extern State states_2pc[]; /* #endif */ +#endif /*__TWOPC_H */ diff --git a/src/apps/cht/Makefile.am b/src/apps/cht/Makefile.am index 95cb6b5..d25f906 100644 --- a/src/apps/cht/Makefile.am +++ b/src/apps/cht/Makefile.am @@ -1,5 +1,5 @@ LDADD=$(top_builddir)/src/libdfa/libdfa.a $(top_builddir)/src/2pc/lib2pc.a -SOURCES=cht.c +SOURCES=cht.c lib_LIBRARIES=libcht.a -libcht_a_SOURCES=cht.c +libcht_a_SOURCES=cht.c cht_client.c cht_server.c cht_message.c AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/src/apps/cht/cht.c b/src/apps/cht/cht.c index 587c75d..1f792df 100644 --- a/src/apps/cht/cht.c +++ b/src/apps/cht/cht.c @@ -47,74 +47,17 @@ terms specified in this license. #include "../../2pc/2pc.h" #include "../../libdfa/callbacks.h" #include -#define CREATE 1 -#define INSERT 2 -#define LOOKUP 3 -#define REMOVE 4 -#define DELETE 5 -/** Unimplemented: Evaluate a function call from a table provided by the library user. */ -#define TSTSET 6 -#define GETXID 7 -/* #define COMMIT 8 - #define ABORT 9 */ - -typedef struct { - unsigned short key_length; - unsigned short value_length; - unsigned char request_type; - unsigned char response_type; - clusterHashTable_t hashTable; -} payload_header; - -typedef struct { - int ht_xid; - jbHashTable_t * xid_ht; - jbHashTable_t * ht_ht; - int next_hashTableId; -} CHTAppState; +#include "cht_message.h" -#define __header_ptr(m) ((payload_header*)(&((m)->payload))) -static unsigned short* _key_length(Message * m) { - return &(__header_ptr(m)->key_length); -} - -static unsigned short* _value_length(Message *m) { - return &(__header_ptr(m)->value_length); -} - -#define getKeyLength(m) (ntohs(*_key_length(m))) -#define setKeyLength(m, x) (*_key_length(m)=htons(x)) - - -#define getValLength(m) (ntohs(*_value_length(m))) -#define setValLength(m, x) (*_value_length(m)=htons(x)) - -static unsigned char * requestType(Message *m) { - return &(__header_ptr(m)->request_type); -} - -static unsigned char * responseType(Message *m) { - return &(__header_ptr(m)->response_type); -} /** TODO: Endianness. (ICK) */ //state_name request_type =*(requestType(m)); \sdsd //state_name response_type =*(responseType(m)); \sd //TwoPCMachineState * machine_state_2pc = (TwoPCMachineState*) &(stateMachine->app_state); \sd -#define setup_vars \ -TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \ -CHTAppState * app_state_cht = app_state_2pc->app_state; \ -jbHashTable_t * xid_ht = app_state_cht->xid_ht; \ -jbHashTable_t * ht_ht = app_state_cht->ht_ht; \ -int ht_xid = app_state_cht->ht_xid; \ -int xid; \ -int xid_exists = (-1 != jbHtLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)); \ -jbHashTable_t ht; \ -int ht_exists = (-1 != jbHtLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte*) &ht)) @@ -137,27 +80,6 @@ static int hash( const unsigned char * key, size_t keylen , int table_length) { return( ret % table_length ); } -void * getKeyAddr(Message *m) { - char * stuff = m->payload; - - return (stuff + sizeof(payload_header)); /* Just add the header length. */ - -} - -void * getValAddr(Message * m) { - return ((char*)getKeyAddr(m)) + getKeyLength(m); /* key address + key length. */ -} - -/** - - @return 1 if the payload is valid (key_length and value length do not over-run the message's memory, 0 otherwise.) - -*/ -int checkPayload(Message * m) { - char * a = (char*)m; - char * b = getValAddr(m); - return (a+ sizeof(Message) ) >= (b + getValLength(m)); -} /** TODO: multiplex_interleaved needs to return a special group for begin/commit/abort requests. There will then be two types of @@ -184,143 +106,8 @@ short multiplex_interleaved(DfaSet * dfaSet, Message * m) { state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from); -/** - The client side function that 'does everything' - - @param request_type The type of request to be run. - - @param response_type The expected response type. Returns 1 if this - remote state is returned, 0 otherwise. (TODO: - Double check this documentation.) - - @param xid The (stateMachine) transaction id. Set to a random - number when calling BEGIN. To prevent deadlock, it's - best to choose a number unlikely to correspond to an - active transaction. (A random number within 2^32 of the - highest 64-bit integer will work.) - - @param reply_type When should the local call return? The choices - are AWAIT_ARRIVAL, which returns after hearing - back from the coordinator, AWAIT_COMMIT_POINT to - wait until after the transaction commits/aborts, - AWAIT_RESULT, which waits for the actual result - from one of the replicas. - - @param key, key_size, value, value_size depend on the value of request_type. - - @return 1 on success, 0 on failure. -*/ - -int _chtEval(DfaSet * dfaSet, - unsigned char request_type, - unsigned char response_type, - state_machine_id * xid, - clusterHashTable_t * ht, - void * key, size_t * key_size, - void * value, size_t * value_size) { - - /* Fill out a message payload. */ - - Message m; - - if(ht != NULL) { - printf("_chtEval(request=%d, response=%d, xid=%ld, ht=%d ", request_type, response_type, *xid, ht->id); - } else { - printf("_chtEval(request=%d, response=%d, xid=%ld, ht=NULL ", request_type, response_type, *xid); - } - if(key == NULL) { - printf(")\n"); - } else { - printf("key=%d)\n", *(int*)key); - } - * requestType(&m) = request_type; - * responseType(&m) = response_type; - - setKeyLength(&m, *key_size); - setValLength(&m, *value_size); - - assert(checkPayload(&m)); - if(key_size != 0) { - memcpy(getKeyAddr(&m), key, *key_size); - } - if(value_size != 0) { - memcpy(getValAddr(&m), value, *value_size); - } - if(ht != NULL) { - memcpy(&(__header_ptr(&m)->hashTable), ht, sizeof(clusterHashTable_t)); - } - - /* printf("%s <- %s\n", __header_ptr(&m)->initiator, dfaSet->networkSetup.localhost); */ - - /* Synchronously run the request */ - request(dfaSet, response_type, "bc:0", *xid, &m); - - if(!checkPayload(&m)) { - printf("_chtEval failed: Invalid response.\n"); - assert(0); - } - - /* Copy message contents back into caller's buffers, even if the - request failed. (There may be app-specific information in the - response...) */ - - if(ht != NULL) { - memcpy(ht, &(__header_ptr(&m)->hashTable), sizeof(clusterHashTable_t)); - } - if (*key_size != 0) { - /* printf("\n+%x<-%x+, length %d value=%s and %s\n", (unsigned int) value, (unsigned int)getValAddr(&m), getValLength(&m), value, getValAddr(&m)); */ - memcpy(value, getValAddr(&m), getValLength(&m)); - } - if (*value_size != 0) { - memcpy(key, getKeyAddr(&m), getKeyLength(&m)); - } - - *xid = m.to_machine_id; - - printf("+chtEval returning %d\n", m.type); - - return m.type; -} -state_name init_xact_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { - - - /* TwoPCMachineState * state = (TwoPCMachineState*) &(stateMachine->app_state);*/ - TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); - CHTAppState * app_state_cht = app_state_2pc->app_state; - - if(m->type != *responseType(m)) { - printf("Bug in client! m->type != response_type(m).\n"); - } - - if(*requestType(m) == CREATE) { - clusterHashTable_t new_cht; - new_cht.id = app_state_cht->next_hashTableId; - app_state_cht->next_hashTableId++; - - memcpy(&(__header_ptr(m)->hashTable), &new_cht, sizeof(clusterHashTable_t)); - - printf("Allocated hashtable %d\n", new_cht.id); - } - - printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), *responseType(m), *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id); - - if(*responseType(m) == AWAIT_ARRIVAL) { - state_machine_id tmp = m->from_machine_id; - - /* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */ - m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/ - - printf("Responding\n"); - - respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_START_2PC, m, m->initiator); - - m->from_machine_id = tmp; - } - - return 1; -} int xid_exists(int ht_xid, jbHashTable_t * xid_ht, StateMachine * stateMachine) { int xid; @@ -332,234 +119,6 @@ int xid_exists(int ht_xid, jbHashTable_t * xid_ht, StateMachine * stateMachine) } } -callback_fcn abort_cht; - -/* Begins new transaction, does the work the transaction requests, stores the message, and returns the corresponding error code. */ -state_name veto_or_prepare_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { - - state_name ret; - setup_vars; - -// int xid_exists = (-1 != jbHtLookup(ht_xid, app_state_cht->xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id, &xid)); - - if(xid_exists) { printf("Warning: Stale xid found!\n"); } - assert(!xid_exists); - printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), *responseType(m), *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id); - - /* This is the start of a new transaction */ - xid = Tbegin(); // !!!! - if(xid < 0) { - - printf("Tbegin failed; %d\n", xid); - - } else if(jbHtInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)) == -1) { - - printf("jbHtInsert failed.\n"); - - } else { - - xid_exists = 1; - } - Tcommit(app_state_cht->ht_xid); - app_state_cht->ht_xid = Tbegin(); - - if(xid_exists) { - - ret = do_work(dfaSet, stateMachine, m, from); - - ret = ret ? SUBORDINATE_PREPARED_2PC : SUBORDINATE_VETO_2PC; - - } else { - - ret = SUBORDINATE_VETO_2PC; - - } - - if(ret == SUBORDINATE_VETO_2PC) { - abort_cht(dfaSet, stateMachine, m, from); - } - return ret; -} - -state_name abort_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { - setup_vars; - - printf("Aborting!!\n"); - - if(*responseType(m) == AWAIT_COMMIT_POINT || *responseType(m) == AWAIT_RESULT) { - state_machine_id tmp = m->from_machine_id; - - /* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */ - m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/ - - printf("Response being sent to: %s:%ld\n", m->initiator, m->to_machine_id); - respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_VETO_2PC, m, m->initiator); - m->from_machine_id = tmp; - } - - assert(xid_exists); - - Tabort(xid); // !!!! - jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid); - Tcommit(app_state_cht->ht_xid); - app_state_cht->ht_xid = Tbegin(); - return 1; -} - -/** TODO For now, we ignore the possiblity that jbHashTable's functions - return error codes. Instead, we assume that they always - succeed. */ -state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { - int ret; - setup_vars; - - /* printf("ht_ht = %x, ht = %x\n", ht_ht, ht); */ - - switch(*requestType(m)) - { - case CREATE: - { - jbHashTable_t * new = jbHtCreate(ht_xid, 79); - if(new != NULL) { - ret = (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)new, sizeof(jbHashTable_t)) >= 0); - } else { - ret = 0; - } - if(ret) { - printf("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable).id); - Tcommit(app_state_cht->ht_xid); - app_state_cht->ht_xid = Tbegin(); - } else { - printf("Failed to insert new hash table slice!"); - } - - } break; - - case INSERT: - { - if(!ht_exists) { printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable).id); fflush(NULL); ret = 0; } else { - ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0); - printf("Insert: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable).id, getKeyLength(m), *(int*)getKeyAddr(m), getValAddr(m)); - (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); - - } - } break; - - case LOOKUP: - { - if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else { - ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0); - printf("Lookup: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable).id, getKeyLength(m), *(int*)getKeyAddr(m), getValAddr(m)); - } - } break; - - case REMOVE: - { - if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else { - ret = (jbHtRemove(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0); - (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); - } - } break; - - case DELETE: - { - if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else { - jbHtRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m), NULL); - (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); - /* ret = (jbHtDelete(xid, &ht) >= 0); */ /* Don't need this--jbHtDelete just frees the (stack!) pointer. */ - Tcommit(app_state_cht->ht_xid); - app_state_cht->ht_xid = Tbegin(); - - } - } break; - - case TSTSET: - { - printf("Unimplemented request!\n"); - } break; - - case GETXID: - { - /* int new_xid = Tbegin(); - if(new_xid >= 0) { - setKeyLength(m, 0); - setValLength(m, sizeof(int)); - *((int*)getValAddr(m)) = new_xid; - ret = 1; - if(jbHtInsert(ht_xid, xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id), &xid, sizeof(int)) == -1) { - printf("Begin failed on jbHtInsert!\n"); - } else { - printf("Created local xid for global xid: %ld\n", stateMachine->machine_id); - } - } else { - printf("Begin failed on Tbegin()!\n"); - - ret = 0; - } NOOP */ - } break; - - /* case COMMIT: - { - ret = (Tcommit(xid) >= 0); - } break; - - case ABORT: - { - ret = (Tabort(xid) >= 0); - } break; - */ - default: - { - printf("Unknown request type: %d\n", *requestType(m)); - } - } - - return ret; -} - -state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { - - setup_vars; - - assert(xid_exists); - Tcommit(xid); - jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid); - Tcommit(app_state_cht->ht_xid); - app_state_cht->ht_xid = Tbegin(); - /* }*/ - - if(*responseType(m) == AWAIT_RESULT) { - printf("commit_cht responding on an AWAIT_RESULT request.\n"); - assert(0); - /* respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, __header_ptr(m)->initiator); */ - } - /* TODO: Check error codes, and return accordingly... */ - return 1; -} - -state_name tally_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { - - // setup_vars; - - /* TODO: Make sure this is after tally forces the log. Also, need to - make sure that it increments the (currently unimplemented) - sequence number before flushing... */ - - if(*responseType(m) == AWAIT_COMMIT_POINT && stateMachine->current_state==COORDINATOR_START_2PC) { - state_machine_id tmp = m->from_machine_id; - - /* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */ - m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/ - - printf("Coordinator responding: ? ht=? (key length %d) %d -> to %s:%ld\n", getKeyLength(m), *(int*)getKeyAddr(m), /*getValAddr(m),*/ m->initiator, m->initiator_machine_id ); - respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator); - - m->from_machine_id = tmp; - } - - return 1; -} - DfaSet * cHtInit(int cht_type, char * localhost, short (* get_broadcast_group)(DfaSet *, Message *), short port, @@ -618,48 +177,3 @@ DfaSet * cHtInit(int cht_type, char * localhost, return dfaSet; } - -int cHtCreate(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * new_ht) { - size_t zero = 0; - return _chtEval(dfaSet, CREATE, AWAIT_COMMIT_POINT, &xid, new_ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; -} - - -int cHtInsert(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t datlen) { - return _chtEval(dfaSet, INSERT, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, &datlen) != SUBORDINATE_VETO_2PC; -} - -int cHtLookup(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) { - return _chtEval(dfaSet, LOOKUP, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; -} - -int cHtRemove(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) { - return _chtEval(dfaSet, REMOVE, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; -} - -int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht) { - size_t zero = 0; - return _chtEval(dfaSet, DELETE, AWAIT_COMMIT_POINT, &xid, ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; -} - -int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) { - size_t zero = 0; - *xid = NULL_MACHINE; /* Will be overwritten by - _chtEval... Need a large random - value so that the request will - be serviced exactly once, but - will not conflict with real - transactions or other begins.*/ - return _chtEval(dfaSet, GETXID, AWAIT_ARRIVAL, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; -} - -/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet) { - size_t zero = 0; - return _chtEval(dfaSet, COMMIT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero); -} - - -int cHtAbort(state_machine_id xid, DfaSet * dfaSet) { - size_t zero = 0; - return _chtEval(dfaSet, ABORT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero); - }*/ diff --git a/src/apps/cht/cht.h b/src/apps/cht/cht.h index 03368a6..91a3508 100644 --- a/src/apps/cht/cht.h +++ b/src/apps/cht/cht.h @@ -40,7 +40,8 @@ permission to use and distribute the software in accordance with the terms specified in this license. ---*/ #include - +#include +#include "../../2pc/2pc.h" #define CHT_COORDINATOR 1 #define CHT_SERVER 2 #define CHT_CLIENT 3 @@ -138,3 +139,17 @@ DfaSet * cHtInit(int cht_type, char * localhost, int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet); /*int cHtCommit(state_machine_id xid, DfaSet * dfaSet); int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/ + +typedef struct { + int ht_xid; + jbHashTable_t * xid_ht; + jbHashTable_t * ht_ht; + int next_hashTableId; +} CHTAppState; + + +callback_fcn veto_or_prepare_cht; +callback_fcn commit_cht; +callback_fcn tally_cht; +callback_fcn abort_cht; +callback_fcn init_xact_cht; diff --git a/src/apps/cht/cht_client.c b/src/apps/cht/cht_client.c new file mode 100644 index 0000000..770398e --- /dev/null +++ b/src/apps/cht/cht_client.c @@ -0,0 +1,147 @@ +#include "cht.h" +#include "cht_message.h" +#include "../../2pc/2pc.h" +#include +#include +#include +/** + The client side function that 'does everything' + + @param request_type The type of request to be run. + + @param response_type The expected response type. Returns 1 if this + remote state is returned, 0 otherwise. (TODO: + Double check this documentation.) + + @param xid The (stateMachine) transaction id. Set to a random + number when calling BEGIN. To prevent deadlock, it's + best to choose a number unlikely to correspond to an + active transaction. (A random number within 2^32 of the + highest 64-bit integer will work.) + + @param reply_type When should the local call return? The choices + are AWAIT_ARRIVAL, which returns after hearing + back from the coordinator, AWAIT_COMMIT_POINT to + wait until after the transaction commits/aborts, + AWAIT_RESULT, which waits for the actual result + from one of the replicas. + + @param key, key_size, value, value_size depend on the value of request_type. + + @return 1 on success, 0 on failure. +*/ +static int _chtEval(DfaSet * dfaSet, + unsigned char request_type, + unsigned char response_type, + state_machine_id * xid, + clusterHashTable_t * ht, + void * key, size_t * key_size, + void * value, size_t * value_size) { + + /* Fill out a message payload. */ + + Message m; + + if(ht != NULL) { + printf("_chtEval(request=%d, response=%d, xid=%ld, ht=%d ", request_type, response_type, *xid, ht->id); + } else { + printf("_chtEval(request=%d, response=%d, xid=%ld, ht=NULL ", request_type, response_type, *xid); + } + if(key == NULL) { + printf(")\n"); + } else { + printf("key=%d)\n", *(int*)key); + } + * requestType(&m) = request_type; + m.response_type = response_type; + + setKeyLength(&m, *key_size); + setValLength(&m, *value_size); + + assert(checkPayload(&m)); + if(key_size != 0) { + memcpy(getKeyAddr(&m), key, *key_size); + } + if(value_size != 0) { + memcpy(getValAddr(&m), value, *value_size); + } + if(ht != NULL) { + memcpy(&(__header_ptr(&m)->hashTable), ht, sizeof(clusterHashTable_t)); + } + + /* printf("%s <- %s\n", __header_ptr(&m)->initiator, dfaSet->networkSetup.localhost); */ + + /* Synchronously run the request */ + request(dfaSet, response_type, "bc:0", *xid, &m); + + if(!checkPayload(&m)) { + printf("_chtEval failed: Invalid response.\n"); + assert(0); + } + + /* Copy message contents back into caller's buffers, even if the + request failed. (There may be app-specific information in the + response...) */ + + if(ht != NULL) { + memcpy(ht, &(__header_ptr(&m)->hashTable), sizeof(clusterHashTable_t)); + } + if (*key_size != 0) { + /* printf("\n+%x<-%x+, length %d value=%s and %s\n", (unsigned int) value, (unsigned int)getValAddr(&m), getValLength(&m), value, getValAddr(&m)); */ + memcpy(value, getValAddr(&m), getValLength(&m)); + } + if (*value_size != 0) { + memcpy(key, getKeyAddr(&m), getKeyLength(&m)); + } + + *xid = m.to_machine_id; + + printf("+chtEval returning %d\n", m.type); + + return m.type; +} + +int cHtCreate(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * new_ht) { + size_t zero = 0; + return _chtEval(dfaSet, CREATE, AWAIT_COMMIT_POINT, &xid, new_ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; +} + + +int cHtInsert(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t datlen) { + return _chtEval(dfaSet, INSERT, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, &datlen) != SUBORDINATE_VETO_2PC; +} + +int cHtLookup(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) { + return _chtEval(dfaSet, LOOKUP, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; +} + +int cHtRemove(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) { + return _chtEval(dfaSet, REMOVE, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC; +} + +int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht) { + size_t zero = 0; + return _chtEval(dfaSet, DELETE, AWAIT_COMMIT_POINT, &xid, ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; +} + +int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) { + size_t zero = 0; + *xid = NULL_MACHINE; /* Will be overwritten by + _chtEval... Need a large random + value so that the request will + be serviced exactly once, but + will not conflict with real + transactions or other begins.*/ + return _chtEval(dfaSet, GETXID, AWAIT_ARRIVAL, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC; +} + +/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet) { + size_t zero = 0; + return _chtEval(dfaSet, COMMIT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero); +} + + +int cHtAbort(state_machine_id xid, DfaSet * dfaSet) { + size_t zero = 0; + return _chtEval(dfaSet, ABORT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero); + }*/ diff --git a/src/apps/cht/cht_message.c b/src/apps/cht/cht_message.c new file mode 100644 index 0000000..fdbc26e --- /dev/null +++ b/src/apps/cht/cht_message.c @@ -0,0 +1,22 @@ +#include "cht_message.h" +#include +void * getKeyAddr(Message *m) { + char * stuff = m->payload; + + return (stuff + sizeof(payload_header)); /* Just add the header length. */ + +} +void * getValAddr(Message * m) { + return ((char*)getKeyAddr(m)) + getKeyLength(m); /* key address + key length. */ +} + +/** + + @return 1 if the payload is valid (key_length and value length do not over-run the message's memory, 0 otherwise.) + +*/ +int checkPayload(Message * m) { + char * a = (char*)m; + char * b = getValAddr(m); + return (a+ sizeof(Message) ) >= (b + getValLength(m)); +} diff --git a/src/apps/cht/cht_message.h b/src/apps/cht/cht_message.h new file mode 100644 index 0000000..0fda337 --- /dev/null +++ b/src/apps/cht/cht_message.h @@ -0,0 +1,55 @@ +#include + +#define CREATE 1 +#define INSERT 2 +#define LOOKUP 3 +#define REMOVE 4 +#define DELETE 5 +/** Unimplemented: Evaluate a function call from a table provided by the library user. */ +#define TSTSET 6 +#define GETXID 7 +/* #define COMMIT 8 + #define ABORT 9 */ + +typedef struct { + unsigned short key_length; + unsigned short value_length; + unsigned char request_type; + // unsigned char response_type; + int hashTable; +} payload_header; + +#define __header_ptr(m) ((payload_header*)(&((m)->payload))) + +#define __key_length(m) (&(__header_ptr(m)->key_length)) +#define __value_length(m) (&(__header_ptr(m)->value_length)) + +#define getKeyLength(m) (ntohs(*__key_length(m))) +#define setKeyLength(m, x) (*__key_length(m)=htons(x)) + +#define getValLength(m) (ntohs(*__value_length(m))) +#define setValLength(m, x) (*__value_length(m)=htons(x)) + +#define requestType(m) (&(__header_ptr(m)->request_type)) +#define responseType(m) (&(__header_ptr(m)->response_type)) + +void * getKeyAddr(Message *m); +void * getValAddr(Message * m); +int checkPayload(Message * m); + + +/* +static unsigned short* _key_length(Message * m) { + return &(__header_ptr(m)->key_length); +} + +static unsigned short* _value_length(Message *m) { + return &(__header_ptr(m)->value_length); +} +static unsigned char * requestType(Message *m) { + return &(__header_ptr(m)->request_type); +} + +static unsigned char * responseType(Message *m) { + return &(__header_ptr(m)->response_type); +}*/ diff --git a/src/apps/cht/cht_message.o b/src/apps/cht/cht_message.o new file mode 100644 index 0000000..a159f75 Binary files /dev/null and b/src/apps/cht/cht_message.o differ diff --git a/src/apps/cht/cht_server.c b/src/apps/cht/cht_server.c new file mode 100644 index 0000000..2398775 --- /dev/null +++ b/src/apps/cht/cht_server.c @@ -0,0 +1,236 @@ +#include "cht.h" +#include "cht_message.h" +#include +#include +#include +#define setup_vars \ +TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \ +CHTAppState * app_state_cht = app_state_2pc->app_state; \ +jbHashTable_t * xid_ht = app_state_cht->xid_ht; \ +jbHashTable_t * ht_ht = app_state_cht->ht_ht; \ +int ht_xid = app_state_cht->ht_xid; \ +int xid; \ +int xid_exists = (-1 != jbHtLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)); \ +jbHashTable_t ht; \ +int ht_exists = (-1 != jbHtLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte*) &ht)) + +/** TODO For now, we ignore the possiblity that jbHashTable's functions + return error codes. Instead, we assume that they always + succeed. */ +static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + int ret; + setup_vars; + + switch(*requestType(m)) + { + case CREATE: + { + jbHashTable_t * new = jbHtCreate(ht_xid, 79); + if(new != NULL) { + ret = (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)new, sizeof(jbHashTable_t)) >= 0); + } else { + ret = 0; + } + if(ret) { + printf("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable)); + Tcommit(app_state_cht->ht_xid); + app_state_cht->ht_xid = Tbegin(); + } else { + printf("Failed to insert new hash table slice!"); + } + + } break; + + case INSERT: + { + if(!ht_exists) { printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable)); fflush(NULL); ret = 0; } else { + ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0); + printf("Insert: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), (char*)getValAddr(m)); + (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); + + } + } break; + + case LOOKUP: + { + if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else { + ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0); + printf("Lookup: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), (char*)getValAddr(m)); + } + } break; + + case REMOVE: + { + if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else { + ret = (jbHtRemove(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0); + (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); + } + } break; + + case DELETE: + { + if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else { + jbHtRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m), NULL); + (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t))); + /* ret = (jbHtDelete(xid, &ht) >= 0); */ /* Don't need this--jbHtDelete just frees the (stack!) pointer. */ + Tcommit(app_state_cht->ht_xid); + app_state_cht->ht_xid = Tbegin(); + + } + } break; + + case TSTSET: + { + printf("Unimplemented request!\n"); + } break; + + case GETXID: + { + /* int new_xid = Tbegin(); + if(new_xid >= 0) { + setKeyLength(m, 0); + setValLength(m, sizeof(int)); + *((int*)getValAddr(m)) = new_xid; + ret = 1; + if(jbHtInsert(ht_xid, xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id), &xid, sizeof(int)) == -1) { + printf("Begin failed on jbHtInsert!\n"); + } else { + printf("Created local xid for global xid: %ld\n", stateMachine->machine_id); + } + } else { + printf("Begin failed on Tbegin()!\n"); + + ret = 0; + } NOOP */ + } break; + + /* case COMMIT: + { + ret = (Tcommit(xid) >= 0); + } break; + + case ABORT: + { + ret = (Tabort(xid) >= 0); + } break; + */ + default: + { + printf("Unknown request type: %d\n", *requestType(m)); + } + } + + return ret; +} + +/* Run by the coordinator when the request is received from the client. */ +state_name init_xact_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + + TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); + CHTAppState * app_state_cht = app_state_2pc->app_state; + + if(m->type != m->response_type) { + printf("Bug in client! m->type != response_type(m).\n"); + } + + if(*requestType(m) == CREATE) { + clusterHashTable_t new_cht; + new_cht.id = app_state_cht->next_hashTableId; + app_state_cht->next_hashTableId++; + + memcpy(&(__header_ptr(m)->hashTable), &new_cht, sizeof(clusterHashTable_t)); + + printf("Allocated hashtable %d\n", new_cht.id); + } + + printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), m->response_type, *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id); + + return 1; +} + +/* Begins new transaction, does the work the transaction requests, stores the message, and returns the corresponding error code. */ +state_name veto_or_prepare_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + + state_name ret; + setup_vars; + +// int xid_exists = (-1 != jbHtLookup(ht_xid, app_state_cht->xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id, &xid)); + + if(xid_exists) { printf("Warning: Stale xid found!\n"); } + assert(!xid_exists); + printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), m->response_type, *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id); + + /* This is the start of a new transaction */ + xid = Tbegin(); // !!!! + if(xid < 0) { + + printf("Tbegin failed; %d\n", xid); + + } else if(jbHtInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)) == -1) { + + printf("jbHtInsert failed.\n"); + + } else { + + xid_exists = 1; + } + Tcommit(app_state_cht->ht_xid); + app_state_cht->ht_xid = Tbegin(); + + if(xid_exists) { + + ret = do_work(dfaSet, stateMachine, m, from); + + ret = ret ? SUBORDINATE_PREPARED_2PC : SUBORDINATE_VETO_2PC; + + } else { + + ret = SUBORDINATE_VETO_2PC; + + } + + if(ret == SUBORDINATE_VETO_2PC) { + abort_cht(dfaSet, stateMachine, m, from); + } + return ret; +} + +state_name abort_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + setup_vars; + + printf("Aborting!!\n"); + + assert(xid_exists); + + Tabort(xid); // !!!! + jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid); + Tcommit(app_state_cht->ht_xid); + app_state_cht->ht_xid = Tbegin(); + return 1; +} + + +state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + + setup_vars; + + assert(xid_exists); + Tcommit(xid); + jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid); + Tcommit(app_state_cht->ht_xid); + app_state_cht->ht_xid = Tbegin(); + /* }*/ + /** @todo why was there an assert(0) in commit_cht?? */ + /** @todo On a commit, should responses of type AWAIT_RESULT ever be done by subordinates? */ +// if(m->response_type == AWAIT_RESULT) { +// printf("commit_cht responding on an AWAIT_RESULT request.\n"); +// assert(0); + /* respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, __header_ptr(m)->initiator); */ +// } + /* TODO: Check error codes, and return accordingly... */ + return 1; +} + +state_name tally_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { + return 1; +}