Rearranged cht, moving some functionality into the 2pc library. Still

need some test cases.
This commit is contained in:
Sears Russell 2005-01-16 02:30:32 +00:00
parent 894e30085c
commit 7dacf21069
11 changed files with 530 additions and 501 deletions

View file

@ -81,6 +81,7 @@ typedef struct message {
state_machine_id from_machine_id; state_machine_id from_machine_id;
state_machine_id initiator_machine_id; state_machine_id initiator_machine_id;
char initiator[MAX_ADDRESS_LENGTH]; char initiator[MAX_ADDRESS_LENGTH];
unsigned char response_type;
message_name type; message_name type;
/** Payload is a byte array of arbitrary length. **/ /** Payload is a byte array of arbitrary length. **/
char payload[MAX_PAYLOAD]; char payload[MAX_PAYLOAD];

View file

@ -186,7 +186,18 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine,
} else { } else {
ret = 1; ret = 1;
} }
/* Where was this before?? */
if(m->type==AWAIT_ARRIVAL && ret) {
// need to (n)ack the client:
// Respond using the machine id expected by the client.
m->from_machine_id = m->initiator_machine_id;
printf("Responding\n");
respond_once(&((DfaSet*)dfaSet)->networkSetup,
COORDINATOR_START_2PC, m, m->initiator);
}
m->from_machine_id = stateMachine->machine_id; m->from_machine_id = stateMachine->machine_id;
return ret; return ret;
@ -204,7 +215,9 @@ state_name send_ack_2pc(void * dfaSet, StateMachine * stateMachine, Message * m,
state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
return app_state->veto_or_prepare_2pc(dfaSet, stateMachine, m, from); int ret = app_state->veto_or_prepare_2pc(dfaSet, stateMachine, m, from);
return ret;
} }
/** /**
@ -213,10 +226,21 @@ state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Messa
state_name abort_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { state_name abort_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
send_ack_2pc(dfaSet, stateMachine, m, from); send_ack_2pc(dfaSet, stateMachine, m, from);
int ret = app_state->abort_2pc(dfaSet, stateMachine, m, from);
return app_state->abort_2pc(dfaSet, stateMachine, m, from);
//if((*responseType(m) == AWAIT_COMMIT_POINT || *responseType(m) == AWAIT_RESULT)) {
if(m->response_type == AWAIT_COMMIT_POINT || m->response_type == AWAIT_RESULT) {
state_machine_id tmp = m->from_machine_id;
/* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */
m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/
printf("Response being sent to: %s:%ld\n", m->initiator, m->to_machine_id);
respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_VETO_2PC, m, m->initiator);
m->from_machine_id = tmp;
}
return ret;
} }
state_name commit_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { state_name commit_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
@ -270,11 +294,22 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m,
sprintf(from, "bc:%d", bc_group); sprintf(from, "bc:%d", bc_group);
if(ret && app_state->tally_2pc != NULL) { if(ret && app_state->tally_2pc != NULL) {
return app_state->tally_2pc(dfaSet, stateMachine, m, from); ret = app_state->tally_2pc(dfaSet, stateMachine, m, from);
} else {
return ret;
} }
/* TODO: CORRECTNESS BUG Make sure this is after tally forces the log. Also, need to
make sure that it increments the (currently unimplemented)
sequence number before flushing... */
if(ret && (m->response_type == AWAIT_COMMIT_POINT && m->response_type == COORDINATOR_START_2PC)) {
// if(ret && (*responseType(m) == AWAIT_COMMIT_POINT && stateMachine->current_state==COORDINATOR_START_2PC)) {
state_machine_id tmp = m->from_machine_id;
m->from_machine_id = m->initiator_machine_id;
//printf("Coordinator responding: ? ht=? (key length %d) %d -> to %s:%ld\n", getKeyLength(m), *(int*)getKeyAddr(m), /*getValAddr(m),*/ m->initiator, m->initiator_machine_id );
respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator);
m->from_machine_id = tmp;
}
return ret;
} else { } else {
sprintf(from, "bc:%d", bc_group); sprintf(from, "bc:%d", bc_group);
@ -282,4 +317,3 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m,
} }
} }

View file

@ -41,6 +41,8 @@ terms specified in this license.
---*/ ---*/
#include <libdfa/libdfa.h> #include <libdfa/libdfa.h>
#ifndef __TWOPC_H
#define __TWOPC_H
#define MAX_SUBORDINATES 10 #define MAX_SUBORDINATES 10
/** To use this library, you need to implement: /** To use this library, you need to implement:
@ -69,7 +71,9 @@ the current implementation, but no scheduling is done. The first
request that retries after a transaction completes is the one that request that retries after a transaction completes is the one that
gets to reuse the transaction id. gets to reuse the transaction id.
@todo Users of 2pc should not have to deal with dfaSets, so they shouldn't
be passed into the callbacks. Instead, only pass in the app-specific
payload.
*/ */
/* /*
These will generally be defined by the user of the library. These will generally be defined by the user of the library.
@ -151,3 +155,4 @@ extern Transition client_transitions_2pc[];
extern Transition transitions_2pc[]; extern Transition transitions_2pc[];
extern State states_2pc[]; extern State states_2pc[];
/* #endif */ /* #endif */
#endif /*__TWOPC_H */

View file

@ -1,5 +1,5 @@
LDADD=$(top_builddir)/src/libdfa/libdfa.a $(top_builddir)/src/2pc/lib2pc.a LDADD=$(top_builddir)/src/libdfa/libdfa.a $(top_builddir)/src/2pc/lib2pc.a
SOURCES=cht.c SOURCES=cht.c
lib_LIBRARIES=libcht.a lib_LIBRARIES=libcht.a
libcht_a_SOURCES=cht.c libcht_a_SOURCES=cht.c cht_client.c cht_server.c cht_message.c
AM_CFLAGS= -g -Wall -pedantic -std=gnu99 AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -47,74 +47,17 @@ terms specified in this license.
#include "../../2pc/2pc.h" #include "../../2pc/2pc.h"
#include "../../libdfa/callbacks.h" #include "../../libdfa/callbacks.h"
#include <pbl/jbhash.h> #include <pbl/jbhash.h>
#define CREATE 1 #include "cht_message.h"
#define INSERT 2
#define LOOKUP 3
#define REMOVE 4
#define DELETE 5
/** Unimplemented: Evaluate a function call from a table provided by the library user. */
#define TSTSET 6
#define GETXID 7
/* #define COMMIT 8
#define ABORT 9 */
typedef struct {
unsigned short key_length;
unsigned short value_length;
unsigned char request_type;
unsigned char response_type;
clusterHashTable_t hashTable;
} payload_header;
typedef struct {
int ht_xid;
jbHashTable_t * xid_ht;
jbHashTable_t * ht_ht;
int next_hashTableId;
} CHTAppState;
#define __header_ptr(m) ((payload_header*)(&((m)->payload)))
static unsigned short* _key_length(Message * m) {
return &(__header_ptr(m)->key_length);
}
static unsigned short* _value_length(Message *m) {
return &(__header_ptr(m)->value_length);
}
#define getKeyLength(m) (ntohs(*_key_length(m)))
#define setKeyLength(m, x) (*_key_length(m)=htons(x))
#define getValLength(m) (ntohs(*_value_length(m)))
#define setValLength(m, x) (*_value_length(m)=htons(x))
static unsigned char * requestType(Message *m) {
return &(__header_ptr(m)->request_type);
}
static unsigned char * responseType(Message *m) {
return &(__header_ptr(m)->response_type);
}
/** TODO: Endianness. (ICK) */ /** TODO: Endianness. (ICK) */
//state_name request_type =*(requestType(m)); \sdsd //state_name request_type =*(requestType(m)); \sdsd
//state_name response_type =*(responseType(m)); \sd //state_name response_type =*(responseType(m)); \sd
//TwoPCMachineState * machine_state_2pc = (TwoPCMachineState*) &(stateMachine->app_state); \sd //TwoPCMachineState * machine_state_2pc = (TwoPCMachineState*) &(stateMachine->app_state); \sd
#define setup_vars \
TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \
CHTAppState * app_state_cht = app_state_2pc->app_state; \
jbHashTable_t * xid_ht = app_state_cht->xid_ht; \
jbHashTable_t * ht_ht = app_state_cht->ht_ht; \
int ht_xid = app_state_cht->ht_xid; \
int xid; \
int xid_exists = (-1 != jbHtLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)); \
jbHashTable_t ht; \
int ht_exists = (-1 != jbHtLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte*) &ht))
@ -137,27 +80,6 @@ static int hash( const unsigned char * key, size_t keylen , int table_length) {
return( ret % table_length ); return( ret % table_length );
} }
void * getKeyAddr(Message *m) {
char * stuff = m->payload;
return (stuff + sizeof(payload_header)); /* Just add the header length. */
}
void * getValAddr(Message * m) {
return ((char*)getKeyAddr(m)) + getKeyLength(m); /* key address + key length. */
}
/**
@return 1 if the payload is valid (key_length and value length do not over-run the message's memory, 0 otherwise.)
*/
int checkPayload(Message * m) {
char * a = (char*)m;
char * b = getValAddr(m);
return (a+ sizeof(Message) ) >= (b + getValLength(m));
}
/** TODO: multiplex_interleaved needs to return a special group for /** TODO: multiplex_interleaved needs to return a special group for
begin/commit/abort requests. There will then be two types of begin/commit/abort requests. There will then be two types of
@ -184,143 +106,8 @@ short multiplex_interleaved(DfaSet * dfaSet, Message * m) {
state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from); state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from);
/**
The client side function that 'does everything'
@param request_type The type of request to be run.
@param response_type The expected response type. Returns 1 if this
remote state is returned, 0 otherwise. (TODO:
Double check this documentation.)
@param xid The (stateMachine) transaction id. Set to a random
number when calling BEGIN. To prevent deadlock, it's
best to choose a number unlikely to correspond to an
active transaction. (A random number within 2^32 of the
highest 64-bit integer will work.)
@param reply_type When should the local call return? The choices
are AWAIT_ARRIVAL, which returns after hearing
back from the coordinator, AWAIT_COMMIT_POINT to
wait until after the transaction commits/aborts,
AWAIT_RESULT, which waits for the actual result
from one of the replicas.
@param key, key_size, value, value_size depend on the value of request_type.
@return 1 on success, 0 on failure.
*/
int _chtEval(DfaSet * dfaSet,
unsigned char request_type,
unsigned char response_type,
state_machine_id * xid,
clusterHashTable_t * ht,
void * key, size_t * key_size,
void * value, size_t * value_size) {
/* Fill out a message payload. */
Message m;
if(ht != NULL) {
printf("_chtEval(request=%d, response=%d, xid=%ld, ht=%d ", request_type, response_type, *xid, ht->id);
} else {
printf("_chtEval(request=%d, response=%d, xid=%ld, ht=NULL ", request_type, response_type, *xid);
}
if(key == NULL) {
printf(")\n");
} else {
printf("key=%d)\n", *(int*)key);
}
* requestType(&m) = request_type;
* responseType(&m) = response_type;
setKeyLength(&m, *key_size);
setValLength(&m, *value_size);
assert(checkPayload(&m));
if(key_size != 0) {
memcpy(getKeyAddr(&m), key, *key_size);
}
if(value_size != 0) {
memcpy(getValAddr(&m), value, *value_size);
}
if(ht != NULL) {
memcpy(&(__header_ptr(&m)->hashTable), ht, sizeof(clusterHashTable_t));
}
/* printf("%s <- %s\n", __header_ptr(&m)->initiator, dfaSet->networkSetup.localhost); */
/* Synchronously run the request */
request(dfaSet, response_type, "bc:0", *xid, &m);
if(!checkPayload(&m)) {
printf("_chtEval failed: Invalid response.\n");
assert(0);
}
/* Copy message contents back into caller's buffers, even if the
request failed. (There may be app-specific information in the
response...) */
if(ht != NULL) {
memcpy(ht, &(__header_ptr(&m)->hashTable), sizeof(clusterHashTable_t));
}
if (*key_size != 0) {
/* printf("\n+%x<-%x+, length %d value=%s and %s\n", (unsigned int) value, (unsigned int)getValAddr(&m), getValLength(&m), value, getValAddr(&m)); */
memcpy(value, getValAddr(&m), getValLength(&m));
}
if (*value_size != 0) {
memcpy(key, getKeyAddr(&m), getKeyLength(&m));
}
*xid = m.to_machine_id;
printf("+chtEval returning %d\n", m.type);
return m.type;
}
state_name init_xact_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
/* TwoPCMachineState * state = (TwoPCMachineState*) &(stateMachine->app_state);*/
TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
CHTAppState * app_state_cht = app_state_2pc->app_state;
if(m->type != *responseType(m)) {
printf("Bug in client! m->type != response_type(m).\n");
}
if(*requestType(m) == CREATE) {
clusterHashTable_t new_cht;
new_cht.id = app_state_cht->next_hashTableId;
app_state_cht->next_hashTableId++;
memcpy(&(__header_ptr(m)->hashTable), &new_cht, sizeof(clusterHashTable_t));
printf("Allocated hashtable %d\n", new_cht.id);
}
printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), *responseType(m), *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id);
if(*responseType(m) == AWAIT_ARRIVAL) {
state_machine_id tmp = m->from_machine_id;
/* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */
m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/
printf("Responding\n");
respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_START_2PC, m, m->initiator);
m->from_machine_id = tmp;
}
return 1;
}
int xid_exists(int ht_xid, jbHashTable_t * xid_ht, StateMachine * stateMachine) { int xid_exists(int ht_xid, jbHashTable_t * xid_ht, StateMachine * stateMachine) {
int xid; int xid;
@ -332,234 +119,6 @@ int xid_exists(int ht_xid, jbHashTable_t * xid_ht, StateMachine * stateMachine)
} }
} }
callback_fcn abort_cht;
/* Begins new transaction, does the work the transaction requests, stores the message, and returns the corresponding error code. */
state_name veto_or_prepare_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
state_name ret;
setup_vars;
// int xid_exists = (-1 != jbHtLookup(ht_xid, app_state_cht->xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id, &xid));
if(xid_exists) { printf("Warning: Stale xid found!\n"); }
assert(!xid_exists);
printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), *responseType(m), *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id);
/* This is the start of a new transaction */
xid = Tbegin(); // !!!!
if(xid < 0) {
printf("Tbegin failed; %d\n", xid);
} else if(jbHtInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)) == -1) {
printf("jbHtInsert failed.\n");
} else {
xid_exists = 1;
}
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
if(xid_exists) {
ret = do_work(dfaSet, stateMachine, m, from);
ret = ret ? SUBORDINATE_PREPARED_2PC : SUBORDINATE_VETO_2PC;
} else {
ret = SUBORDINATE_VETO_2PC;
}
if(ret == SUBORDINATE_VETO_2PC) {
abort_cht(dfaSet, stateMachine, m, from);
}
return ret;
}
state_name abort_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
setup_vars;
printf("Aborting!!\n");
if(*responseType(m) == AWAIT_COMMIT_POINT || *responseType(m) == AWAIT_RESULT) {
state_machine_id tmp = m->from_machine_id;
/* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */
m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/
printf("Response being sent to: %s:%ld\n", m->initiator, m->to_machine_id);
respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_VETO_2PC, m, m->initiator);
m->from_machine_id = tmp;
}
assert(xid_exists);
Tabort(xid); // !!!!
jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
return 1;
}
/** TODO For now, we ignore the possiblity that jbHashTable's functions
return error codes. Instead, we assume that they always
succeed. */
state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
int ret;
setup_vars;
/* printf("ht_ht = %x, ht = %x\n", ht_ht, ht); */
switch(*requestType(m))
{
case CREATE:
{
jbHashTable_t * new = jbHtCreate(ht_xid, 79);
if(new != NULL) {
ret = (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)new, sizeof(jbHashTable_t)) >= 0);
} else {
ret = 0;
}
if(ret) {
printf("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable).id);
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
} else {
printf("Failed to insert new hash table slice!");
}
} break;
case INSERT:
{
if(!ht_exists) { printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable).id); fflush(NULL); ret = 0; } else {
ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0);
printf("Insert: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable).id, getKeyLength(m), *(int*)getKeyAddr(m), getValAddr(m));
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
}
} break;
case LOOKUP:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
printf("Lookup: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable).id, getKeyLength(m), *(int*)getKeyAddr(m), getValAddr(m));
}
} break;
case REMOVE:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
ret = (jbHtRemove(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
}
} break;
case DELETE:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
jbHtRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m), NULL);
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
/* ret = (jbHtDelete(xid, &ht) >= 0); */ /* Don't need this--jbHtDelete just frees the (stack!) pointer. */
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
}
} break;
case TSTSET:
{
printf("Unimplemented request!\n");
} break;
case GETXID:
{
/* int new_xid = Tbegin();
if(new_xid >= 0) {
setKeyLength(m, 0);
setValLength(m, sizeof(int));
*((int*)getValAddr(m)) = new_xid;
ret = 1;
if(jbHtInsert(ht_xid, xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id), &xid, sizeof(int)) == -1) {
printf("Begin failed on jbHtInsert!\n");
} else {
printf("Created local xid for global xid: %ld\n", stateMachine->machine_id);
}
} else {
printf("Begin failed on Tbegin()!\n");
ret = 0;
} NOOP */
} break;
/* case COMMIT:
{
ret = (Tcommit(xid) >= 0);
} break;
case ABORT:
{
ret = (Tabort(xid) >= 0);
} break;
*/
default:
{
printf("Unknown request type: %d\n", *requestType(m));
}
}
return ret;
}
state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
setup_vars;
assert(xid_exists);
Tcommit(xid);
jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
/* }*/
if(*responseType(m) == AWAIT_RESULT) {
printf("commit_cht responding on an AWAIT_RESULT request.\n");
assert(0);
/* respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, __header_ptr(m)->initiator); */
}
/* TODO: Check error codes, and return accordingly... */
return 1;
}
state_name tally_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
// setup_vars;
/* TODO: Make sure this is after tally forces the log. Also, need to
make sure that it increments the (currently unimplemented)
sequence number before flushing... */
if(*responseType(m) == AWAIT_COMMIT_POINT && stateMachine->current_state==COORDINATOR_START_2PC) {
state_machine_id tmp = m->from_machine_id;
/* TODO: Could the chages to from_machine_id be moved into libdfa (it does this anyway, but it does it too late.) */
m->from_machine_id = m->initiator_machine_id; /*stateMachine->machine_id;*/
printf("Coordinator responding: ? ht=? (key length %d) %d -> to %s:%ld\n", getKeyLength(m), *(int*)getKeyAddr(m), /*getValAddr(m),*/ m->initiator, m->initiator_machine_id );
respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator);
m->from_machine_id = tmp;
}
return 1;
}
DfaSet * cHtInit(int cht_type, char * localhost, DfaSet * cHtInit(int cht_type, char * localhost,
short (* get_broadcast_group)(DfaSet *, Message *), short (* get_broadcast_group)(DfaSet *, Message *),
short port, short port,
@ -618,48 +177,3 @@ DfaSet * cHtInit(int cht_type, char * localhost,
return dfaSet; return dfaSet;
} }
int cHtCreate(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * new_ht) {
size_t zero = 0;
return _chtEval(dfaSet, CREATE, AWAIT_COMMIT_POINT, &xid, new_ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
}
int cHtInsert(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t datlen) {
return _chtEval(dfaSet, INSERT, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, &datlen) != SUBORDINATE_VETO_2PC;
}
int cHtLookup(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) {
return _chtEval(dfaSet, LOOKUP, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
}
int cHtRemove(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) {
return _chtEval(dfaSet, REMOVE, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
}
int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht) {
size_t zero = 0;
return _chtEval(dfaSet, DELETE, AWAIT_COMMIT_POINT, &xid, ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
}
int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) {
size_t zero = 0;
*xid = NULL_MACHINE; /* Will be overwritten by
_chtEval... Need a large random
value so that the request will
be serviced exactly once, but
will not conflict with real
transactions or other begins.*/
return _chtEval(dfaSet, GETXID, AWAIT_ARRIVAL, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
}
/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet) {
size_t zero = 0;
return _chtEval(dfaSet, COMMIT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero);
}
int cHtAbort(state_machine_id xid, DfaSet * dfaSet) {
size_t zero = 0;
return _chtEval(dfaSet, ABORT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero);
}*/

View file

@ -40,7 +40,8 @@ permission to use and distribute the software in accordance with the
terms specified in this license. terms specified in this license.
---*/ ---*/
#include <libdfa/libdfa.h> #include <libdfa/libdfa.h>
#include <pbl/jbhash.h>
#include "../../2pc/2pc.h"
#define CHT_COORDINATOR 1 #define CHT_COORDINATOR 1
#define CHT_SERVER 2 #define CHT_SERVER 2
#define CHT_CLIENT 3 #define CHT_CLIENT 3
@ -138,3 +139,17 @@ DfaSet * cHtInit(int cht_type, char * localhost,
int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet); int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet);
/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet); /*int cHtCommit(state_machine_id xid, DfaSet * dfaSet);
int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/ int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/
typedef struct {
int ht_xid;
jbHashTable_t * xid_ht;
jbHashTable_t * ht_ht;
int next_hashTableId;
} CHTAppState;
callback_fcn veto_or_prepare_cht;
callback_fcn commit_cht;
callback_fcn tally_cht;
callback_fcn abort_cht;
callback_fcn init_xact_cht;

147
src/apps/cht/cht_client.c Normal file
View file

@ -0,0 +1,147 @@
#include "cht.h"
#include "cht_message.h"
#include "../../2pc/2pc.h"
#include <netinet/in.h>
#include <assert.h>
#include <string.h>
/**
The client side function that 'does everything'
@param request_type The type of request to be run.
@param response_type The expected response type. Returns 1 if this
remote state is returned, 0 otherwise. (TODO:
Double check this documentation.)
@param xid The (stateMachine) transaction id. Set to a random
number when calling BEGIN. To prevent deadlock, it's
best to choose a number unlikely to correspond to an
active transaction. (A random number within 2^32 of the
highest 64-bit integer will work.)
@param reply_type When should the local call return? The choices
are AWAIT_ARRIVAL, which returns after hearing
back from the coordinator, AWAIT_COMMIT_POINT to
wait until after the transaction commits/aborts,
AWAIT_RESULT, which waits for the actual result
from one of the replicas.
@param key, key_size, value, value_size depend on the value of request_type.
@return 1 on success, 0 on failure.
*/
static int _chtEval(DfaSet * dfaSet,
unsigned char request_type,
unsigned char response_type,
state_machine_id * xid,
clusterHashTable_t * ht,
void * key, size_t * key_size,
void * value, size_t * value_size) {
/* Fill out a message payload. */
Message m;
if(ht != NULL) {
printf("_chtEval(request=%d, response=%d, xid=%ld, ht=%d ", request_type, response_type, *xid, ht->id);
} else {
printf("_chtEval(request=%d, response=%d, xid=%ld, ht=NULL ", request_type, response_type, *xid);
}
if(key == NULL) {
printf(")\n");
} else {
printf("key=%d)\n", *(int*)key);
}
* requestType(&m) = request_type;
m.response_type = response_type;
setKeyLength(&m, *key_size);
setValLength(&m, *value_size);
assert(checkPayload(&m));
if(key_size != 0) {
memcpy(getKeyAddr(&m), key, *key_size);
}
if(value_size != 0) {
memcpy(getValAddr(&m), value, *value_size);
}
if(ht != NULL) {
memcpy(&(__header_ptr(&m)->hashTable), ht, sizeof(clusterHashTable_t));
}
/* printf("%s <- %s\n", __header_ptr(&m)->initiator, dfaSet->networkSetup.localhost); */
/* Synchronously run the request */
request(dfaSet, response_type, "bc:0", *xid, &m);
if(!checkPayload(&m)) {
printf("_chtEval failed: Invalid response.\n");
assert(0);
}
/* Copy message contents back into caller's buffers, even if the
request failed. (There may be app-specific information in the
response...) */
if(ht != NULL) {
memcpy(ht, &(__header_ptr(&m)->hashTable), sizeof(clusterHashTable_t));
}
if (*key_size != 0) {
/* printf("\n+%x<-%x+, length %d value=%s and %s\n", (unsigned int) value, (unsigned int)getValAddr(&m), getValLength(&m), value, getValAddr(&m)); */
memcpy(value, getValAddr(&m), getValLength(&m));
}
if (*value_size != 0) {
memcpy(key, getKeyAddr(&m), getKeyLength(&m));
}
*xid = m.to_machine_id;
printf("+chtEval returning %d\n", m.type);
return m.type;
}
int cHtCreate(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * new_ht) {
size_t zero = 0;
return _chtEval(dfaSet, CREATE, AWAIT_COMMIT_POINT, &xid, new_ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
}
int cHtInsert(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t datlen) {
return _chtEval(dfaSet, INSERT, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, &datlen) != SUBORDINATE_VETO_2PC;
}
int cHtLookup(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) {
return _chtEval(dfaSet, LOOKUP, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
}
int cHtRemove(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) {
return _chtEval(dfaSet, REMOVE, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
}
int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht) {
size_t zero = 0;
return _chtEval(dfaSet, DELETE, AWAIT_COMMIT_POINT, &xid, ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
}
int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) {
size_t zero = 0;
*xid = NULL_MACHINE; /* Will be overwritten by
_chtEval... Need a large random
value so that the request will
be serviced exactly once, but
will not conflict with real
transactions or other begins.*/
return _chtEval(dfaSet, GETXID, AWAIT_ARRIVAL, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
}
/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet) {
size_t zero = 0;
return _chtEval(dfaSet, COMMIT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero);
}
int cHtAbort(state_machine_id xid, DfaSet * dfaSet) {
size_t zero = 0;
return _chtEval(dfaSet, ABORT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero);
}*/

View file

@ -0,0 +1,22 @@
#include "cht_message.h"
#include <netinet/in.h>
void * getKeyAddr(Message *m) {
char * stuff = m->payload;
return (stuff + sizeof(payload_header)); /* Just add the header length. */
}
void * getValAddr(Message * m) {
return ((char*)getKeyAddr(m)) + getKeyLength(m); /* key address + key length. */
}
/**
@return 1 if the payload is valid (key_length and value length do not over-run the message's memory, 0 otherwise.)
*/
int checkPayload(Message * m) {
char * a = (char*)m;
char * b = getValAddr(m);
return (a+ sizeof(Message) ) >= (b + getValLength(m));
}

View file

@ -0,0 +1,55 @@
#include <libdfa/libdfa.h>
#define CREATE 1
#define INSERT 2
#define LOOKUP 3
#define REMOVE 4
#define DELETE 5
/** Unimplemented: Evaluate a function call from a table provided by the library user. */
#define TSTSET 6
#define GETXID 7
/* #define COMMIT 8
#define ABORT 9 */
typedef struct {
unsigned short key_length;
unsigned short value_length;
unsigned char request_type;
// unsigned char response_type;
int hashTable;
} payload_header;
#define __header_ptr(m) ((payload_header*)(&((m)->payload)))
#define __key_length(m) (&(__header_ptr(m)->key_length))
#define __value_length(m) (&(__header_ptr(m)->value_length))
#define getKeyLength(m) (ntohs(*__key_length(m)))
#define setKeyLength(m, x) (*__key_length(m)=htons(x))
#define getValLength(m) (ntohs(*__value_length(m)))
#define setValLength(m, x) (*__value_length(m)=htons(x))
#define requestType(m) (&(__header_ptr(m)->request_type))
#define responseType(m) (&(__header_ptr(m)->response_type))
void * getKeyAddr(Message *m);
void * getValAddr(Message * m);
int checkPayload(Message * m);
/*
static unsigned short* _key_length(Message * m) {
return &(__header_ptr(m)->key_length);
}
static unsigned short* _value_length(Message *m) {
return &(__header_ptr(m)->value_length);
}
static unsigned char * requestType(Message *m) {
return &(__header_ptr(m)->request_type);
}
static unsigned char * responseType(Message *m) {
return &(__header_ptr(m)->response_type);
}*/

BIN
src/apps/cht/cht_message.o Normal file

Binary file not shown.

236
src/apps/cht/cht_server.c Normal file
View file

@ -0,0 +1,236 @@
#include "cht.h"
#include "cht_message.h"
#include <assert.h>
#include <string.h>
#include <netinet/in.h>
#define setup_vars \
TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \
CHTAppState * app_state_cht = app_state_2pc->app_state; \
jbHashTable_t * xid_ht = app_state_cht->xid_ht; \
jbHashTable_t * ht_ht = app_state_cht->ht_ht; \
int ht_xid = app_state_cht->ht_xid; \
int xid; \
int xid_exists = (-1 != jbHtLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)); \
jbHashTable_t ht; \
int ht_exists = (-1 != jbHtLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte*) &ht))
/** TODO For now, we ignore the possiblity that jbHashTable's functions
return error codes. Instead, we assume that they always
succeed. */
static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
int ret;
setup_vars;
switch(*requestType(m))
{
case CREATE:
{
jbHashTable_t * new = jbHtCreate(ht_xid, 79);
if(new != NULL) {
ret = (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)new, sizeof(jbHashTable_t)) >= 0);
} else {
ret = 0;
}
if(ret) {
printf("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable));
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
} else {
printf("Failed to insert new hash table slice!");
}
} break;
case INSERT:
{
if(!ht_exists) { printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable)); fflush(NULL); ret = 0; } else {
ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0);
printf("Insert: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), (char*)getValAddr(m));
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
}
} break;
case LOOKUP:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
printf("Lookup: %d ht=%d (key length %d) %d -> %s\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), (char*)getValAddr(m));
}
} break;
case REMOVE:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
ret = (jbHtRemove(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
}
} break;
case DELETE:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
jbHtRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m), NULL);
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
/* ret = (jbHtDelete(xid, &ht) >= 0); */ /* Don't need this--jbHtDelete just frees the (stack!) pointer. */
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
}
} break;
case TSTSET:
{
printf("Unimplemented request!\n");
} break;
case GETXID:
{
/* int new_xid = Tbegin();
if(new_xid >= 0) {
setKeyLength(m, 0);
setValLength(m, sizeof(int));
*((int*)getValAddr(m)) = new_xid;
ret = 1;
if(jbHtInsert(ht_xid, xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id), &xid, sizeof(int)) == -1) {
printf("Begin failed on jbHtInsert!\n");
} else {
printf("Created local xid for global xid: %ld\n", stateMachine->machine_id);
}
} else {
printf("Begin failed on Tbegin()!\n");
ret = 0;
} NOOP */
} break;
/* case COMMIT:
{
ret = (Tcommit(xid) >= 0);
} break;
case ABORT:
{
ret = (Tabort(xid) >= 0);
} break;
*/
default:
{
printf("Unknown request type: %d\n", *requestType(m));
}
}
return ret;
}
/* Run by the coordinator when the request is received from the client. */
state_name init_xact_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
CHTAppState * app_state_cht = app_state_2pc->app_state;
if(m->type != m->response_type) {
printf("Bug in client! m->type != response_type(m).\n");
}
if(*requestType(m) == CREATE) {
clusterHashTable_t new_cht;
new_cht.id = app_state_cht->next_hashTableId;
app_state_cht->next_hashTableId++;
memcpy(&(__header_ptr(m)->hashTable), &new_cht, sizeof(clusterHashTable_t));
printf("Allocated hashtable %d\n", new_cht.id);
}
printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), m->response_type, *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id);
return 1;
}
/* Begins new transaction, does the work the transaction requests, stores the message, and returns the corresponding error code. */
state_name veto_or_prepare_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
state_name ret;
setup_vars;
// int xid_exists = (-1 != jbHtLookup(ht_xid, app_state_cht->xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id, &xid));
if(xid_exists) { printf("Warning: Stale xid found!\n"); }
assert(!xid_exists);
printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), m->response_type, *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id);
/* This is the start of a new transaction */
xid = Tbegin(); // !!!!
if(xid < 0) {
printf("Tbegin failed; %d\n", xid);
} else if(jbHtInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)) == -1) {
printf("jbHtInsert failed.\n");
} else {
xid_exists = 1;
}
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
if(xid_exists) {
ret = do_work(dfaSet, stateMachine, m, from);
ret = ret ? SUBORDINATE_PREPARED_2PC : SUBORDINATE_VETO_2PC;
} else {
ret = SUBORDINATE_VETO_2PC;
}
if(ret == SUBORDINATE_VETO_2PC) {
abort_cht(dfaSet, stateMachine, m, from);
}
return ret;
}
state_name abort_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
setup_vars;
printf("Aborting!!\n");
assert(xid_exists);
Tabort(xid); // !!!!
jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
return 1;
}
state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
setup_vars;
assert(xid_exists);
Tcommit(xid);
jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
/* }*/
/** @todo why was there an assert(0) in commit_cht?? */
/** @todo On a commit, should responses of type AWAIT_RESULT ever be done by subordinates? */
// if(m->response_type == AWAIT_RESULT) {
// printf("commit_cht responding on an AWAIT_RESULT request.\n");
// assert(0);
/* respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, __header_ptr(m)->initiator); */
// }
/* TODO: Check error codes, and return accordingly... */
return 1;
}
state_name tally_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
return 1;
}