refactoring transaction table; moved implementation into its own file. still need to remove globals

This commit is contained in:
Sears Russell 2009-10-14 18:57:50 +00:00
parent fbdc0f0499
commit dbe3ecf0d0
7 changed files with 292 additions and 220 deletions

View file

@ -6,7 +6,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c
bufferManager/legacy/legacyBufferManager.c
page.c bufferPool.c blobManager.c
recovery2.c truncation.c transactional2.c
dirtyPageTable.c
dirtyPageTable.c transactionTable.c
allocationPolicy.c lockManager.c iterator.c
consumer.c arrayCollection.c ringbuffer.c fifo.c
multiplexer.c graph.c logger/logEntry.c

View file

@ -3,7 +3,7 @@ lib_LTLIBRARIES=libstasis.la
libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common.c flags.c \
stats.c linkedlist.c operations.c pageHandle.c \
page.c bufferPool.c blobManager.c recovery2.c truncation.c \
dirtyPageTable.c \
dirtyPageTable.c transactionTable.c \
transactional2.c allocationPolicy.c \
lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\
logger/logEntry.c \

View file

@ -0,0 +1,189 @@
/*
* transactionTable.c
*
* Created on: Oct 14, 2009
* Author: sears
*/
#include <stasis/common.h>
#include <stasis/constants.h>
#include <stasis/transactionTable.h>
#include <assert.h>
TransactionLog stasis_transaction_table[MAX_TRANSACTIONS];
static int stasis_transaction_table_active_count = 0;
static int stasis_transaction_table_xid_count = 0;
int stasis_transaction_table_num_active() {
return stasis_transaction_table_active_count;
}
/**
This mutex protects stasis_transaction_table, numActiveXactions and
xidCount.
*/
static pthread_mutex_t stasis_transaction_table_mutex;
typedef enum {
INVALID_XTABLE_XID = INVALID_XID,
PENDING_XTABLE_XID = -2
} stasis_transaction_table_status;
void stasis_transaction_table_init() {
pthread_mutex_init(&stasis_transaction_table_mutex, NULL);
stasis_transaction_table_active_count = 0;
memset(stasis_transaction_table, INVALID_XTABLE_XID,
sizeof(TransactionLog)*MAX_TRANSACTIONS);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_init(&stasis_transaction_table[i].mut,0);
}
}
void stasis_transaction_table_deinit() {
pthread_mutex_destroy(&stasis_transaction_table_mutex);
stasis_transaction_table_active_count = 0;
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_destroy(&stasis_transaction_table[i].mut);
}
memset(stasis_transaction_table, INVALID_XTABLE_XID, sizeof(TransactionLog)*MAX_TRANSACTIONS);
}
void stasis_transaction_table_max_transaction_id_set(int xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
stasis_transaction_table_xid_count = xid;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
}
void stasis_transaction_table_active_transaction_count_set(int xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
stasis_transaction_table_active_count = xid;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
}
lsn_t stasis_transaction_table_minRecLSN() {
lsn_t minRecLSN = LSN_T_MAX;
pthread_mutex_lock(&stasis_transaction_table_mutex);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) {
lsn_t recLSN = stasis_transaction_table[i].recLSN;
if(recLSN != -1 && recLSN < minRecLSN) {
minRecLSN = recLSN;
}
}
}
pthread_mutex_unlock(&stasis_transaction_table_mutex);
return minRecLSN;
}
int TactiveTransactionCount() {
return stasis_transaction_table_active_count;
}
int* stasis_transaction_table_list_active() {
pthread_mutex_lock(&stasis_transaction_table_mutex);
int * ret = malloc(sizeof(*ret));
ret[0] = 0;
int retcount = 0;
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) {
ret[retcount] = stasis_transaction_table[i].xid;
retcount++;
ret = realloc(ret, (retcount+1) * sizeof(*ret));
ret[retcount] = 0;
}
}
pthread_mutex_unlock(&stasis_transaction_table_mutex);
return ret;
}
int* TlistActiveTransactions() {
return stasis_transaction_table_list_active();
}
int TisActiveTransaction(int xid) {
if(xid < 0) { return 0; }
pthread_mutex_lock(&stasis_transaction_table_mutex);
int ret = xid != INVALID_XTABLE_XID && stasis_transaction_table[xid%MAX_TRANSACTIONS].xid == xid;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
return ret;
}
int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN) {
TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS];
if(l->xid == xid) {
// rolling forward CLRs / NTAs makes prevLSN decrease.
assert(l->prevLSN >= prevLSN);
} else {
pthread_mutex_lock(&stasis_transaction_table_mutex);
assert(l->xid == INVALID_XTABLE_XID);
l->xid = xid;
l->recLSN = lsn;
stasis_transaction_table_active_count++;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
}
l->prevLSN = lsn;
return 0;
}
int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn,
lsn_t prevLSN,
lsn_t recLSN) {
assert(stasis_transaction_table[xid%MAX_TRANSACTIONS].recLSN == recLSN);
return stasis_transaction_table_roll_forward(xid, lsn, prevLSN);
}
TransactionLog * stasis_transaction_table_begin(int * xid) {
int index = 0;
int xidCount_tmp;
pthread_mutex_lock(&stasis_transaction_table_mutex);
if( stasis_transaction_table_active_count == MAX_TRANSACTIONS ) {
pthread_mutex_unlock(&stasis_transaction_table_mutex);
*xid = LLADD_EXCEED_MAX_TRANSACTIONS;
return 0;
} else {
DEBUG("%s:%d activate in begin\n",__FILE__,__LINE__);
stasis_transaction_table_active_count++;
}
for(int i = 0; i < MAX_TRANSACTIONS; i++ ) {
stasis_transaction_table_xid_count++;
if( stasis_transaction_table[stasis_transaction_table_xid_count%MAX_TRANSACTIONS].xid == INVALID_XTABLE_XID ) {
index = stasis_transaction_table_xid_count%MAX_TRANSACTIONS;
break;
}
}
xidCount_tmp = stasis_transaction_table_xid_count;
stasis_transaction_table[index].xid = PENDING_XTABLE_XID;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
*xid = xidCount_tmp;
return &stasis_transaction_table[index];
}
TransactionLog * stasis_transaction_table_get(int xid) {
return &stasis_transaction_table[xid % MAX_TRANSACTIONS];
}
int stasis_transaction_table_commit(int xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
stasis_transaction_table[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
DEBUG("%s:%d deactivate %d\n",__FILE__,__LINE__,xid);
stasis_transaction_table_active_count--;
assert( stasis_transaction_table_active_count >= 0 );
pthread_mutex_unlock(&stasis_transaction_table_mutex);
return 0;
}
int stasis_transaction_table_forget(int xid) {
assert(xid != INVALID_XTABLE_XID);
TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS];
if(l->xid == xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
l->xid = INVALID_XTABLE_XID;
l->prevLSN = -1;
l->recLSN = -1;
stasis_transaction_table_active_count--;
assert(stasis_transaction_table_active_count >= 0);
pthread_mutex_unlock(&stasis_transaction_table_mutex);
} else {
assert(l->xid == INVALID_XTABLE_XID);
}
return 0;
}

View file

@ -7,6 +7,7 @@
#include <stasis/compensations.h>
#include <stasis/pageHandle.h>
#include <stasis/page.h>
#include <stasis/transactionTable.h>
#include <stasis/bufferManager/legacy/pageFile.h>
@ -27,10 +28,6 @@
static int stasis_initted = 0;
TransactionLog stasis_transaction_table[MAX_TRANSACTIONS];
static int stasis_transaction_table_num_active = 0;
static int stasis_transaction_table_xid_count = 0;
static stasis_log_t* stasis_log_file = 0;
stasis_dirty_page_table_t * stasis_dirty_page_table = 0;
static stasis_truncation_t * stasis_truncation = 0;
@ -42,25 +39,6 @@ void * stasis_runtime_buffer_manager() {
return stasis_buffer_manager;
}
/**
This mutex protects stasis_transaction_table, numActiveXactions and
xidCount.
*/
static pthread_mutex_t stasis_transaction_table_mutex;
typedef enum {
INVALID_XTABLE_XID = INVALID_XID,
PENDING_XTABLE_XID = -2
} stasis_transaction_table_status;
void stasis_transaction_table_init() {
memset(stasis_transaction_table, INVALID_XTABLE_XID,
sizeof(TransactionLog)*MAX_TRANSACTIONS);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_init(&stasis_transaction_table[i].mut,0);
}
}
void * stasis_runtime_dirty_page_table() {
return stasis_dirty_page_table;
}
@ -81,9 +59,7 @@ stasis_page_handle_t* stasis_page_handle_default_factory(stasis_log_t *log, stas
}
int Tinit() {
pthread_mutex_init(&stasis_transaction_table_mutex, NULL);
stasis_initted = 1;
stasis_transaction_table_num_active = 0;
compensations_init();
@ -138,40 +114,21 @@ int Tinit() {
int Tbegin() {
int i, index = 0;
int xidCount_tmp;
assert(stasis_initted);
pthread_mutex_lock(&stasis_transaction_table_mutex);
int xid;
if( stasis_transaction_table_num_active == MAX_TRANSACTIONS ) {
pthread_mutex_unlock(&stasis_transaction_table_mutex);
return LLADD_EXCEED_MAX_TRANSACTIONS;
TransactionLog* newXact = stasis_transaction_table_begin(&xid);
if(newXact != 0) {
stasis_log_begin_transaction(stasis_log_file, xid, newXact);
if(globalLockManager.begin) { globalLockManager.begin(newXact->xid); }
return newXact->xid;
} else {
assert(xid == LLADD_EXCEED_MAX_TRANSACTIONS);
return xid;
}
else {
DEBUG("%s:%d activate in begin\n",__FILE__,__LINE__);
stasis_transaction_table_num_active++;
}
for( i = 0; i < MAX_TRANSACTIONS; i++ ) {
stasis_transaction_table_xid_count++;
if( stasis_transaction_table[stasis_transaction_table_xid_count%MAX_TRANSACTIONS].xid == INVALID_XTABLE_XID ) {
index = stasis_transaction_table_xid_count%MAX_TRANSACTIONS;
break;
}
}
xidCount_tmp = stasis_transaction_table_xid_count;
stasis_transaction_table[index].xid = PENDING_XTABLE_XID;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
stasis_log_begin_transaction(stasis_log_file, xidCount_tmp, &stasis_transaction_table[index]);
if(globalLockManager.begin) { globalLockManager.begin(stasis_transaction_table[index].xid); }
return stasis_transaction_table[index].xid;
}
compensated_function void Tupdate(int xid, pageid_t page,
@ -322,13 +279,7 @@ static inline int TcommitHelper(int xid, int force) {
}
pthread_mutex_lock(&stasis_transaction_table_mutex);
stasis_transaction_table[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
DEBUG("%s:%d deactivate %d\n",__FILE__,__LINE__,xid);
stasis_transaction_table_num_active--;
assert( stasis_transaction_table_num_active >= 0 );
pthread_mutex_unlock(&stasis_transaction_table_mutex);
stasis_transaction_table_commit(xid);
return 0;
}
@ -376,18 +327,21 @@ int Tforget(int xid) {
return 0;
}
int Tdeinit() {
int i;
for( i = 0; i < MAX_TRANSACTIONS; i++ ) {
if( stasis_transaction_table[i].xid != INVALID_XTABLE_XID ) {
int * active = stasis_transaction_table_list_active();
int count = stasis_transaction_table_num_active();
for(int i = 0; i < count; i++) {
if(!stasis_suppress_unclean_shutdown_warnings) {
fprintf(stderr, "WARNING: Tdeinit() is aborting transaction %d\n",
stasis_transaction_table[i].xid);
}
Tabort(stasis_transaction_table[i].xid);
Tabort(active[i]);
}
}
assert( stasis_transaction_table_num_active == 0 );
assert( stasis_transaction_table_num_active() == 0 );
free(active);
stasis_truncation_deinit(stasis_truncation);
TnaiveHashDeinit();
stasis_alloc_deinit(stasis_alloc);
@ -398,6 +352,7 @@ int Tdeinit() {
stasis_log_group_force_t * group_force = stasis_log_file->group_force;
stasis_log_file->close(stasis_log_file);
if(group_force) { stasis_log_group_force_deinit(group_force); }
stasis_transaction_table_deinit();
stasis_dirty_page_table_deinit(stasis_dirty_page_table);
stasis_initted = 0;
@ -418,7 +373,7 @@ int TuncleanShutdown() {
// XXX: close_file?
stasis_page_deinit();
stasis_log_file->close(stasis_log_file);
stasis_transaction_table_num_active = 0;
stasis_transaction_table_deinit();
stasis_dirty_page_table_deinit(stasis_dirty_page_table);
// Reset it here so the warnings will appear if a new stasis
@ -427,99 +382,6 @@ int TuncleanShutdown() {
return 0;
}
void stasis_transaction_table_max_transaction_id_set(int xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
stasis_transaction_table_xid_count = xid;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
}
void stasis_transaction_table_active_transaction_count_set(int xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
stasis_transaction_table_num_active = xid;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
}
lsn_t stasis_transaction_table_minRecLSN() {
lsn_t minRecLSN = LSN_T_MAX;
pthread_mutex_lock(&stasis_transaction_table_mutex);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) {
lsn_t recLSN = stasis_transaction_table[i].recLSN;
if(recLSN != -1 && recLSN < minRecLSN) {
minRecLSN = recLSN;
}
}
}
pthread_mutex_unlock(&stasis_transaction_table_mutex);
return minRecLSN;
}
int TactiveTransactionCount() {
return stasis_transaction_table_num_active;
}
int* TlistActiveTransactions() {
pthread_mutex_lock(&stasis_transaction_table_mutex);
int * ret = malloc(sizeof(*ret));
ret[0] = 0;
int retcount = 0;
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) {
ret[retcount] = stasis_transaction_table[i].xid;
retcount++;
ret = realloc(ret, (retcount+1) * sizeof(*ret));
ret[retcount] = 0;
}
}
pthread_mutex_unlock(&stasis_transaction_table_mutex);
return ret;
}
int TisActiveTransaction(int xid) {
if(xid < 0) { return 0; }
pthread_mutex_lock(&stasis_transaction_table_mutex);
int ret = xid != INVALID_XTABLE_XID && stasis_transaction_table[xid%MAX_TRANSACTIONS].xid == xid;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
return ret;
}
int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN) {
TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS];
if(l->xid == xid) {
// rolling forward CLRs / NTAs makes prevLSN decrease.
assert(l->prevLSN >= prevLSN);
} else {
pthread_mutex_lock(&stasis_transaction_table_mutex);
assert(l->xid == INVALID_XTABLE_XID);
l->xid = xid;
l->recLSN = lsn;
stasis_transaction_table_num_active++;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
}
l->prevLSN = lsn;
return 0;
}
int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn,
lsn_t prevLSN,
lsn_t recLSN) {
assert(stasis_transaction_table[xid%MAX_TRANSACTIONS].recLSN == recLSN);
return stasis_transaction_table_roll_forward(xid, lsn, prevLSN);
}
int stasis_transaction_table_forget(int xid) {
assert(xid != INVALID_XTABLE_XID);
TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS];
if(l->xid == xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
l->xid = INVALID_XTABLE_XID;
l->prevLSN = -1;
l->recLSN = -1;
stasis_transaction_table_num_active--;
assert(stasis_transaction_table_num_active >= 0);
pthread_mutex_unlock(&stasis_transaction_table_mutex);
} else {
assert(l->xid == INVALID_XTABLE_XID);
}
return 0;
}
int TdurabilityLevel() {
if(stasis_buffer_manager_factory == stasis_buffer_manager_mem_array_factory) {
return VOLATILE;

View file

@ -54,7 +54,6 @@ terms specified in this license.
#include <stasis/common.h>
typedef struct stasis_log_t stasis_log_t;
typedef struct TransactionLog TransactionLog;
typedef struct stasis_log_group_force_t stasis_log_group_force_t;
@ -66,16 +65,7 @@ typedef enum {
#include <stasis/logger/logEntry.h>
#include <stasis/truncation.h>
#include <stasis/constants.h>
/**
Contains the state needed by the logging layer to perform
operations on a transaction.
*/
struct TransactionLog {
int xid;
lsn_t prevLSN;
lsn_t recLSN;
pthread_mutex_t mut;
};
#include <stasis/transactionTable.h>
/**
A callback function that allows logHandle's iterator to stop
@ -84,13 +74,6 @@ struct TransactionLog {
*/
typedef int (guard_fcn_t)(const LogEntry *, void *);
/**
XXX TransactionTable should be private to transactional2.c!
*/
extern TransactionLog stasis_transaction_table[MAX_TRANSACTIONS];
/**
* Interface provided by Stasis log implementations.
*

73
stasis/transactionTable.h Normal file
View file

@ -0,0 +1,73 @@
/*
* transactionTable.h
*
* Created on: Oct 14, 2009
* Author: sears
*/
#ifndef TRANSACTIONTABLE_H_
#define TRANSACTIONTABLE_H_
#include <stasis/common.h>
typedef struct TransactionLog TransactionLog;
/**
Contains the state needed by the logging layer to perform
operations on a transaction.
*/
struct TransactionLog {
int xid;
lsn_t prevLSN;
lsn_t recLSN;
pthread_mutex_t mut;
};
/**
XXX TransactionTable should be private to transactional2.c!
*/
extern TransactionLog stasis_transaction_table[MAX_TRANSACTIONS];
/**
Initialize Stasis' transaction table. Called by Tinit() and unit
tests that wish to test portions of Stasis in isolation.
*/
void stasis_transaction_table_init();
/** Free resources associated with the transaction table */
void stasis_transaction_table_deinit();
/**
* Used by recovery to prevent reuse of old transaction ids.
*
* Should not be used elsewhere.
*
* @param xid The highest transaction id issued so far.
*/
void stasis_transaction_table_max_transaction_id_set(int xid);
/**
* Used by test cases to mess with internal transaction table state.
*
* @param xid The new active transaction count.
*/
void stasis_transaction_table_active_transaction_count_set(int xid);
int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN);
/**
@todo update Tprepare() to not write reclsn to log, then remove
this function.
*/
int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn,
lsn_t prevLSN,
lsn_t recLSN);
/**
This is used by log truncation.
*/
lsn_t stasis_transaction_table_minRecLSN();
TransactionLog * stasis_transaction_table_begin(int * xid);
TransactionLog * stasis_transaction_table_get(int xid);
int stasis_transaction_table_commit(int xid);
int stasis_transaction_table_forget(int xid);
int stasis_transaction_table_num_active();
int* stasis_transaction_table_list_active();
#endif /* TRANSACTIONTABLE_H_ */

View file

@ -739,41 +739,6 @@ int TisActiveTransaction(int xid);
*/
int TactiveTransactionCount();
/**
Initialize Stasis' transaction table. Called by Tinit() and unit
tests that wish to test portions of Stasis in isolation.
*/
void stasis_transaction_table_init();
/**
* Used by recovery to prevent reuse of old transaction ids.
*
* Should not be used elsewhere.
*
* @param xid The highest transaction id issued so far.
*/
void stasis_transaction_table_max_transaction_id_set(int xid);
/**
* Used by test cases to mess with internal transaction table state.
*
* @param xid The new active transaction count.
*/
void stasis_transaction_table_active_transaction_count_set(int xid);
int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN);
/**
@todo update Tprepare() to not write reclsn to log, then remove
this function.
*/
int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn,
lsn_t prevLSN,
lsn_t recLSN);
int stasis_transaction_table_forget(int xid);
/**
This is used by log truncation.
*/
lsn_t stasis_transaction_table_minRecLSN();
/**
* Called at the end of transactions aborted by recovery, after the transaction
* has been completely rolled back (ie: all rollback entries are in the log's