improved concurrency for transaction table

This commit is contained in:
Sears Russell 2009-10-15 20:11:43 +00:00
parent 5df5512d91
commit e6a718a7f9
6 changed files with 123 additions and 70 deletions

View file

@ -71,10 +71,7 @@ void stasis_log_group_force(stasis_log_group_force_t* lh, lsn_t lsn) {
}
lh->pendingCommits++;
if(!stasis_log_group_force_should_wait(lh->minNumActive, lh->pendingCommits)) {
lh->minNumActive = TactiveTransactionCount();
}
int xactcount = lh->minNumActive;;
int xactcount = TactiveThreadCount();
if(stasis_log_group_force_should_wait(xactcount, lh->pendingCommits)) {
int retcode;
while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&lh->tooFewXacts, &lh->check_commit, &timeout))) {

View file

@ -11,23 +11,26 @@
#include <assert.h>
struct stasis_transaction_table_t {
// int active_count;
// int xid_count;
int active_count;
/**
This mutex protects the rest of the struct
xidCount.
*/
// pthread_mutex_t mut;
pthread_mutex_t mut;
/**
This key points to thread local state, including fast-path access to RESERVED_XIDs.
*/
// pthread_key_t key;
pthread_key_t key;
stasis_transaction_table_entry_t table[MAX_TRANSACTIONS];
};
struct stasis_transaction_table_thread_local_state_t {
intptr_t last_entry;
intptr_t num_entries;
stasis_transaction_table_t * tbl;
stasis_transaction_table_entry_t ** entries;
int * indexes;
};
typedef enum {
@ -39,49 +42,53 @@ typedef enum {
RESERVED_XTABLE_XID = -3
} stasis_transaction_table_status;
int stasis_transaction_table_num_active(stasis_transaction_table_t *tbl) {
int ret = 0;
printf("Looping for num_active");
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_lock(&(tbl->table[i].mut));
if(tbl->table[i].xid >= 0) { ret++; }
pthread_mutex_unlock(&(tbl->table[i].mut));
}
int stasis_transaction_table_num_active_threads(stasis_transaction_table_t *tbl) {
pthread_mutex_lock(&tbl->mut);
int ret = tbl->active_count;
pthread_mutex_unlock(&tbl->mut);
return ret;
}
/*
static void stasis_transaction_table_thread_destructor(void * p) {
assert(p);
struct stasis_transaction_table_thread_local_state_t * tls = p;
stasis_transaction_table_entry_t * e;
for(int i = 0; NULL != (e = tls->entries[i]); i++) {
for(int i = 0; i < tls->num_entries; i++) {
e = tls->entries[i];
pthread_mutex_lock(&e->mut);
if(e->xid == RESERVED_XTABLE_XID) {
e->xid = INVALID_XTABLE_XID;
e->xid = INVALID_XTABLE_XID; /// XXX can leak; is possible that our xids are still running.
}
pthread_mutex_unlock(&e->mut);
}
pthread_mutex_lock(&tls->tbl->mut);
tls->tbl->active_count--;
pthread_mutex_unlock(&tls->tbl->mut);
free(tls->entries);
free(tls->indexes);
free(tls);
}
*/
int stasis_transaction_table_is_active(stasis_transaction_table_t *tbl, int xid) {
return xid >= 0 && tbl->table[xid % MAX_TRANSACTIONS].xid == xid;
assert(xid < MAX_TRANSACTIONS);
return xid >= 0 && tbl->table[xid].xid == xid;
}
int* stasis_transaction_table_list_active(stasis_transaction_table_t *tbl) {
int* stasis_transaction_table_list_active(stasis_transaction_table_t *tbl, int *count) {
int * ret = malloc(sizeof(*ret));
ret[0] = 0;
int retcount = 0;
ret[0] = INVALID_XID;
*count = 0;
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_lock(&tbl->table[i].mut);
if(tbl->table[i].xid >= 0) {
ret[retcount] = tbl->table[i].xid;
retcount++;
ret = realloc(ret, (retcount+1) * sizeof(*ret));
ret[retcount] = INVALID_XID;
ret[*count] = tbl->table[i].xid;
(*count)++;
ret = realloc(ret, ((*count)+1) * sizeof(*ret));
ret[*count] = INVALID_XID;
}
pthread_mutex_unlock(&tbl->table[i].mut);
}
@ -90,34 +97,33 @@ int* stasis_transaction_table_list_active(stasis_transaction_table_t *tbl) {
stasis_transaction_table_t * stasis_transaction_table_init() {
stasis_transaction_table_t * tbl = malloc(sizeof(*tbl));
// pthread_mutex_init(&tbl->mut, NULL);
pthread_mutex_init(&tbl->mut, NULL);
tbl->active_count = 0;
// memset(tbl->table, INVALID_XTABLE_XID,
// sizeof(stasis_transaction_table_entry_t)*MAX_TRANSACTIONS);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
tbl->table[i].xid = INVALID_XTABLE_XID;
pthread_mutex_init(&(tbl->table[i].mut),0);
}
printf("initted xact table!\n");
DEBUG("initted xact table!\n");
// pthread_key_create(&tbl->key, stasis_transaction_table_thread_destructor);
pthread_key_create(&tbl->key, stasis_transaction_table_thread_destructor);
return tbl;
}
void stasis_transaction_table_deinit(stasis_transaction_table_t *tbl) {
// pthread_mutex_destroy(&tbl->mut);
pthread_mutex_destroy(&tbl->mut);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_destroy(&tbl->table[i].mut);
}
// pthread_key_delete(tbl->key);
pthread_key_delete(tbl->key);
free(tbl);
}
lsn_t stasis_transaction_table_minRecLSN(stasis_transaction_table_t *tbl) {
lsn_t minRecLSN = LSN_T_MAX;
printf("Looping for minRecLSN");
DEBUG("Looping for minRecLSN");
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
if(tbl->table[i].xid >= 0) {
pthread_mutex_lock(&tbl->table[i].mut);
@ -132,7 +138,8 @@ lsn_t stasis_transaction_table_minRecLSN(stasis_transaction_table_t *tbl) {
}
int stasis_transaction_table_roll_forward(stasis_transaction_table_t *tbl, int xid, lsn_t lsn, lsn_t prevLSN) {
stasis_transaction_table_entry_t * l = &tbl->table[xid%MAX_TRANSACTIONS];
assert(xid >= 0 && xid < MAX_TRANSACTIONS);
stasis_transaction_table_entry_t * l = &tbl->table[xid];
pthread_mutex_lock(&l->mut);
if(l->xid == xid) {
// rolling forward CLRs / NTAs makes prevLSN decrease.
@ -149,43 +156,91 @@ int stasis_transaction_table_roll_forward(stasis_transaction_table_t *tbl, int x
int stasis_transaction_table_roll_forward_with_reclsn(stasis_transaction_table_t *tbl, int xid, lsn_t lsn,
lsn_t prevLSN,
lsn_t recLSN) {
assert(tbl->table[xid%MAX_TRANSACTIONS].recLSN == recLSN);
assert(xid >= 0 && xid < MAX_TRANSACTIONS);
assert(tbl->table[xid].recLSN == recLSN);
return stasis_transaction_table_roll_forward(tbl, xid, lsn, prevLSN);
}
stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transaction_table_t *tbl, int * xid) {
stasis_transaction_table_entry_t * ret;
// Initialize tls
struct stasis_transaction_table_thread_local_state_t * tls = pthread_getspecific(tbl->key);
if(tls == NULL) {
tls = malloc(sizeof(*tls));
tls->last_entry = 0;
tls->num_entries = 0;
tls->entries = NULL;
tls->indexes = NULL;
pthread_setspecific(tbl->key, tls);
tls->tbl = tbl;
pthread_mutex_lock(&tbl->mut);
(tbl->active_count)++;
pthread_mutex_unlock(&tbl->mut);
}
// Fast path
ret = 0;
int index = INVALID_XID;
for(int i = 0; i < MAX_TRANSACTIONS; i++ ) {
pthread_mutex_lock(&tbl->table[i].mut);
if( tbl->table[i].xid == INVALID_XTABLE_XID ) {
index = i;
tbl->table[index].xid = PENDING_XTABLE_XID;
pthread_mutex_unlock(&tbl->table[i].mut);
for(intptr_t i = 0; i < tls->num_entries; i++) {
intptr_t idx = (1 + i + tls->last_entry) % tls->num_entries;
ret = tls->entries[idx];
pthread_mutex_lock(&ret->mut);
if(ret->xid == RESERVED_XTABLE_XID) {
index = tls->indexes[idx];
tls->entries[idx]->xid = PENDING_XTABLE_XID;
pthread_mutex_unlock(&ret->mut);
break;
} else {
pthread_mutex_unlock(&tbl->table[i].mut);
}
assert(ret->xid != INVALID_XTABLE_XID && ret->xid != PENDING_XTABLE_XID);
pthread_mutex_unlock(&ret->mut);
}
// Slow path
if(index == INVALID_XID) {
for(int i = 0; i < MAX_TRANSACTIONS; i++ ) {
pthread_mutex_lock(&tbl->table[i].mut);
if( tbl->table[i].xid == INVALID_XTABLE_XID ) {
index = i;
tbl->table[index].xid = PENDING_XTABLE_XID;
pthread_mutex_unlock(&tbl->table[i].mut);
tls->num_entries++;
tls->entries = realloc(tls->entries, sizeof(sizeof(ret)) * tls->num_entries);
tls->entries[tls->num_entries-1] = &tbl->table[index];
tls->indexes = realloc(tls->indexes, sizeof(int) * tls->num_entries);
tls->indexes[tls->num_entries-1] = index;
break;
} else {
pthread_mutex_unlock(&tbl->table[i].mut);
}
}
}
if(index == INVALID_XID) {
*xid = LLADD_EXCEED_MAX_TRANSACTIONS;
ret = NULL;
} else {
printf("begin xid %d\n", index);
DEBUG("begin xid %d\n", index);
*xid = index;
tls->last_entry = index;
ret = &tbl->table[index];
}
return ret;
}
stasis_transaction_table_entry_t * stasis_transaction_table_get(stasis_transaction_table_t *tbl, int xid) {
if(tbl->table[xid % MAX_TRANSACTIONS].xid == xid) {
return &tbl->table[xid % MAX_TRANSACTIONS];
assert(xid >= 0 && xid < MAX_TRANSACTIONS);
if(tbl->table[xid].xid == xid) {
return &tbl->table[xid];
} else {
return NULL;
}
@ -193,25 +248,25 @@ stasis_transaction_table_entry_t * stasis_transaction_table_get(stasis_transacti
int stasis_transaction_table_commit(stasis_transaction_table_t *tbl, int xid) {
pthread_mutex_lock(&tbl->table[xid%MAX_TRANSACTIONS].mut);
tbl->table[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
tbl->table[xid%MAX_TRANSACTIONS].xid = RESERVED_XTABLE_XID;
DEBUG("%s:%d deactivate %d\n",__FILE__,__LINE__,xid);
pthread_mutex_unlock(&tbl->table[xid%MAX_TRANSACTIONS].mut);
return 0;
}
int stasis_transaction_table_forget(stasis_transaction_table_t *tbl, int xid) {
assert(xid != INVALID_XTABLE_XID);
stasis_transaction_table_entry_t * l = &tbl->table[xid%MAX_TRANSACTIONS];
assert(xid >= 0 && xid < MAX_TRANSACTIONS);
stasis_transaction_table_entry_t * l = &tbl->table[xid];
pthread_mutex_lock(&tbl->table[xid%MAX_TRANSACTIONS].mut);
pthread_mutex_lock(&tbl->table[xid].mut);
if(l->xid != xid) {
assert(l->xid < 0); // otherwise, more than one xact had this slot at once...
} else {
l->xid = INVALID_XTABLE_XID;
l->xid = RESERVED_XTABLE_XID;
l->prevLSN = -1;
l->recLSN = -1;
}
pthread_mutex_unlock(&tbl->table[xid%MAX_TRANSACTIONS].mut);
pthread_mutex_unlock(&tbl->table[xid].mut);
return 0;
}

View file

@ -326,9 +326,8 @@ int Tforget(int xid) {
return 0;
}
int Tdeinit() {
int * active = stasis_transaction_table_list_active(stasis_transaction_table);
int count = stasis_transaction_table_num_active(stasis_transaction_table);
int count;
int * active = stasis_transaction_table_list_active(stasis_transaction_table, &count);
for(int i = 0; i < count; i++) {
if(!stasis_suppress_unclean_shutdown_warnings) {
@ -336,8 +335,10 @@ int Tdeinit() {
}
Tabort(active[i]);
}
assert( stasis_transaction_table_num_active(stasis_transaction_table) == 0 );
free(active);
active = stasis_transaction_table_list_active(stasis_transaction_table, &count);
assert( count == 0 );
free(active);
stasis_truncation_deinit(stasis_truncation);
@ -434,11 +435,11 @@ lsn_t TendNestedTopAction(int xid, void * handle) {
return ret;
}
int TactiveTransactionCount(void) {
return stasis_transaction_table_num_active(stasis_transaction_table);
int TactiveThreadCount(void) {
return stasis_transaction_table_num_active_threads(stasis_transaction_table);
}
int* TlistActiveTransactions(void) {
return stasis_transaction_table_list_active(stasis_transaction_table);
int* TlistActiveTransactions(int *count) {
return stasis_transaction_table_list_active(stasis_transaction_table, count);
}
int TisActiveTransaction(int xid) {
return stasis_transaction_table_is_active(stasis_transaction_table, xid);

View file

@ -128,7 +128,7 @@ struct stasis_log_t {
@param log "this" log object
@param lsn The LSN of the log entry to be read. This must be the LSN of a valid log entry.
@return The LogEntry of interest. Should be freed with freeLogEntry().
@return The LogEntry of interest. Should be freed with freeLogEntry(). A NULL return value means the log was truncated past the requested entry.
*/
const LogEntry* (*read_entry)(struct stasis_log_t* log, lsn_t lsn);

View file

@ -49,8 +49,8 @@ stasis_transaction_table_entry_t * stasis_transaction_table_get(stasis_transacti
int stasis_transaction_table_commit(stasis_transaction_table_t*,int xid);
int stasis_transaction_table_forget(stasis_transaction_table_t*,int xid);
int stasis_transaction_table_num_active(stasis_transaction_table_t*);
int* stasis_transaction_table_list_active(stasis_transaction_table_t*);
int stasis_transaction_table_num_active_threads(stasis_transaction_table_t*);
int* stasis_transaction_table_list_active(stasis_transaction_table_t*, int *count);
int stasis_transaction_table_is_active(stasis_transaction_table_t*, int xid);
#endif /* TRANSACTIONTABLE_H_ */

View file

@ -725,7 +725,7 @@ lsn_t TendNestedTopAction(int xid, void * handle);
@return an array of transaction ids.
*/
int* TlistActiveTransactions(void);
int* TlistActiveTransactions(int *count);
/**
* Checks to see if a transaction is still active.
@ -737,7 +737,7 @@ int TisActiveTransaction(int xid);
/*
* @return the number of currently active transactions.
*/
int TactiveTransactionCount(void);
int TactiveThreadCount(void);
/**
* Called at the end of transactions aborted by recovery, after the transaction