add optional support for gcc atomics to transaction table

Sears Russell 2009-10-15 22:46:25 +00:00
parent e6a718a7f9
commit 1cf16f62f2
6 changed files with 105 additions and 61 deletions
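The core of the change: when the compiler provides the __sync builtins, the transaction table's bookkeeping can be done with lock-free atomic operations and the per-table and per-entry pthread mutexes are compiled out; otherwise the existing mutex-based path is kept. A minimal, self-contained sketch of that pattern follows (the names entry_t and entry_cas are illustrative, not the Stasis API, and HAVE_GCC_ATOMICS is assumed to come from the generated config header set up by the CMake check in the first file):

#include <assert.h>
#include <pthread.h>

typedef struct {
  int xid;
#ifndef HAVE_GCC_ATOMICS
  pthread_mutex_t mut;  /* only needed when atomic builtins are unavailable */
#endif
} entry_t;

/* Atomically change e->xid from old to new; returns nonzero on success. */
static int entry_cas(entry_t *e, int old, int new) {
#ifdef HAVE_GCC_ATOMICS
  return __sync_bool_compare_and_swap(&e->xid, old, new);
#else
  int success = 0;
  pthread_mutex_lock(&e->mut);
  if (e->xid == old) { e->xid = new; success = 1; }
  pthread_mutex_unlock(&e->mut);
  return success;
#endif
}

int main(void) {
  entry_t e = { .xid = -1 };
#ifndef HAVE_GCC_ATOMICS
  pthread_mutex_init(&e.mut, 0);
#endif
  assert(entry_cas(&e, -1, 42));   /* claim the free slot */
  assert(!entry_cas(&e, -1, 7));   /* a second claim of the same slot fails */
  return 0;
}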

View file

@@ -79,6 +79,13 @@ int main(int argc, char * argv[]) {
}
" HAVE_GETLINE)
CHECK_C_SOURCE_COMPILES("
int main(int argc, char* argv[]) {
int i;
__sync_bool_compare_and_swap(&i, 0, 1);
}
" HAVE_GCC_ATOMICS)
MACRO(CREATE_CHECK NAME)
ADD_EXECUTABLE(${NAME} ${NAME}.c)
TARGET_LINK_LIBRARIES(${NAME} ${COMMON_LIBRARIES})
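CHECK_C_SOURCE_COMPILES only verifies at configure time that the snippet above compiles and links, setting HAVE_GCC_ATOMICS accordingly; the next file then exposes it to the code through a #cmakedefine in the generated config header. To probe a toolchain by hand, a standalone test program along these lines (illustrative, not part of the tree) can be compiled and run directly:

#include <assert.h>
#include <stdio.h>

int main(void) {
  int i = 0;
  /* Compare-and-swap: replace 0 with 1; returns nonzero on success. */
  assert(__sync_bool_compare_and_swap(&i, 0, 1));
  /* Adding 0 and returning the old value doubles as an atomic read. */
  assert(__sync_fetch_and_add(&i, 0) == 1);
  printf("__sync builtins available\n");
  return 0;
}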

View file

@@ -2,4 +2,5 @@
#cmakedefine HAVE_FDATASYNC
#cmakedefine HAVE_SYNC_FILE_RANGE
#cmakedefine HAVE_O_DIRECT
#cmakedefine HAVE_GCC_ATOMICS

View file

@@ -58,10 +58,10 @@ static lsn_t stasis_log_write_common(stasis_log_t* log, stasis_transaction_table
log->write_entry(log, e);
pthread_mutex_lock(&l->mut);
// pthread_mutex_lock(&l->mut);
if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
pthread_mutex_unlock(&l->mut);
// pthread_mutex_unlock(&l->mut);
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
@@ -81,10 +81,10 @@ static lsn_t stasis_log_write_prepare(stasis_log_t* log, stasis_transaction_tabl
e->xid, e->prevLSN, l->recLSN, getPrepareRecLSN(e));
log->write_entry(log, e);
pthread_mutex_lock(&l->mut);
// pthread_mutex_lock(&l->mut);
if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
pthread_mutex_unlock(&l->mut);
// pthread_mutex_unlock(&l->mut);
DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n",
e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
@@ -106,10 +106,10 @@ LogEntry * stasis_log_write_update(stasis_log_t* log, stasis_transaction_table_e
log->write_entry(log, e);
DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size);
pthread_mutex_lock(&l->mut);
// pthread_mutex_lock(&l->mut);
if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
pthread_mutex_unlock(&l->mut);
// pthread_mutex_unlock(&l->mut);
return e;
}
@@ -120,10 +120,10 @@ LogEntry * stasis_log_begin_nta(stasis_log_t* log, stasis_transaction_table_entr
}
lsn_t stasis_log_end_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, LogEntry * e) {
log->write_entry(log, e);
pthread_mutex_lock(&l->mut);
// pthread_mutex_lock(&l->mut);
if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; }
lsn_t ret = l->prevLSN = e->LSN;
pthread_mutex_unlock(&l->mut);
// pthread_mutex_unlock(&l->mut);
freeLogEntry(e);
return ret;
}

View file

@@ -12,11 +12,13 @@
struct stasis_transaction_table_t {
int active_count;
#ifndef HAVE_GCC_ATOMICS
/**
   This mutex protects the rest of the struct; in the non-atomics
   build it guards active_count.
*/
pthread_mutex_t mut;
#endif
/**
This key points to thread local state, including fast-path access to RESERVED_XIDs.
*/
@@ -24,7 +26,56 @@ struct stasis_transaction_table_t {
stasis_transaction_table_entry_t table[MAX_TRANSACTIONS];
};
static inline int test_and_set_entry(stasis_transaction_table_entry_t* e, int old, int new) {
#ifdef HAVE_GCC_ATOMICS
return __sync_bool_compare_and_swap(&(e->xid), old, new);
#else
pthread_mutex_lock(&(e->mut));
if(e->xid == old) {
e->xid = new;
pthread_mutex_unlock(&(e->mut));
return 1;
} else {
pthread_mutex_unlock(&(e->mut));
return 0;
}
#endif
}
// Must not be called while the entry may be concurrently modified; protects readers from incomplete reads.
static inline void set_entry(stasis_transaction_table_entry_t* e, int new) {
#ifdef HAVE_GCC_ATOMICS
int i = __sync_fetch_and_add(&(e->xid),0);
int succ = test_and_set_entry(e, i, new);
assert(succ);
#else
pthread_mutex_lock(&(e->mut));
e->xid = new;
pthread_mutex_unlock(&(e->mut));
#endif
}
static inline int incr_active_count(stasis_transaction_table_t* t, int d) {
#ifdef HAVE_GCC_ATOMICS
return __sync_fetch_and_add(&(t->active_count), d);
#else
pthread_mutex_lock(&(t->mut));
int ret = t->active_count;
(t->active_count) += d;
pthread_mutex_unlock(&(t->mut));
return ret;
#endif
}
static inline int get_entry_xid(stasis_transaction_table_entry_t* e) {
#ifdef HAVE_GCC_ATOMICS
return __sync_fetch_and_add(&(e->xid), 0);
#else
pthread_mutex_lock(&e->mut);
int ret = e->xid;
pthread_mutex_unlock(&e->mut);
return ret;
#endif
}
struct stasis_transaction_table_thread_local_state_t {
intptr_t last_entry;
intptr_t num_entries;
@@ -43,10 +94,7 @@ typedef enum {
} stasis_transaction_table_status;
int stasis_transaction_table_num_active_threads(stasis_transaction_table_t *tbl) {
pthread_mutex_lock(&tbl->mut);
int ret = tbl->active_count;
pthread_mutex_unlock(&tbl->mut);
return ret;
return incr_active_count(tbl, 0);
}
static void stasis_transaction_table_thread_destructor(void * p) {
@@ -56,15 +104,10 @@ static void stasis_transaction_table_thread_destructor(void * p) {
stasis_transaction_table_entry_t * e;
for(int i = 0; i < tls->num_entries; i++) {
e = tls->entries[i];
pthread_mutex_lock(&e->mut);
if(e->xid == RESERVED_XTABLE_XID) {
e->xid = INVALID_XTABLE_XID; /// XXX can leak; is possible that our xids are still running.
}
pthread_mutex_unlock(&e->mut);
// XXX can leak; is possible that our xids are still running.
test_and_set_entry(e, RESERVED_XTABLE_XID, INVALID_XTABLE_XID);
}
pthread_mutex_lock(&tls->tbl->mut);
tls->tbl->active_count--;
pthread_mutex_unlock(&tls->tbl->mut);
incr_active_count(tls->tbl, -1);
free(tls->entries);
free(tls->indexes);
@@ -83,26 +126,30 @@ int* stasis_transaction_table_list_active(stasis_transaction_table_t *tbl, int *
ret[0] = INVALID_XID;
*count = 0;
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_lock(&tbl->table[i].mut);
if(tbl->table[i].xid >= 0) {
ret[*count] = tbl->table[i].xid;
int e_xid = get_entry_xid(&tbl->table[i]);
if(e_xid >= 0) {
ret[*count] = e_xid;
(*count)++;
ret = realloc(ret, ((*count)+1) * sizeof(*ret));
ret[*count] = INVALID_XID;
}
pthread_mutex_unlock(&tbl->table[i].mut);
}
return ret;
}
stasis_transaction_table_t * stasis_transaction_table_init() {
stasis_transaction_table_t * tbl = malloc(sizeof(*tbl));
pthread_mutex_init(&tbl->mut, NULL);
tbl->active_count = 0;
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_init(&tbl->mut, NULL);
#endif
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
tbl->table[i].xid = INVALID_XTABLE_XID;
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_init(&(tbl->table[i].mut),0);
#endif
}
DEBUG("initted xact table!\n");
@@ -113,11 +160,14 @@ stasis_transaction_table_t * stasis_transaction_table_init() {
}
void stasis_transaction_table_deinit(stasis_transaction_table_t *tbl) {
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_destroy(&tbl->mut);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_destroy(&tbl->table[i].mut);
}
#endif
pthread_key_delete(tbl->key);
free(tbl);
}
@@ -125,13 +175,15 @@ lsn_t stasis_transaction_table_minRecLSN(stasis_transaction_table_t *tbl) {
lsn_t minRecLSN = LSN_T_MAX;
DEBUG("Looping for minRecLSN");
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
if(tbl->table[i].xid >= 0) {
pthread_mutex_lock(&tbl->table[i].mut);
int e_xid = get_entry_xid(&(tbl->table[i]));
if(e_xid >= 0) {
// pthread_mutex_lock(&tbl->table[i].mut);
//XXX assumes reading LSNs is an atomic memory operation.
lsn_t recLSN = tbl->table[i].recLSN;
if(recLSN != -1 && recLSN < minRecLSN) {
minRecLSN = recLSN;
}
pthread_mutex_unlock(&tbl->table[i].mut);
// pthread_mutex_unlock(&tbl->table[i].mut);
}
}
return minRecLSN;
@@ -140,17 +192,17 @@ lsn_t stasis_transaction_table_minRecLSN(stasis_transaction_table_t *tbl) {
int stasis_transaction_table_roll_forward(stasis_transaction_table_t *tbl, int xid, lsn_t lsn, lsn_t prevLSN) {
assert(xid >= 0 && xid < MAX_TRANSACTIONS);
stasis_transaction_table_entry_t * l = &tbl->table[xid];
pthread_mutex_lock(&l->mut);
if(l->xid == xid) {
if(test_and_set_entry(l, xid, xid)) {
// if(l->xid == xid) {
// rolling forward CLRs / NTAs makes prevLSN decrease.
assert(l->prevLSN >= prevLSN);
} else {
assert(l->xid == INVALID_XTABLE_XID || l->xid == RESERVED_XTABLE_XID);
l->xid = xid;
int b2 = test_and_set_entry(l, RESERVED_XTABLE_XID, xid);
int b1 = test_and_set_entry(l, INVALID_XTABLE_XID, xid);
assert(b1 || b2);
l->recLSN = lsn;
}
l->prevLSN = lsn;
pthread_mutex_unlock(&l->mut);
return 0;
}
int stasis_transaction_table_roll_forward_with_reclsn(stasis_transaction_table_t *tbl, int xid, lsn_t lsn,
@@ -179,9 +231,7 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac
tls->tbl = tbl;
pthread_mutex_lock(&tbl->mut);
(tbl->active_count)++;
pthread_mutex_unlock(&tbl->mut);
incr_active_count(tbl, 1);
}
// Fast path
@@ -193,15 +243,10 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac
for(intptr_t i = 0; i < tls->num_entries; i++) {
intptr_t idx = (1 + i + tls->last_entry) % tls->num_entries;
ret = tls->entries[idx];
pthread_mutex_lock(&ret->mut);
if(ret->xid == RESERVED_XTABLE_XID) {
if(test_and_set_entry(ret, RESERVED_XTABLE_XID, PENDING_XTABLE_XID)) {
index = tls->indexes[idx];
tls->entries[idx]->xid = PENDING_XTABLE_XID;
pthread_mutex_unlock(&ret->mut);
break;
}
assert(ret->xid != INVALID_XTABLE_XID && ret->xid != PENDING_XTABLE_XID);
pthread_mutex_unlock(&ret->mut);
}
// Slow path
@@ -209,19 +254,14 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac
if(index == INVALID_XID) {
for(int i = 0; i < MAX_TRANSACTIONS; i++ ) {
pthread_mutex_lock(&tbl->table[i].mut);
if( tbl->table[i].xid == INVALID_XTABLE_XID ) {
if(test_and_set_entry(&tbl->table[i], INVALID_XTABLE_XID, PENDING_XTABLE_XID)) {
index = i;
tbl->table[index].xid = PENDING_XTABLE_XID;
pthread_mutex_unlock(&tbl->table[i].mut);
tls->num_entries++;
tls->entries = realloc(tls->entries, sizeof(sizeof(ret)) * tls->num_entries);
tls->entries[tls->num_entries-1] = &tbl->table[index];
tls->indexes = realloc(tls->indexes, sizeof(int) * tls->num_entries);
tls->indexes[tls->num_entries-1] = index;
break;
} else {
pthread_mutex_unlock(&tbl->table[i].mut);
}
}
}
@@ -246,27 +286,21 @@ stasis_transaction_table_entry_t * stasis_transaction_table_get(stasis_transacti
}
}
int stasis_transaction_table_commit(stasis_transaction_table_t *tbl, int xid) {
pthread_mutex_lock(&tbl->table[xid%MAX_TRANSACTIONS].mut);
assert(xid >= 0 && xid < MAX_TRANSACTIONS);
tbl->table[xid%MAX_TRANSACTIONS].xid = RESERVED_XTABLE_XID;
DEBUG("%s:%d deactivate %d\n",__FILE__,__LINE__,xid);
set_entry(&(tbl->table[xid]), RESERVED_XTABLE_XID);
pthread_mutex_unlock(&tbl->table[xid%MAX_TRANSACTIONS].mut);
return 0;
}
int stasis_transaction_table_forget(stasis_transaction_table_t *tbl, int xid) {
assert(xid >= 0 && xid < MAX_TRANSACTIONS);
stasis_transaction_table_entry_t * l = &tbl->table[xid];
pthread_mutex_lock(&tbl->table[xid].mut);
if(l->xid != xid) {
assert(l->xid < 0); // otherwise, more than one xact had this slot at once...
if(test_and_set_entry(&tbl->table[xid], xid, RESERVED_XTABLE_XID)) {
// success
} else {
l->xid = RESERVED_XTABLE_XID;
l->prevLSN = -1;
l->recLSN = -1;
// during recovery, we might forget something we've never heard of.
assert(l->xid < 0); // otherwise, more than one xact had this slot at once...
}
pthread_mutex_unlock(&tbl->table[xid].mut);
return 0;
}

View file

@@ -21,7 +21,9 @@ struct stasis_transaction_table_entry_t {
int xid;
lsn_t prevLSN;
lsn_t recLSN;
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_t mut;
#endif
};
/**
Initialize Stasis' transaction table. Called by Tinit() and unit

View file

@@ -445,7 +445,7 @@ void reopenLogWorkload(int truncating) {
int xid = 1;
stasis_transaction_table_entry_t l;
pthread_mutex_init(&l.mut,0);
// pthread_mutex_init(&l.mut,0);
stasis_log_begin_transaction(stasis_log_file, xid, &l);
lsn_t startLSN = 0;