From 1cf16f62f2de5fd04d4246b865c90fb7c6fd7e9c Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 15 Oct 2009 22:46:25 +0000 Subject: [PATCH] add optional support for gcc atomics to transaction table --- CMakeLists.txt | 7 ++ config.h.cmake | 1 + src/stasis/logger/logger2.c | 16 ++-- src/stasis/transactionTable.c | 138 +++++++++++++++++++++------------- stasis/transactionTable.h | 2 + test/stasis/check_logWriter.c | 2 +- 6 files changed, 105 insertions(+), 61 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 988a0bb..b870f5a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -79,6 +79,13 @@ int main(int argc, char * argv[]) { } " HAVE_GETLINE) +CHECK_C_SOURCE_COMPILES(" +int main(int argc, char* argv[]) { + int i; + __sync_bool_compare_and_swap(&i, 0, 1); +} +" HAVE_GCC_ATOMICS) + MACRO(CREATE_CHECK NAME) ADD_EXECUTABLE(${NAME} ${NAME}.c) TARGET_LINK_LIBRARIES(${NAME} ${COMMON_LIBRARIES}) diff --git a/config.h.cmake b/config.h.cmake index c5b92d9..f575808 100644 --- a/config.h.cmake +++ b/config.h.cmake @@ -2,4 +2,5 @@ #cmakedefine HAVE_FDATASYNC #cmakedefine HAVE_SYNC_FILE_RANGE #cmakedefine HAVE_O_DIRECT +#cmakedefine HAVE_GCC_ATOMICS diff --git a/src/stasis/logger/logger2.c b/src/stasis/logger/logger2.c index c6049ed..2452e1c 100644 --- a/src/stasis/logger/logger2.c +++ b/src/stasis/logger/logger2.c @@ -58,10 +58,10 @@ static lsn_t stasis_log_write_common(stasis_log_t* log, stasis_transaction_table log->write_entry(log, e); - pthread_mutex_lock(&l->mut); +// pthread_mutex_lock(&l->mut); if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; - pthread_mutex_unlock(&l->mut); +// pthread_mutex_unlock(&l->mut); DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); @@ -81,10 +81,10 @@ static lsn_t stasis_log_write_prepare(stasis_log_t* log, stasis_transaction_tabl e->xid, e->prevLSN, l->recLSN, getPrepareRecLSN(e)); log->write_entry(log, e); - pthread_mutex_lock(&l->mut); +// pthread_mutex_lock(&l->mut); if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; - pthread_mutex_unlock(&l->mut); +// pthread_mutex_unlock(&l->mut); DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); @@ -106,10 +106,10 @@ LogEntry * stasis_log_write_update(stasis_log_t* log, stasis_transaction_table_e log->write_entry(log, e); DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size); - pthread_mutex_lock(&l->mut); +// pthread_mutex_lock(&l->mut); if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; - pthread_mutex_unlock(&l->mut); +// pthread_mutex_unlock(&l->mut); return e; } @@ -120,10 +120,10 @@ LogEntry * stasis_log_begin_nta(stasis_log_t* log, stasis_transaction_table_entr } lsn_t stasis_log_end_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, LogEntry * e) { log->write_entry(log, e); - pthread_mutex_lock(&l->mut); +// pthread_mutex_lock(&l->mut); if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; } lsn_t ret = l->prevLSN = e->LSN; - pthread_mutex_unlock(&l->mut); +// pthread_mutex_unlock(&l->mut); freeLogEntry(e); return ret; } diff --git a/src/stasis/transactionTable.c b/src/stasis/transactionTable.c index f85fb7a..e09e188 100644 --- a/src/stasis/transactionTable.c +++ b/src/stasis/transactionTable.c @@ -12,11 +12,13 @@ struct stasis_transaction_table_t { int active_count; +#ifndef HAVE_GCC_ATMOICS /** This mutex protects the rest of the struct xidCount. */ pthread_mutex_t mut; +#endif /** This key points to thread local state, including fast-path access to RESERVED_XIDs. */ @@ -24,7 +26,56 @@ struct stasis_transaction_table_t { stasis_transaction_table_entry_t table[MAX_TRANSACTIONS]; }; +static inline int test_and_set_entry(stasis_transaction_table_entry_t* e, int old, int new) { +#ifdef HAVE_GCC_ATOMICS + return __sync_bool_compare_and_swap(&(e->xid), old, new); +#else + pthread_mutex_lock(&(e->mut)); + if(e->xid == old) { + e->xid = new; + pthread_mutex_unlock(&(e->mut)); + return 1; + } else { + pthread_mutex_unlock(&(e->mut)); + return 0; + } +#endif +} +// May not be called in race; protects readers from incomplete reads. +static inline void set_entry(stasis_transaction_table_entry_t* e, int new) { +#ifdef HAVE_GCC_ATOMICS + int i = __sync_fetch_and_add(&(e->xid),0); + int succ = test_and_set_entry(e, i, new); + assert(succ); +#else + pthread_mutex_lock(&(e->mut)); + e->xid = new; + + pthread_mutex_unlock(&(e->mut)); +#endif +} +static inline int incr_active_count(stasis_transaction_table_t* t, int d) { +#ifdef HAVE_GCC_ATOMICS + return __sync_fetch_and_add(&(t->active_count), d); +#else + pthread_mutex_lock(&(t->mut)); + int ret = t->active_count; + (t->active_count) += d; + pthread_mutex_unlock(&(t->mut)); + return ret; +#endif +} +static inline int get_entry_xid(stasis_transaction_table_entry_t* e) { +#ifdef HAVE_GCC_ATOMICS + return __sync_fetch_and_add(&(e->xid), 0); +#else + pthread_mutex_lock(&e->mut); + int ret = e->xid; + pthread_mutex_unlock(&e->mut); + return ret; +#endif +} struct stasis_transaction_table_thread_local_state_t { intptr_t last_entry; intptr_t num_entries; @@ -43,10 +94,7 @@ typedef enum { } stasis_transaction_table_status; int stasis_transaction_table_num_active_threads(stasis_transaction_table_t *tbl) { - pthread_mutex_lock(&tbl->mut); - int ret = tbl->active_count; - pthread_mutex_unlock(&tbl->mut); - return ret; + return incr_active_count(tbl, 0); } static void stasis_transaction_table_thread_destructor(void * p) { @@ -56,15 +104,10 @@ static void stasis_transaction_table_thread_destructor(void * p) { stasis_transaction_table_entry_t * e; for(int i = 0; i < tls->num_entries; i++) { e = tls->entries[i]; - pthread_mutex_lock(&e->mut); - if(e->xid == RESERVED_XTABLE_XID) { - e->xid = INVALID_XTABLE_XID; /// XXX can leak; is possible that our xids are still running. - } - pthread_mutex_unlock(&e->mut); + // XXX can leak; is possible that our xids are still running. + test_and_set_entry(e, RESERVED_XTABLE_XID, INVALID_XTABLE_XID); } - pthread_mutex_lock(&tls->tbl->mut); - tls->tbl->active_count--; - pthread_mutex_unlock(&tls->tbl->mut); + incr_active_count(tls->tbl, -1); free(tls->entries); free(tls->indexes); @@ -83,26 +126,30 @@ int* stasis_transaction_table_list_active(stasis_transaction_table_t *tbl, int * ret[0] = INVALID_XID; *count = 0; for(int i = 0; i < MAX_TRANSACTIONS; i++) { - pthread_mutex_lock(&tbl->table[i].mut); - if(tbl->table[i].xid >= 0) { - ret[*count] = tbl->table[i].xid; + int e_xid = get_entry_xid(&tbl->table[i]); + if(e_xid >= 0) { + ret[*count] = e_xid; (*count)++; ret = realloc(ret, ((*count)+1) * sizeof(*ret)); ret[*count] = INVALID_XID; } - pthread_mutex_unlock(&tbl->table[i].mut); } return ret; } stasis_transaction_table_t * stasis_transaction_table_init() { stasis_transaction_table_t * tbl = malloc(sizeof(*tbl)); - pthread_mutex_init(&tbl->mut, NULL); tbl->active_count = 0; +#ifndef HAVE_GCC_ATOMICS + pthread_mutex_init(&tbl->mut, NULL); +#endif + for(int i = 0; i < MAX_TRANSACTIONS; i++) { tbl->table[i].xid = INVALID_XTABLE_XID; +#ifndef HAVE_GCC_ATOMICS pthread_mutex_init(&(tbl->table[i].mut),0); +#endif } DEBUG("initted xact table!\n"); @@ -113,11 +160,14 @@ stasis_transaction_table_t * stasis_transaction_table_init() { } void stasis_transaction_table_deinit(stasis_transaction_table_t *tbl) { +#ifndef HAVE_GCC_ATOMICS pthread_mutex_destroy(&tbl->mut); for(int i = 0; i < MAX_TRANSACTIONS; i++) { pthread_mutex_destroy(&tbl->table[i].mut); } +#endif + pthread_key_delete(tbl->key); free(tbl); } @@ -125,13 +175,15 @@ lsn_t stasis_transaction_table_minRecLSN(stasis_transaction_table_t *tbl) { lsn_t minRecLSN = LSN_T_MAX; DEBUG("Looping for minRecLSN"); for(int i = 0; i < MAX_TRANSACTIONS; i++) { - if(tbl->table[i].xid >= 0) { - pthread_mutex_lock(&tbl->table[i].mut); + int e_xid = get_entry_xid(&(tbl->table[i])); + if(e_xid >= 0) { +// pthread_mutex_lock(&tbl->table[i].mut); + //XXX assumes reading LSNs is an atomic memory operation. lsn_t recLSN = tbl->table[i].recLSN; if(recLSN != -1 && recLSN < minRecLSN) { minRecLSN = recLSN; } - pthread_mutex_unlock(&tbl->table[i].mut); +// pthread_mutex_unlock(&tbl->table[i].mut); } } return minRecLSN; @@ -140,17 +192,17 @@ lsn_t stasis_transaction_table_minRecLSN(stasis_transaction_table_t *tbl) { int stasis_transaction_table_roll_forward(stasis_transaction_table_t *tbl, int xid, lsn_t lsn, lsn_t prevLSN) { assert(xid >= 0 && xid < MAX_TRANSACTIONS); stasis_transaction_table_entry_t * l = &tbl->table[xid]; - pthread_mutex_lock(&l->mut); - if(l->xid == xid) { + if(test_and_set_entry(l, xid, xid)) { +// if(l->xid == xid) { // rolling forward CLRs / NTAs makes prevLSN decrease. assert(l->prevLSN >= prevLSN); } else { - assert(l->xid == INVALID_XTABLE_XID || l->xid == RESERVED_XTABLE_XID); - l->xid = xid; + int b2 = test_and_set_entry(l, RESERVED_XTABLE_XID, xid); + int b1 = test_and_set_entry(l, INVALID_XTABLE_XID, xid); + assert(b1 || b2); l->recLSN = lsn; } l->prevLSN = lsn; - pthread_mutex_unlock(&l->mut); return 0; } int stasis_transaction_table_roll_forward_with_reclsn(stasis_transaction_table_t *tbl, int xid, lsn_t lsn, @@ -179,9 +231,7 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac tls->tbl = tbl; - pthread_mutex_lock(&tbl->mut); - (tbl->active_count)++; - pthread_mutex_unlock(&tbl->mut); + incr_active_count(tbl, 1); } // Fast path @@ -193,15 +243,10 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac for(intptr_t i = 0; i < tls->num_entries; i++) { intptr_t idx = (1 + i + tls->last_entry) % tls->num_entries; ret = tls->entries[idx]; - pthread_mutex_lock(&ret->mut); - if(ret->xid == RESERVED_XTABLE_XID) { + if(test_and_set_entry(ret, RESERVED_XTABLE_XID, PENDING_XTABLE_XID)) { index = tls->indexes[idx]; - tls->entries[idx]->xid = PENDING_XTABLE_XID; - pthread_mutex_unlock(&ret->mut); break; } - assert(ret->xid != INVALID_XTABLE_XID && ret->xid != PENDING_XTABLE_XID); - pthread_mutex_unlock(&ret->mut); } // Slow path @@ -209,19 +254,14 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac if(index == INVALID_XID) { for(int i = 0; i < MAX_TRANSACTIONS; i++ ) { - pthread_mutex_lock(&tbl->table[i].mut); - if( tbl->table[i].xid == INVALID_XTABLE_XID ) { + if(test_and_set_entry(&tbl->table[i], INVALID_XTABLE_XID, PENDING_XTABLE_XID)) { index = i; - tbl->table[index].xid = PENDING_XTABLE_XID; - pthread_mutex_unlock(&tbl->table[i].mut); tls->num_entries++; tls->entries = realloc(tls->entries, sizeof(sizeof(ret)) * tls->num_entries); tls->entries[tls->num_entries-1] = &tbl->table[index]; tls->indexes = realloc(tls->indexes, sizeof(int) * tls->num_entries); tls->indexes[tls->num_entries-1] = index; break; - } else { - pthread_mutex_unlock(&tbl->table[i].mut); } } } @@ -246,27 +286,21 @@ stasis_transaction_table_entry_t * stasis_transaction_table_get(stasis_transacti } } int stasis_transaction_table_commit(stasis_transaction_table_t *tbl, int xid) { - pthread_mutex_lock(&tbl->table[xid%MAX_TRANSACTIONS].mut); + assert(xid >= 0 && xid < MAX_TRANSACTIONS); - tbl->table[xid%MAX_TRANSACTIONS].xid = RESERVED_XTABLE_XID; - DEBUG("%s:%d deactivate %d\n",__FILE__,__LINE__,xid); + set_entry(&(tbl->table[xid]), RESERVED_XTABLE_XID); - pthread_mutex_unlock(&tbl->table[xid%MAX_TRANSACTIONS].mut); return 0; } int stasis_transaction_table_forget(stasis_transaction_table_t *tbl, int xid) { assert(xid >= 0 && xid < MAX_TRANSACTIONS); stasis_transaction_table_entry_t * l = &tbl->table[xid]; - pthread_mutex_lock(&tbl->table[xid].mut); - if(l->xid != xid) { - assert(l->xid < 0); // otherwise, more than one xact had this slot at once... + if(test_and_set_entry(&tbl->table[xid], xid, RESERVED_XTABLE_XID)) { + // success } else { - l->xid = RESERVED_XTABLE_XID; - l->prevLSN = -1; - l->recLSN = -1; + // during recovery, we might forget something we've never heard of. + assert(l->xid < 0); // otherwise, more than one xact had this slot at once... } - pthread_mutex_unlock(&tbl->table[xid].mut); - return 0; } diff --git a/stasis/transactionTable.h b/stasis/transactionTable.h index e8754fd..73d0b38 100644 --- a/stasis/transactionTable.h +++ b/stasis/transactionTable.h @@ -21,7 +21,9 @@ struct stasis_transaction_table_entry_t { int xid; lsn_t prevLSN; lsn_t recLSN; +#ifndef HAVE_GCC_ATOMICS pthread_mutex_t mut; +#endif }; /** Initialize Stasis' transaction table. Called by Tinit() and unit diff --git a/test/stasis/check_logWriter.c b/test/stasis/check_logWriter.c index 70fce27..1f3b75e 100644 --- a/test/stasis/check_logWriter.c +++ b/test/stasis/check_logWriter.c @@ -445,7 +445,7 @@ void reopenLogWorkload(int truncating) { int xid = 1; stasis_transaction_table_entry_t l; - pthread_mutex_init(&l.mut,0); +// pthread_mutex_init(&l.mut,0); stasis_log_begin_transaction(stasis_log_file, xid, &l); lsn_t startLSN = 0;