Cluster hash table now allows transactions to contain multiple requests.

This commit is contained in:
Sears Russell 2005-02-03 02:01:23 +00:00
parent 54ecc0d3c4
commit a27787b2fa
12 changed files with 330 additions and 177 deletions

View file

@ -54,6 +54,10 @@ terms specified in this license.
*/
#define DFA_MACHINE_COUNT 100
/** Defined in libdfa.c */
callback_fcn callback_false;
callback_fcn callback_true;
typedef struct dfaSet {
/* MonoTree monoTree; */
smash_t * smash;

View file

@ -112,7 +112,6 @@ typedef struct stateMachine {
typedef state_name(callback_fcn)(void * dfaSet, StateMachine * stateMachine, Message * m, char * from);
/* All function pointers follow this prototype:
TODO
@ -130,7 +129,8 @@ typedef struct state {
*/
callback_fcn* retry_fcn;
/** NULL unless the machine can be aborted while in this state. If
not-null, then it should point to a function that performs a
not-null, then
it should point to a function that performs a
No-op or does any house-keeping that should be performed before
the machine gets nuked.

View file

@ -48,10 +48,6 @@ terms specified in this license.
#define FALSE 0
/* #define _TWO_PC 1 */
const int transition_count_2pc = 14;
const int client_transition_count_2pc = 4;
const int state_count_2pc = 9;
callback_fcn check_veto_2pc;
@ -61,13 +57,28 @@ callback_fcn coordinator_init_xact_2pc;
callback_fcn veto_or_prepare_2pc;
callback_fcn abort_2pc;
callback_fcn commit_2pc;
callback_fcn eval_action_2pc;
/* new fcns */
callback_fcn coordinator_continue_xact_2pc;
/* Remember to update transition_count_2pc if you add/remove transitions */
const int transition_count_2pc = 26;
Transition transitions_2pc[] = {
/* Coordinator transitions */
{ XACT_ACK_ARRIVAL, NULL_STATE, XACT_ACTION_RUNNING, coordinator_init_xact_2pc, FALSE },
{ XACT_ACK_RESULT, NULL_STATE, XACT_ACTION_RUNNING, coordinator_init_xact_2pc, FALSE },
{ XACT_ACK_ARRIVAL, XACT_ACTIVE, XACT_ACTION_RUNNING, coordinator_continue_xact_2pc, FALSE },
{ XACT_ACK_RESULT, XACT_ACTIVE, XACT_ACTION_RUNNING, coordinator_continue_xact_2pc, FALSE },
{ XACT_COMMIT, XACT_ACTIVE, COORDINATOR_START_2PC, coordinator_continue_xact_2pc, FALSE },
{ XACT_ABORT, XACT_ACTIVE, COORDINATOR_START_2PC, coordinator_continue_xact_2pc, FALSE },
{ XACT_SUBORDINATE_ACK, XACT_ACTION_RUNNING, XACT_ACTIVE, &tally_2pc, FALSE },
/* Library user must provide callback that init_xact_2pc calls. */
{ AWAIT_ARRIVAL, NULL_STATE, COORDINATOR_START_2PC, coordinator_init_xact_2pc, FALSE },
{ AWAIT_COMMIT_POINT, NULL_STATE, COORDINATOR_START_2PC, coordinator_init_xact_2pc, FALSE },
@ -83,8 +94,12 @@ Transition transitions_2pc[] = {
/* Subordinate transitions */
/* veto_or_prepare overrides target state. */
{ XACT_ACTION_RUNNING, NULL_STATE, XACT_ACTIVE, &eval_action_2pc, FALSE},
{ XACT_ACTION_RUNNING, XACT_ACTIVE, XACT_ACTIVE, &eval_action_2pc, FALSE},
/* Library user must provide the subordinate function pointers for these transitions */
{ COORDINATOR_START_2PC, NULL_STATE, OVERRIDDEN_STATE, &veto_or_prepare_2pc, TRUE },
{ COORDINATOR_START_2PC, XACT_ACTIVE, OVERRIDDEN_STATE, &veto_or_prepare_2pc, TRUE },
{ COORDINATOR_ABORTING_2PC, SUBORDINATE_PREPARED_2PC, NULL_STATE, &abort_2pc, FALSE},
{ COORDINATOR_COMMITTING_2PC, SUBORDINATE_PREPARED_2PC, NULL_STATE, &commit_2pc, FALSE},
{ COORDINATOR_ABORTING_2PC, SUBORDINATE_VETO_2PC, NULL_STATE, NULL, FALSE},
@ -92,9 +107,13 @@ Transition transitions_2pc[] = {
/* transition fcn always fails, but sends ack to coordinator */
{ COORDINATOR_COMMITTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE},
{ COORDINATOR_ABORTING_2PC, NULL_STATE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE},
{ COORDINATOR_COMMITTING_2PC, XACT_ACTIVE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE},
{ COORDINATOR_ABORTING_2PC, XACT_ACTIVE, OVERRIDDEN_STATE, &send_ack_2pc, TRUE},
};
const int client_transition_count_2pc = 8;
Transition client_transitions_2pc[] = {
/* Caller transitions */
@ -106,9 +125,16 @@ Transition client_transitions_2pc[] = {
{ SUBORDINATE_ACKING_2PC, AWAIT_RESULT, NULL_STATE, NULL, FALSE},
{ XACT_ACTION_RUNNING, XACT_ACK_ARRIVAL, NULL_STATE, NULL, FALSE},
{ XACT_ACTIVE, XACT_ACK_RESULT, NULL_STATE, NULL, FALSE},
{ COORDINATOR_COMMITTING_2PC, XACT_COMMIT, NULL_STATE, NULL, FALSE},
{ COORDINATOR_ABORTING_2PC, XACT_ABORT, NULL_STATE, NULL, FALSE}
};
const int state_count_2pc = 16;
State states_2pc[MAX_STATE_COUNT] = {
/* Coordinator states */
@ -117,18 +143,32 @@ State states_2pc[MAX_STATE_COUNT] = {
{ COORDINATOR_COMMITTING_2PC, NULL, NULL },
{ COORDINATOR_ABORTING_2PC, NULL, NULL },
{ XACT_ACTION_RUNNING, NULL, NULL },
/* Subordinate states */
{ SUBORDINATE_VETO_2PC, NULL, NULL }, /* Need to think about callback fcns */
{ SUBORDINATE_PREPARED_2PC, NULL, NULL },
{ SUBORDINATE_ACKING_2PC, NULL, NULL },
{ XACT_SUBORDINATE_ACK, NULL, NULL }, /* Just used for one-time acks. */
/* mixed client/subordinate states */
{ XACT_ACTIVE, callback_false, NULL}, /* Never send a message indicating the current state. */
/* Client states */
{ AWAIT_ARRIVAL, NULL, NULL},
{ AWAIT_RESULT, NULL, NULL},
{ AWAIT_COMMIT_POINT, NULL, NULL},
{ XACT_ACK_RESULT, NULL, NULL},
{ XACT_ACK_ARRIVAL, NULL, NULL},
{ XACT_COMMIT, NULL, NULL},
{ XACT_ABORT, NULL, NULL}
};
/*
@ -141,6 +181,40 @@ State states_2pc[MAX_STATE_COUNT] = {
*/
state_name coordinator_continue_xact_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
int ret;
TwoPCMachineState * state = (TwoPCMachineState*) &(stateMachine->app_state);
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
/** Overwrite the from parameter, with the appropriate broadcast group. */
short bc_group = app_state->get_broadcast_group(dfaSet, m);
sprintf(from, "bc:%d\n", bc_group);
memset(state->subordinate_votes, 0, MAX_SUBORDINATES);
if(app_state->continue_xact_2pc != NULL) {
ret = app_state->continue_xact_2pc(dfaSet, stateMachine, m, from);
} else {
ret = 1;
}
// printf("CONTINUE %ld\n", m->initiator_machine_id);
if(m->type==AWAIT_ARRIVAL && ret) {
// need to (n)ack the client:
// Respond using the machine id expected by the client.
m->from_machine_id = m->initiator_machine_id;
/** @todo What if the message to the client is dropped? Is there an easy way to just ACK if it resends? */
printf("ACK %ld (to %s)\n", m->initiator_machine_id, m->initiator);
respond_once(&((DfaSet*)dfaSet)->networkSetup,
COORDINATOR_START_2PC, m, m->initiator);
}
m->from_machine_id = stateMachine->machine_id;
return ret;
}
/* Probably should ack the client that called commit(), instead of
hoping that UDP got it here for them... ;) So, they should have a
@ -161,8 +235,6 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine,
memset(state->subordinate_votes, 0, MAX_SUBORDINATES);
state->xid = stateMachine->machine_id;
// handled by the client.
/*strncpy(state->initiator, from, MAX_ADDRESS_LENGTH);*/
if(strncmp(m->initiator, from, MAX_ADDRESS_LENGTH)) {
printf("WARNING: Mismatch between request source (%s) and initiator field (%s). Proceeding. (Trusting the client)\n", m->initiator, from);
@ -170,23 +242,16 @@ state_name coordinator_init_xact_2pc(void * dfaSet, StateMachine * stateMachine,
sprintf(from, "bc:%d\n", bc_group);
/* TODO: (n)ack the client. (Implies yes / no / already pending return values for callback on last line)
Currently, this is handled by the library user. It could be moved back into here.
*/
if(app_state->init_xact_2pc != NULL) {
ret = app_state->init_xact_2pc(dfaSet, stateMachine, m, from);
} else {
ret = 1;
}
printf("INIT %ld\n", m->initiator_machine_id);
printf("INIT %ld\n", m->initiator_machine_id); fflush(stdout);
if(m->type==AWAIT_ARRIVAL && ret) {
// need to (n)ack the client:
// Respond using the machine id expected by the client.
// (n)ack the client: Respond using the machine id expected by the client.
m->from_machine_id = m->initiator_machine_id;
// printf("Responding\n");
printf("ACK %ld (to %s)\n", m->initiator_machine_id, m->initiator);
@ -213,7 +278,7 @@ state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Messa
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
int ret = app_state->veto_or_prepare_2pc(dfaSet, stateMachine, m, from);
// printf("veto_or_prepare returned: %d", ret);
if(ret == SUBORDINATE_VETO_2PC) {
printf("VETO %ld\n", m->to_machine_id);
} else {
@ -223,6 +288,22 @@ state_name veto_or_prepare_2pc(void * dfaSet, StateMachine * stateMachine, Messa
return ret;
}
state_name eval_action_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
int ret = app_state->eval_action_2pc(dfaSet, stateMachine, m, from);
if(ret) {
respond_once(&((DfaSet*)dfaSet)->networkSetup,
XACT_SUBORDINATE_ACK, m, from);
}
/* Just ack with the respond_once. Don't need to change states. */
if(stateMachine->current_state == XACT_ACTIVE) {
return 0;
} else {
return 1;
}
}
/**
TODO: The next two functions should fork and immediately return true.
*/
@ -285,18 +366,17 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m,
TwoPCAppState * app_state = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
/* if (!check_from()) { return 0; } */
short bc_group = app_state->get_broadcast_group(dfaSet, m);
// fprintf(stderr, "tally: %s, broadcast group: %d\n", from, bc_group);
// fprintf(stderr, "tally: %s, broadcast group: %d\n", from, bc_group);
if(bc_group < dfaSet->networkSetup.broadcast_lists_count+1) {
state_name ret;
if(bc_group == ALL_BUT_GROUP_ZERO) {
char ** list;
int count = consolidate_bc_groups(&list, &dfaSet->networkSetup);
/* int i;
for(i = 0; i < count; i++) {
// int i;
/* for(i = 0; i < count; i++) {
fprintf(stderr, "count = %d tallyhost %d: %s\n", count, i, list[i]);
} */
} */
ret = tally(list, count, (char*)(machine_state->subordinate_votes), from);
free(list);
} else {
@ -323,18 +403,37 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m,
/* TODO: CORRECTNESS BUG Make sure this is after tally forces the log. Also, need to
make sure that it increments the (currently unimplemented)
sequence number before flushing... */
// printf("committed ret = %d response_type = %d ");
if(ret) {
printf("COMMIT POINT %ld\n", m->to_machine_id);
}
if(ret && (m->response_type == AWAIT_COMMIT_POINT && stateMachine->current_state == COORDINATOR_START_2PC)) {
// printf("sending ack to %s", m->initiator);
// if(ret && (*responseType(m) == AWAIT_COMMIT_POINT && stateMachine->current_state==COORDINATOR_START_2PC)) {
if(ret && (
(
(
m->response_type == AWAIT_COMMIT_POINT ||
m->response_type==XACT_COMMIT ||
m->response_type==XACT_ABORT
) && (
stateMachine->current_state == COORDINATOR_START_2PC
)
) || (
m->response_type == XACT_ACK_RESULT &&
stateMachine->current_state == XACT_ACTION_RUNNING
)
)
) {
// printf("sending ack to %s (%d)\n", m->initiator, m->response_type); fflush(stdout);
state_machine_id tmp = m->from_machine_id;
m->from_machine_id = m->initiator_machine_id;
// printf("Coordinator responding: ? ht=? (key length %d) %d -> /*%d*/ to %s:%ld\n", 0/*getKeyLength(m),*/ , 0, 0, /**(int*)getKeyAddr(m), *//**(int*)getValAddr(m),*/ m->initiator, m->initiator_machine_id );
//debug_print_message(m);
respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator);
if(m->response_type == AWAIT_COMMIT_POINT || m->response_type == XACT_COMMIT || m->response_type == XACT_ABORT) {
printf("COMMIT POINT %ld\n", m->to_machine_id);
fflush(stdout);
respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator);
} else {
// printf("COMPLETED %ld\n", m->to_machine_id);
respond_once(&((DfaSet*)dfaSet)->networkSetup, XACT_ACTIVE, m, m->initiator);
}
m->from_machine_id = tmp;
}
// printf("\n");

View file

@ -95,6 +95,16 @@ gets to reuse the transaction id.
#define AWAIT_COMMIT_POINT 212
#define AWAIT_RESULT 213
#define XACT_ACK_RESULT 221
#define XACT_ACK_ARRIVAL 222
#define XACT_ACTIVE 223
#define XACT_ACTION_RUNNING 224
#define XACT_COMMIT 225
#define XACT_ABORT 226
#define XACT_SUBORDINATE_ACK 227
/**
The callbacks are called whenever the transition 'should' succeed.
Other than tally_2pc, they are always called when a
@ -132,6 +142,8 @@ typedef struct {
two broadcast groups.) */
char is_coordinator;
callback_fcn *init_xact_2pc;
callback_fcn *continue_xact_2pc;
callback_fcn *eval_action_2pc;
callback_fcn *veto_or_prepare_2pc;
callback_fcn *abort_2pc;
callback_fcn *commit_2pc;

View file

@ -101,17 +101,9 @@ short multiplex_interleaved(DfaSet * dfaSet, Message * m) {
state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from);
int xid_exists(int ht_xid, recordid xid_ht, StateMachine * stateMachine) {
int * xid = 0;
/* if (-1 == jbHtLookup(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)) {
return 0;
} else {
assert(xid);
return xid;
} */
int size = ThashLookup(ht_xid, xid_ht,
(byte*)&(stateMachine->machine_id), sizeof(state_machine_id),
(byte**)&xid);
@ -155,15 +147,15 @@ DfaSet * cHtInit(int cht_type,
}
if(cht_type != CHT_CLIENT) {
/* chtApp_state->xid_ht = jbHtCreate(xid, 79);
chtApp_state->ht_ht = jbHtCreate(xid, 79); */
chtApp_state->xid_ht = ThashCreate(xid, sizeof(state_machine_id), sizeof(int));
chtApp_state->ht_ht = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
chtApp_state->ht_ht = ThashCreate(xid, sizeof(clusterHashTable_t), sizeof(recordid));
chtApp_state->ht_xid = Tbegin(); // !!!!
chtApp_state->next_hashTableId = 0;
twoPC_state->is_coordinator = (cht_type == CHT_COORDINATOR);
twoPC_state->init_xact_2pc = init_xact_cht;
twoPC_state->continue_xact_2pc = NULL;
twoPC_state->eval_action_2pc = eval_action_cht;
twoPC_state->veto_or_prepare_2pc = veto_or_prepare_cht;
twoPC_state->abort_2pc = abort_cht;
twoPC_state->commit_2pc = commit_cht;

View file

@ -140,8 +140,8 @@ DfaSet * cHtClientInit(char * config_file);
DfaSet * cHtCoordinatorInit(char * config_file, short(*get_broadcast_group)(DfaSet *, Message *));
DfaSet * cHtSubordinateInit(char * config_file, short(*get_broadcast_group)(DfaSet *, Message *), int subordinate_number);
int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet);
/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet);
int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/
int cHtCommit(state_machine_id xid, DfaSet * dfaSet);
/* int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/
/** The server side state for a CHT. */
typedef struct {
@ -155,6 +155,7 @@ typedef struct {
} CHTAppState;
callback_fcn eval_action_cht;
callback_fcn veto_or_prepare_cht;
callback_fcn commit_cht;
callback_fcn tally_cht;

View file

@ -4,6 +4,10 @@
#include <netinet/in.h>
#include <assert.h>
#include <string.h>
#define REQUEST_TYPE XACT_ACK_RESULT
//#define REQUEST_TYPE AWAIT_COMMIT_POINT
/**
The client side function that 'does everything'
@ -103,25 +107,25 @@ static int _chtEval(DfaSet * dfaSet,
int cHtCreate(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * new_ht) {
size_t zero = 0;
return _chtEval(dfaSet, CREATE, AWAIT_COMMIT_POINT, &xid, new_ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
return _chtEval(dfaSet, CREATE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, new_ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
}
int cHtInsert(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t datlen) {
return _chtEval(dfaSet, INSERT, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, &datlen) != SUBORDINATE_VETO_2PC;
return _chtEval(dfaSet, INSERT, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, &datlen) != SUBORDINATE_VETO_2PC;
}
int cHtLookup(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) {
return _chtEval(dfaSet, LOOKUP, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
return _chtEval(dfaSet, LOOKUP, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
}
int cHtRemove(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) {
return _chtEval(dfaSet, REMOVE, AWAIT_COMMIT_POINT, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
return _chtEval(dfaSet, REMOVE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
}
int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht) {
size_t zero = 0;
return _chtEval(dfaSet, DELETE, AWAIT_COMMIT_POINT, &xid, ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
return _chtEval(dfaSet, DELETE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
}
int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) {
@ -132,26 +136,26 @@ int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) {
be serviced exactly once, but
will not conflict with real
transactions or other begins.*/
return _chtEval(dfaSet, GETXID, AWAIT_ARRIVAL, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
return _chtEval(dfaSet, GETXID, /*AWAIT_ARRIVAL,*/ REQUEST_TYPE, xid, NULL, NULL, &zero, NULL, &zero) != SUBORDINATE_VETO_2PC;
}
DfaSet * cHtClientInit(char * configFile) {
NetworkSetup * config = readNetworkConfig(configFile, 0);
assert(config->coordinator);
printf("config->localhost:%s config->broadcast_lists[0][0]:%s (localport %d)(port %d)\n",
config->localhost, config->broadcast_lists[0][0], config->localport, parse_port(config->broadcast_lists[0][0]));
// printf("config->localhost:%s config->broadcast_lists[0][0]:%s (localport %d)(port %d)\n",
// config->localhost, config->broadcast_lists[0][0], config->localport, parse_port(config->broadcast_lists[0][0]));
DfaSet * ret = cHtInit(CHT_CLIENT, NULL, config);
assert(config->coordinator);
free (config);
return ret;
}
/*int cHtCommit(state_machine_id xid, DfaSet * dfaSet) {
int cHtCommit(state_machine_id xid, DfaSet * dfaSet) {
size_t zero = 0;
return _chtEval(dfaSet, COMMIT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero);
return _chtEval(dfaSet, COMMIT, XACT_COMMIT, &xid, NULL, NULL, &zero, NULL, &zero);
}
/*
int cHtAbort(state_machine_id xid, DfaSet * dfaSet) {
size_t zero = 0;
return _chtEval(dfaSet, ABORT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero);

View file

@ -1,3 +1,4 @@
#include "cht.h"
#include "cht_message.h"
#include <netinet/in.h>
void * getKeyAddr(Message *m) {

View file

@ -1,3 +1,4 @@
#include <libdfa/libdfa.h>
#define CREATE 1
@ -8,15 +9,14 @@
/** Unimplemented: Evaluate a function call from a table provided by the library user. */
#define TSTSET 6
#define GETXID 7
/* #define COMMIT 8
#define ABORT 9 */
#define COMMIT 8
/* #define ABORT 9 */
typedef struct {
unsigned short key_length;
unsigned short value_length;
unsigned char request_type;
// unsigned char response_type;
int hashTable;
clusterHashTable_t hashTable;
} payload_header;
#define __header_ptr(m) ((payload_header*)(&((m)->payload)))

View file

@ -3,75 +3,84 @@
#include <assert.h>
#include <string.h>
#include <netinet/in.h>
/*#define setup_vars \
TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \
CHTAppState * app_state_cht = app_state_2pc->app_state; \
jbHashTable_t * xid_ht = app_state_cht->xid_ht; \
jbHashTable_t * ht_ht = app_state_cht->ht_ht; \
int ht_xid = app_state_cht->ht_xid; \
int xid; \
int xid_exists = (-1 != jbHtLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)); \
jbHashTable_t ht; \
int ht_exists = (-1 != jbHtLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte*) &ht))*/
#define setup_vars \
TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \
CHTAppState * app_state_cht = app_state_2pc->app_state; \
recordid xid_ht = app_state_cht->xid_ht; \
recordid ht_ht = app_state_cht->ht_ht; \
int ht_xid = app_state_cht->ht_xid; \
int xid; \
int * xid_ptr = 0; \
int xid_exists = (-1 != ThashLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte**)&xid_ptr)); \
if(xid_ptr) { xid=*xid_ptr; free(xid_ptr); }\
recordid ht; \
recordid * ht_ptr = 0; \
int ht_exists = (-1 != ThashLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte**) &ht_ptr)); \
if(ht_ptr) { ht=*ht_ptr; free(ht_ptr) ;}
int ht_xid = app_state_cht->ht_xid;
/** TODO For now, we ignore the possiblity that jbHashTable's functions
int getXid(int ht_xid, recordid xid_ht, state_machine_id id) {
int * xid;
int ret;
int size = ThashLookup(ht_xid, xid_ht, (byte*)&id, sizeof(state_machine_id), (byte**)&xid);
if(size == sizeof(int)) {
ret = *xid;
free(xid);
} else {
assert(size == -1);
ret = -1;
}
return ret;
}
recordid getHashTable (int ht_xid, recordid ht_ht, clusterHashTable_t hashTable) {
recordid * ht;
recordid ret;
int size = ThashLookup(ht_xid, ht_ht, (byte*)&(hashTable), sizeof(clusterHashTable_t), (byte**)&ht);
if(size == sizeof(recordid)) {
ret = *ht;
free(ht);
} else {
assert(size == -1);
ret.page = 0;
ret.slot = 0;
ret.size = -1;
}
return ret;
}
/** TODO For now, we ignore the possiblity that LLADD's functions
return error codes. Instead, we assume that they always
succeed. */
static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
setup_vars;
recordid ht = getHashTable(ht_xid, app_state_cht->ht_ht, __header_ptr(m)->hashTable);
int xid = getXid(ht_xid, xid_ht, stateMachine->machine_id);
int ht_exists = (ht.size != -1);
int ret = 1;
switch(*requestType(m))
{
case CREATE:
{
recordid new = ThashCreate(ht_xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
// jbHashTable_t * new = jbHtCreate(ht_xid, 79);
// if(new != NULL) {
ThashInsert(ht_xid, ht_ht,
ThashInsert(ht_xid, app_state_cht->ht_ht,
(byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),
(byte*)&new, sizeof(recordid));
// ret = (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)new, sizeof(jbHashTable_t)) >= 0);
// } else {
// ret = 0;
// }
// if(ret) {
printf("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable)); fflush(stdout);
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
// } else {
// printf("Failed to insert new hash table slice!");
// }
DEBUG("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable));
//Tcommit(app_state_cht->ht_xid);
//app_state_cht->ht_xid = Tbegin();
} break;
case INSERT:
{
if(!ht_exists) {
printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable)); fflush(stdout); ret = 0;
printf ("Hash table %d doesn't exist!\n", __header_ptr(m)->hashTable.id); fflush(stdout); ret = 0;
} else {
//ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0);
ThashInsert(xid, ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m));
printf("Insert: %d ht=%d (key length %d) %d -> %d\n", ret,
DEBUG("Insert: %d ht=%d (key length %d) %d -> %d\n", ret,
(__header_ptr(m)->hashTable), getKeyLength(m),
*(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(stdout);
// (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
}
} break;
@ -91,10 +100,9 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message *
ret = 0;
}
// ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
printf("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret,
DEBUG("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret,
(__header_ptr(m)->hashTable), getKeyLength(m),
*(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(stdout);
}
} break;
@ -103,22 +111,17 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message *
if(!ht_exists) {
printf ("Hash table doesn't exist!\n"); fflush(stdout); ret = 0;
} else {
/** @todo no longer return old value on remove... */
/** @todo we no longer return old value on remove... */
ret = ThashRemove(xid, ht, getKeyAddr(m), getKeyLength(m));
// ret = (jbHtRemove(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
// (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
}
} break;
case DELETE:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
ThashRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m));
// jbHtRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m), NULL);
// (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
/* ret = (jbHtDelete(xid, &ht) >= 0); */ /* Don't need this--jbHtDelete just frees the (stack!) pointer. */
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(stdout); ret = 0; } else {
ThashRemove(xid, app_state_cht->ht_ht, getKeyAddr(m), getKeyLength(m));
//Tcommit(app_state_cht->ht_xid);
//app_state_cht->ht_xid = Tbegin();
}
} break;
@ -147,14 +150,16 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message *
ret = 0;
} NOOP */
} break;
/* case COMMIT:
case COMMIT: // placeholder (2pc commits for us)
break;
/*
case COMMIT:
{
ret = (Tcommit(xid) >= 0);
} break;
case ABORT:
{
{
ret = (Tabort(xid) >= 0);
} break;
*/
@ -184,70 +189,98 @@ state_name init_xact_cht(void * dfaSet, StateMachine * stateMachine, Message * m
memcpy(&(__header_ptr(m)->hashTable), &new_cht, sizeof(clusterHashTable_t));
printf("Allocated hashtable %d\n", new_cht.id);
DEBUG("Allocated hashtable %d\n", new_cht.id);
}
printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), m->response_type, *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id);
// printf("requestType: %d, responseType: %d from %s:%ld\n", *requestType(m), m->response_type, /**(int*)getKeyAddr(m),*/ m->initiator, m->initiator_machine_id);
return 1;
}
/* Begins new transaction, does the work the transaction requests, stores the message, and returns the corresponding error code. */
state_name veto_or_prepare_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
state_name ret;
setup_vars;
// int xid_exists = (-1 != jbHtLookup(ht_xid, app_state_cht->xid_ht, &(stateMachine->machine_id), sizeof(state_machine_id, &xid));
if(xid_exists) { printf("Warning: Stale xid found!\n"); }
assert(!xid_exists);
printf("requestType: %d, responseType: %d key: %d from %s:%ld\n", *requestType(m), m->response_type, *(int*)getKeyAddr(m), m->initiator, m->initiator_machine_id);
int xid = getXid(ht_xid, app_state_cht->xid_ht, stateMachine->machine_id);
int xid_exists = (-1 != xid);
if(stateMachine->current_state != XACT_ACTIVE) {
if(xid_exists) { printf("Warning: Stale xid found!\n"); }
assert(!xid_exists);
// printf("requestType: %d, responseType: %d from %s:%ld\n", *requestType(m), m->response_type, /**(int*)getKeyAddr(m),*/ m->initiator, m->initiator_machine_id);
/* This is the start of a new transaction */
xid = Tbegin(); // !!!!
if(xid < 0) {
printf("Tbegin failed; %d\n", xid);
// } else if(jbHtInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)) == -1) {
} else {
ThashInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int));
xid_exists = 1;
xid = Tbegin();
if(xid < 0) {
printf("Tbegin failed; %d\n", xid);
} else {
ThashInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int));
xid_exists = 1;
}
}
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
ret = do_work(dfaSet, stateMachine, m, from);
ret = ret ? SUBORDINATE_PREPARED_2PC : SUBORDINATE_VETO_2PC;
if(ret == SUBORDINATE_VETO_2PC) {
abort_cht(dfaSet, stateMachine, m, from);
}
return ret;
}
state_name eval_action_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
state_name ret;
setup_vars;
int xid = getXid(ht_xid, app_state_cht->xid_ht, stateMachine->machine_id);
int xid_exists = (-1 != xid);
if(xid_exists) {
assert(stateMachine->current_state == XACT_ACTIVE);
} else {
/* This is the start of a new transaction */
xid = Tbegin();
if(xid < 0) {
printf("Tbegin failed; %d\n", xid);
} else {
ThashInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int));
xid_exists = 1;
}
}
// printf("requestType: %d, responseType: %d from %s:%ld\n", *requestType(m), m->response_type, /**(int*)getKeyAddr(m),*/ m->initiator, m->initiator_machine_id);
if(xid_exists) {
ret = do_work(dfaSet, stateMachine, m, from);
ret = ret ? SUBORDINATE_PREPARED_2PC : SUBORDINATE_VETO_2PC;
} else {
ret = SUBORDINATE_VETO_2PC;
assert(ret); // for now!
}
if(ret == SUBORDINATE_VETO_2PC) {
abort_cht(dfaSet, stateMachine, m, from);
}
// if(ret == SUBORDINATE_VETO_2PC) {
// abort_cht(dfaSet, stateMachine, m, from);
// }
return ret;
}
state_name abort_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
setup_vars;
int xid = getXid(ht_xid, app_state_cht->xid_ht, stateMachine->machine_id);
int xid_exists = (-1 != xid);
printf("Aborting!!\n");
assert(xid_exists);
Tabort(xid); // !!!!
// jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
ThashRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id));
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
// Tcommit(app_state_cht->ht_xid);
// app_state_cht->ht_xid = Tbegin();
return 1;
}
@ -255,21 +288,17 @@ state_name abort_cht(void * dfaSet, StateMachine * stateMachine, Message * m, ch
state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
setup_vars;
int xid = getXid(ht_xid, app_state_cht->xid_ht, stateMachine->machine_id);
int xid_exists = (-1 != xid);
assert(xid_exists);
Tcommit(xid);
// jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
ThashRemove(app_state_cht->ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id));
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
/* }*/
/** @todo why was there an assert(0) in commit_cht?? */
/** @todo On a commit, should responses of type AWAIT_RESULT ever be done by subordinates? */
// if(m->response_type == AWAIT_RESULT) {
// printf("commit_cht responding on an AWAIT_RESULT request.\n");
// assert(0);
/* respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, __header_ptr(m)->initiator); */
// }
/* TODO: Check error codes, and return accordingly... */
return 1;
}
@ -280,10 +309,6 @@ state_name tally_cht(void * dfaSet, StateMachine * stateMachine, Message * m, ch
DfaSet * cHtCoordinatorInit(char * configFile, short (*partition_function)(DfaSet *, Message *)) {
NetworkSetup * config = readNetworkConfig(configFile, COORDINATOR);
/* DfaSet * ret = cHtInit(CHT_COORDINATOR, config->localhost, partition_function,
config->localport, config->broadcast_lists, config->broadcast_lists_count,
config->broadcast_list_host_count);
free (config);*/
DfaSet * ret = cHtInit(CHT_COORDINATOR, partition_function, config);
free(config);
return ret;
@ -291,14 +316,10 @@ DfaSet * cHtCoordinatorInit(char * configFile, short (*partition_function)(DfaSe
DfaSet * cHtSubordinateInit(char * configFile, short (*partition_function)(DfaSet *, Message *), int subordinate_number) {
NetworkSetup * config = readNetworkConfig(configFile, subordinate_number);
/* DfaSet * ret = cHtInit(CHT_SERVER, config->localhost, partition_function,
config->localport, config->broadcast_lists, config->broadcast_lists_count,
config->broadcast_list_host_count);*/
DfaSet * ret = cHtInit(CHT_SERVER, partition_function, config);
free (config);
return ret;
}
void debug_print_message(Message * m) {
printf("debug: (key length %d) %d -> %d\n", getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(NULL);
}

View file

@ -328,6 +328,15 @@ void* main_loop(DfaSet *dfaSet) {
}
}
state_name callback_false(void * dfaSet, StateMachine * stateMachine,
Message * message, char * message_recipient) {
return 0;
}
state_name callback_true(void * dfaSet, StateMachine * stateMachine,
Message * message, char * message_recipient) {
return 1;
}
void * inner_worker_loop(void * arg_void) {
WorkerLoopArgs * arg = arg_void;
@ -336,7 +345,7 @@ void * inner_worker_loop(void * arg_void) {
int timeout = 0; /* Run through the loop immediately the first time around. */
int state = 0;
// int first = 1;
StateMachine* stateMachine;
@ -416,11 +425,22 @@ void * inner_worker_loop(void * arg_void) {
if(dfaSet->states[i].name == stateMachine->current_state) {
state_idx = i;
}
}
}
DEBUG("Worker loop for state machine: %ld still active\n", machine_id);
send_message(&(dfaSet->networkSetup), &(stateMachine->message), stateMachine->message_recipient);
int send = 1;
if(dfaSet->states[state_idx].retry_fcn != NULL) {
send = dfaSet->states[state_idx].retry_fcn(dfaSet, stateMachine, &(stateMachine->message), stateMachine->message_recipient);
}
if(send) {
/* if(first) {
first = 0;
} else {
printf("Resending message. Machine # %ld State # %d\n", stateMachine->machine_id, stateMachine->current_state);
} */
send_message(&(dfaSet->networkSetup), &(stateMachine->message), stateMachine->message_recipient);
}
}
@ -530,7 +550,7 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s
asprintf(&initiator, "%s:%d", dfaSet->networkSetup.localhost, dfaSet->networkSetup.localport);
strcpy(initial_sm->message.initiator, initiator);
free(initiator);
DEBUG("Set message initiator to %s", initial_sm->message.initiator);
// printf("Set message initiator to %s\n", initial_sm->message.initiator); fflush(stdout);
initial_sm->message.initiator_machine_id = initial_sm->machine_id;
strcpy(initial_sm->message_recipient, recipient_addr);
@ -539,7 +559,6 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s
ret = (int)run_request(dfaSet, machine_id);
writelock(dfaSet->lock, machine_id);
assert(initial_sm == getSmash(dfaSet->smash, machine_id));
if(message != NULL) {

View file

@ -21,10 +21,10 @@ int main(int argc, char ** argv) {
clusterHashTable_t * new_ht;
cHtCreate(xid, cht_client, new_ht);
int i;
for(i = 0; i < 100; i++) {
for(i = 0; i < 1000; i++) {
int one = i; int two = i+1;
cHtInsert(xid, cht_client, new_ht, &one, sizeof(int), &two, sizeof(int));
xid++;
// xid++;
/* int i =0;
for(i =0; i < 100; i++) {
printf("\n");
@ -36,14 +36,14 @@ int main(int argc, char ** argv) {
newTwo = 0;
unsigned int newLen = sizeof(int);
int ret = cHtLookup(xid, cht_client, new_ht, &newOne, sizeof(int), &newTwo, &newLen);
xid++;
// xid++;
//printf("lookup returned %d (%d->%d)\n", ret, newOne, newTwo);
assert(ret);
assert(newOne == one);
assert(newTwo == two);
assert(newLen == sizeof(int));
}
cHtCommit(xid, cht_client);
/** @todo devise a way to cleanly shut a CHT down. */
// dfa_free(cht_client);