Fixed some cht bugs. Abort is flakey.

This commit is contained in:
Sears Russell 2005-02-06 03:48:12 +00:00
parent f96061a1bd
commit e33319175f
10 changed files with 95 additions and 48 deletions

View file

@ -64,7 +64,7 @@ callback_fcn coordinator_continue_xact_2pc;
/* Remember to update transition_count_2pc if you add/remove transitions */
const int transition_count_2pc = 26;
const int transition_count_2pc = 25;
Transition transitions_2pc[] = {
@ -76,7 +76,6 @@ Transition transitions_2pc[] = {
{ XACT_ACK_ARRIVAL, XACT_ACTIVE, XACT_ACTION_RUNNING, coordinator_continue_xact_2pc, FALSE },
{ XACT_ACK_RESULT, XACT_ACTIVE, XACT_ACTION_RUNNING, coordinator_continue_xact_2pc, FALSE },
{ XACT_COMMIT, XACT_ACTIVE, COORDINATOR_START_2PC, coordinator_continue_xact_2pc, FALSE },
{ XACT_ABORT, XACT_ACTIVE, COORDINATOR_START_2PC, coordinator_continue_xact_2pc, FALSE },
{ XACT_SUBORDINATE_ACK, XACT_ACTION_RUNNING, XACT_ACTIVE, &tally_2pc, FALSE },
/* Library user must provide callback that init_xact_2pc calls. */
@ -112,7 +111,7 @@ Transition transitions_2pc[] = {
};
const int client_transition_count_2pc = 8;
const int client_transition_count_2pc = 7;
Transition client_transitions_2pc[] = {
@ -127,13 +126,12 @@ Transition client_transitions_2pc[] = {
{ XACT_ACTION_RUNNING, XACT_ACK_ARRIVAL, NULL_STATE, NULL, FALSE},
{ XACT_ACTIVE, XACT_ACK_RESULT, NULL_STATE, NULL, FALSE},
{ COORDINATOR_COMMITTING_2PC, XACT_COMMIT, NULL_STATE, NULL, FALSE},
{ COORDINATOR_ABORTING_2PC, XACT_ABORT, NULL_STATE, NULL, FALSE}
{ COORDINATOR_COMMITTING_2PC, XACT_COMMIT, NULL_STATE, NULL, FALSE}
};
const int state_count_2pc = 16;
const int state_count_2pc = 15;
State states_2pc[MAX_STATE_COUNT] = {
@ -167,7 +165,6 @@ State states_2pc[MAX_STATE_COUNT] = {
{ XACT_ACK_ARRIVAL, NULL, NULL},
{ XACT_COMMIT, NULL, NULL},
{ XACT_ABORT, NULL, NULL}
};
@ -336,11 +333,12 @@ state_name commit_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, c
int ret = app_state->commit_2pc(dfaSet, stateMachine, m, from);
if(ret) { printf("COMMIT %ld\n", m->to_machine_id); }
if(ret && m->response_type == AWAIT_RESULT) {
printf("responding w/ commit to %s\n", m->initiator); fflush(stdout);
respond_once(&((DfaSet*)dfaSet)->networkSetup, SUBORDINATE_ACKING_2PC, m, m->initiator);
}
return ret;
}
/** @todo 2pc can't handle replica groups of size one. */
state_name check_veto_2pc(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
/* Clear subordinate_votes array, so that it can be used to
tally acks after the votes are tallied. */
@ -355,6 +353,12 @@ state_name check_veto_2pc(void * dfaSet, StateMachine * stateMachine, Message *
memset(machine_state->subordinate_votes, 0, MAX_SUBORDINATES);
// sprintf(from, "bc:%d", bc_group);
printf("Sending commit message to %s\n", m->initiator);
respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator);
if(tally_2pc(dfaSet, stateMachine, m, from)) {
printf("YOU FOUND A BUG: 2pc doesn't support size one replication groups!\n");
}
return 1;
}
@ -384,7 +388,7 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m,
dfaSet->networkSetup.broadcast_list_host_count[bc_group-1],
(char*)(machine_state->subordinate_votes), from);
}
// fprintf(stderr, "Tally returned: %d", ret);
// fprintf(stderr, "Tally returned: %d", ret);
if(ret) {
/* Clear subordinate_votes array, so that it can be used to
tally acks after the votes are tallied. */
@ -405,14 +409,15 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m,
sequence number before flushing... */
// fprintf(stderr, " response type %d current state %d\n", m->response_type, stateMachine->current_state);
if(ret && (
(
(
m->response_type == AWAIT_COMMIT_POINT ||
m->response_type==XACT_COMMIT ||
m->response_type==XACT_ABORT
m->response_type==XACT_COMMIT
) && (
stateMachine->current_state == COORDINATOR_START_2PC
stateMachine->current_state == COORDINATOR_START_2PC ||
stateMachine->current_state == COORDINATOR_ABORTING_2PC
)
) || (
m->response_type == XACT_ACK_RESULT &&
@ -426,9 +431,10 @@ state_name tally_2pc(void * dfaSetPtr, StateMachine * stateMachine, Message * m,
m->from_machine_id = m->initiator_machine_id;
// printf("Coordinator responding: ? ht=? (key length %d) %d -> /*%d*/ to %s:%ld\n", 0/*getKeyLength(m),*/ , 0, 0, /**(int*)getKeyAddr(m), *//**(int*)getValAddr(m),*/ m->initiator, m->initiator_machine_id );
//debug_print_message(m);
if(m->response_type == AWAIT_COMMIT_POINT || m->response_type == XACT_COMMIT || m->response_type == XACT_ABORT) {
if((m->response_type == AWAIT_COMMIT_POINT) || (m->response_type == XACT_COMMIT)) {
printf("COMMIT POINT %ld\n", m->to_machine_id);
fflush(stdout);
printf("Sending commit point message to %s\n", m->initiator); fflush(stdout);
respond_once(&((DfaSet*)dfaSet)->networkSetup, COORDINATOR_COMMITTING_2PC, m, m->initiator);
} else {
// printf("COMPLETED %ld\n", m->to_machine_id);

View file

@ -101,9 +101,7 @@ gets to reuse the transaction id.
#define XACT_ACTION_RUNNING 224
#define XACT_COMMIT 225
#define XACT_ABORT 226
#define XACT_SUBORDINATE_ACK 227
#define XACT_SUBORDINATE_ACK 226
/**
The callbacks are called whenever the transition 'should' succeed.

View file

@ -91,6 +91,7 @@ short multiplex_interleaved(DfaSet * dfaSet, Message * m) {
} else {
/* Need to add one so that no requests are assigned to the coordinator (bc:0) */
bc_group = hash(getKeyAddr(m), getKeyLength(m), table_length) + 1;
// printf("group %d: %d (%d)\n", bc_group, *(int*)getKeyAddr(m), getKeyLength(m)); fflush(stdout);
}
DEBUG("request %d bc group: %d\n", *requestType(m), bc_group);

View file

@ -97,12 +97,14 @@ static int _chtEval(DfaSet * dfaSet,
if (*value_size != 0) {
memcpy(key, getKeyAddr(&m), getKeyLength(&m));
}
*value_size = getValLength(&m);
*key_size = getKeyLength(&m);
*xid = m.to_machine_id;
DEBUG("+chtEval returning %d\n", m.type);
return m.type;
// return m.type;
return 1;
}
int cHtCreate(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * new_ht) {
@ -116,11 +118,16 @@ int cHtInsert(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, vo
}
int cHtLookup(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) {
return _chtEval(dfaSet, LOOKUP, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
assert(keylen != 0);
int ret = _chtEval(dfaSet, LOOKUP, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
assert(ret);
return (*datlen != 0);
}
int cHtRemove(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t * ht, void * key, size_t keylen, void * dat, size_t * datlen) {
return _chtEval(dfaSet, REMOVE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
int ret = _chtEval(dfaSet, REMOVE, /*AWAIT_COMMIT_POINT,*/ REQUEST_TYPE, &xid, ht, key, &keylen, dat, datlen) != SUBORDINATE_VETO_2PC;
if(!ret) { return -1; }
return *datlen;
}
int cHtDelete(state_machine_id xid, DfaSet * dfaSet, clusterHashTable_t *ht) {
@ -142,8 +149,7 @@ int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet) {
DfaSet * cHtClientInit(char * configFile) {
NetworkSetup * config = readNetworkConfig(configFile, 0);
assert(config->coordinator);
// printf("config->localhost:%s config->broadcast_lists[0][0]:%s (localport %d)(port %d)\n",
// config->localhost, config->broadcast_lists[0][0], config->localport, parse_port(config->broadcast_lists[0][0]));
DfaSet * ret = cHtInit(CHT_CLIENT, NULL, config);
assert(config->coordinator);
free (config);
@ -155,8 +161,9 @@ int cHtCommit(state_machine_id xid, DfaSet * dfaSet) {
return _chtEval(dfaSet, COMMIT, XACT_COMMIT, &xid, NULL, NULL, &zero, NULL, &zero);
}
/*
int cHtAbort(state_machine_id xid, DfaSet * dfaSet) {
size_t zero = 0;
return _chtEval(dfaSet, ABORT, AWAIT_COMMIT_POINT, &xid, NULL, NULL, &zero, NULL, &zero);
}*/
return _chtEval(dfaSet, ABORT, XACT_COMMIT, &xid, NULL, NULL, &zero, NULL, &zero);
abort();
}

View file

@ -10,7 +10,7 @@
#define TSTSET 6
#define GETXID 7
#define COMMIT 8
/* #define ABORT 9 */
#define ABORT 9
typedef struct {
unsigned short key_length;

View file

@ -96,10 +96,10 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message *
assert(valueLength <= getValLength(m));
memcpy(getValAddr(m), new, valueLength);
free(new);
setValLength(m, valueLength);
} else {
ret = 0;
setValLength(m, 0);
}
// ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
DEBUG("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret,
(__header_ptr(m)->hashTable), getKeyLength(m),
*(int*)getKeyAddr(m), *(int*)getValAddr(m));
@ -112,7 +112,12 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message *
printf ("Hash table doesn't exist!\n"); fflush(stdout); ret = 0;
} else {
/** @todo we no longer return old value on remove... */
ret = ThashRemove(xid, ht, getKeyAddr(m), getKeyLength(m));
int remove_ret = ThashRemove(xid, ht, getKeyAddr(m), getKeyLength(m));
if(remove_ret == 0) {
setValLength(m, 0);
} else {
setValLength(m, 1);
}
}
} break;
@ -150,19 +155,15 @@ static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message *
ret = 0;
} NOOP */
} break;
case COMMIT: // placeholder (2pc commits for us)
break;
/*
case COMMIT:
{
ret = (Tcommit(xid) >= 0);
// placeholder (2pc commits for us unless there's an error)
} break;
case ABORT:
{
ret = (Tabort(xid) >= 0);
ret = 0; // Insert an 'error' to cause 2pc to abort the transaction.
} break;
*/
default:
{
printf("Unknown request type: %d\n", *requestType(m));

View file

@ -218,8 +218,8 @@ void* main_loop(DfaSet *dfaSet) {
if(i == dfaSet->transition_count) {
fprintf(stderr, "%ld received: %ld-%d:%d->? (bad message)\n", stateMachine->machine_id, message->from_machine_id,
message->type, current_state);
fprintf(stderr, "%ld received: %ld-%d:%d->? (bad message from %s)\n", stateMachine->machine_id, message->from_machine_id,
message->type, current_state, from);
continue;
}

View file

@ -21,16 +21,9 @@ int main(int argc, char ** argv) {
clusterHashTable_t * new_ht;
cHtCreate(xid, cht_client, new_ht);
int i;
for(i = 0; i < 1000; i++) {
for(i = 0; i < 10000; i++) {
int one = i; int two = i+1;
cHtInsert(xid, cht_client, new_ht, &one, sizeof(int), &two, sizeof(int));
// xid++;
/* int i =0;
for(i =0; i < 100; i++) {
printf("\n");
}
fflush(NULL); */
int newOne, newTwo;
newOne = i;
newTwo = 0;
@ -44,6 +37,46 @@ int main(int argc, char ** argv) {
assert(newLen == sizeof(int));
}
cHtCommit(xid, cht_client);
for(i = 0; i < 10000; i+=10) {
int one = i; int two = -1;
unsigned int size = sizeof(int);
int removed = cHtRemove(xid, cht_client, new_ht, &one, sizeof(int), &two, &size);
assert(removed);
size = sizeof(int);
two = -1;
removed = cHtRemove(xid, cht_client, new_ht, &one, sizeof(int), &two, &size);
assert(!removed);
int newOne, newTwo;
newOne = i;
newTwo = 0;
unsigned int newLen = sizeof(int);
int ret = cHtLookup(xid, cht_client, new_ht, &newOne, sizeof(int), &newTwo, &newLen);
assert(!ret);
}
cHtAbort(xid, cht_client);
exit(0); // @todo this test case should continue on...
for(i = 0; i < 10000; i++) {
int one = i; int two = i+1;
// cHtInsert(xid, cht_client, new_ht, &one, sizeof(int), &two, sizeof(int));
int newOne, newTwo;
newOne = i;
newTwo = 0;
unsigned int newLen = sizeof(int);
int ret = cHtLookup(xid, cht_client, new_ht, &newOne, sizeof(int), &newTwo, &newLen);
// xid++;
//printf("lookup returned %d (%d->%d)\n", ret, newOne, newTwo);
assert(ret);
assert(newOne == one);
assert(newTwo == two);
assert(newLen == sizeof(int));
}
cHtCommit(xid, cht_client);
/** @todo devise a way to cleanly shut a CHT down. */
// dfa_free(cht_client);

View file

@ -39,6 +39,7 @@ authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#define _GNU_SOURCE
#include <string.h>
#include "../../src/apps/cht/cht.h"
#include <assert.h>