fix transaction table leaks during recovery; add debugging field to xact table entries

This commit is contained in:
Sears Russell 2010-11-05 18:57:18 +00:00
parent 36546b3a2d
commit 6b302226ab
3 changed files with 43 additions and 4 deletions

View file

@ -353,6 +353,7 @@ void stasis_recovery_initiate(stasis_log_t* log, stasis_transaction_table_t * tb
stasis_recovery_undo(log, tbl, 1);
stasis_alloc_post_init(alloc);
DEBUG("Recovery complete.\n");
stasis_transaction_post_recovery(tbl);
for(void * it = pblHtFirst(transactionLSN); it; it = pblHtNext(transactionLSN)) {
free(pblHtCurrent(transactionLSN));

View file

@ -4,11 +4,13 @@
* Created on: Oct 14, 2009
* Author: sears
*/
#include <config.h>
#include <stasis/common.h>
#include <stasis/constants.h>
#include <stasis/transactionTable.h>
#include <stasis/transactional.h>
#include <assert.h>
#include <sys/syscall.h> // SYS_gettid
struct stasis_transaction_table_t {
int active_count;
@ -84,6 +86,7 @@ struct stasis_transaction_table_thread_local_state_t {
stasis_transaction_table_t * tbl;
stasis_transaction_table_entry_t ** entries;
int * indexes;
pid_t tid;
};
typedef enum {
@ -196,9 +199,11 @@ stasis_transaction_table_t * stasis_transaction_table_init() {
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
tbl->table[i].xid = INVALID_XTABLE_XID;
tbl->table[i].xidWhenFree = INVALID_XTABLE_XID;
tbl->table[i].commitArgs[0] = 0;
tbl->table[i].commitArgs[1] = 0;
tbl->table[i].commitArgs[2] = 0;
tbl->table[i].tid = -1;
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_init(&(tbl->table[i].mut),0);
#endif
@ -216,6 +221,23 @@ stasis_transaction_table_t * stasis_transaction_table_init() {
return tbl;
}
void stasis_transaction_post_recovery(stasis_transaction_table_t *tbl) {
/*
* Verify that all the entries are either:
*
* - free in the global pool (common case)
* - active (recovered prepared transactions)
*
* In particular there should be no entries being initialized or
* reserved by a thread
*/
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
assert(tbl->table[i].xid == INVALID_XTABLE_XID ||
tbl->table[i].xid >= 0);
assert(tbl->table[i].tid == -1);
}
}
void stasis_transaction_table_deinit(stasis_transaction_table_t *tbl) {
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_destroy(&tbl->mut);
@ -292,6 +314,7 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac
tls->num_entries = 0;
tls->entries = NULL;
tls->indexes = NULL;
tls->tid = syscall(SYS_gettid);
pthread_setspecific(tbl->key, tls);
tls->tbl = tbl;
@ -299,7 +322,7 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac
incr_active_count(tbl, 1);
}
// Fast path
// Fast path - allocate from the local pool
ret = 0;
@ -314,7 +337,7 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac
}
}
// Slow path
// Slow path - allocate from the global pool
if(index == INVALID_XID) {
@ -326,6 +349,8 @@ stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transac
tls->entries[tls->num_entries-1] = &tbl->table[index];
tls->indexes = realloc(tls->indexes, sizeof(int) * tls->num_entries);
tls->indexes[tls->num_entries-1] = index;
tbl->table[index].xidWhenFree = RESERVED_XTABLE_XID;
tbl->table[index].tid = tls->tid;
break;
}
}
@ -353,7 +378,7 @@ stasis_transaction_table_entry_t * stasis_transaction_table_get(stasis_transacti
int stasis_transaction_table_commit(stasis_transaction_table_t *tbl, int xid) {
assert(xid >= 0 && xid < MAX_TRANSACTIONS);
set_entry(&(tbl->table[xid]), RESERVED_XTABLE_XID);
set_entry(&(tbl->table[xid]), tbl->table[xid].xidWhenFree);
return 0;
}
@ -361,7 +386,7 @@ int stasis_transaction_table_forget(stasis_transaction_table_t *tbl, int xid) {
assert(xid >= 0 && xid < MAX_TRANSACTIONS);
stasis_transaction_table_entry_t * l = &tbl->table[xid];
if(test_and_set_entry(&tbl->table[xid], xid, RESERVED_XTABLE_XID)) {
if(test_and_set_entry(&tbl->table[xid], xid, tbl->table[xid].xidWhenFree)) {
// success
} else {
// during recovery, we might forget something we've never heard of.

View file

@ -22,9 +22,20 @@ typedef struct stasis_transaction_table_t stasis_transaction_table_t;
*/
struct stasis_transaction_table_entry_t {
int xid;
/**
Identifies where the entry was allocated from (and must be returned to):
- INVALID_XTABLE_XID: global pool (used only during recovery)
- RESERVED_XTABLE_XID: local thread pool
*/
int xidWhenFree;
lsn_t prevLSN;
lsn_t recLSN;
void ** commitArgs[3];
/**
Thread id (as returned by gettid()) of the thread which owns this entry
or -1 if not owned by any thread
*/
pid_t tid;
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_t mut;
#endif
@ -37,6 +48,8 @@ stasis_transaction_table_t* stasis_transaction_table_init();
/** Free resources associated with the transaction table */
void stasis_transaction_table_deinit(stasis_transaction_table_t*);
void stasis_transaction_post_recovery(stasis_transaction_table_t *tbl);
int stasis_transaction_table_roll_forward(stasis_transaction_table_t*,int xid, lsn_t lsn, lsn_t prevLSN);
/**
@todo update Tprepare() to not write reclsn to log, then remove