diff --git a/src/stasis/CMakeLists.txt b/src/stasis/CMakeLists.txt index a932a73..805616e 100644 --- a/src/stasis/CMakeLists.txt +++ b/src/stasis/CMakeLists.txt @@ -6,7 +6,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c bufferManager/legacy/legacyBufferManager.c page.c bufferPool.c blobManager.c recovery2.c truncation.c transactional2.c - dirtyPageTable.c + dirtyPageTable.c transactionTable.c allocationPolicy.c lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c logger/logEntry.c diff --git a/src/stasis/Makefile.am b/src/stasis/Makefile.am index 96fdc6c..574acb6 100644 --- a/src/stasis/Makefile.am +++ b/src/stasis/Makefile.am @@ -3,7 +3,7 @@ lib_LTLIBRARIES=libstasis.la libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common.c flags.c \ stats.c linkedlist.c operations.c pageHandle.c \ page.c bufferPool.c blobManager.c recovery2.c truncation.c \ - dirtyPageTable.c \ + dirtyPageTable.c transactionTable.c \ transactional2.c allocationPolicy.c \ lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\ logger/logEntry.c \ diff --git a/src/stasis/transactionTable.c b/src/stasis/transactionTable.c new file mode 100644 index 0000000..20e7f54 --- /dev/null +++ b/src/stasis/transactionTable.c @@ -0,0 +1,189 @@ +/* + * transactionTable.c + * + * Created on: Oct 14, 2009 + * Author: sears + */ +#include +#include +#include +#include + +TransactionLog stasis_transaction_table[MAX_TRANSACTIONS]; +static int stasis_transaction_table_active_count = 0; +static int stasis_transaction_table_xid_count = 0; + +int stasis_transaction_table_num_active() { + return stasis_transaction_table_active_count; +} + +/** + This mutex protects stasis_transaction_table, numActiveXactions and + xidCount. +*/ +static pthread_mutex_t stasis_transaction_table_mutex; + +typedef enum { + INVALID_XTABLE_XID = INVALID_XID, + PENDING_XTABLE_XID = -2 +} stasis_transaction_table_status; + +void stasis_transaction_table_init() { + pthread_mutex_init(&stasis_transaction_table_mutex, NULL); + stasis_transaction_table_active_count = 0; + + memset(stasis_transaction_table, INVALID_XTABLE_XID, + sizeof(TransactionLog)*MAX_TRANSACTIONS); + for(int i = 0; i < MAX_TRANSACTIONS; i++) { + pthread_mutex_init(&stasis_transaction_table[i].mut,0); + } +} + +void stasis_transaction_table_deinit() { + pthread_mutex_destroy(&stasis_transaction_table_mutex); + stasis_transaction_table_active_count = 0; + for(int i = 0; i < MAX_TRANSACTIONS; i++) { + pthread_mutex_destroy(&stasis_transaction_table[i].mut); + } + memset(stasis_transaction_table, INVALID_XTABLE_XID, sizeof(TransactionLog)*MAX_TRANSACTIONS); +} + +void stasis_transaction_table_max_transaction_id_set(int xid) { + pthread_mutex_lock(&stasis_transaction_table_mutex); + stasis_transaction_table_xid_count = xid; + pthread_mutex_unlock(&stasis_transaction_table_mutex); +} +void stasis_transaction_table_active_transaction_count_set(int xid) { + pthread_mutex_lock(&stasis_transaction_table_mutex); + stasis_transaction_table_active_count = xid; + pthread_mutex_unlock(&stasis_transaction_table_mutex); +} + +lsn_t stasis_transaction_table_minRecLSN() { + lsn_t minRecLSN = LSN_T_MAX; + pthread_mutex_lock(&stasis_transaction_table_mutex); + for(int i = 0; i < MAX_TRANSACTIONS; i++) { + if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) { + lsn_t recLSN = stasis_transaction_table[i].recLSN; + if(recLSN != -1 && recLSN < minRecLSN) { + minRecLSN = recLSN; + } + } + } + pthread_mutex_unlock(&stasis_transaction_table_mutex); + return minRecLSN; +} + + +int TactiveTransactionCount() { + return stasis_transaction_table_active_count; +} + +int* stasis_transaction_table_list_active() { + pthread_mutex_lock(&stasis_transaction_table_mutex); + int * ret = malloc(sizeof(*ret)); + ret[0] = 0; + int retcount = 0; + for(int i = 0; i < MAX_TRANSACTIONS; i++) { + if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) { + ret[retcount] = stasis_transaction_table[i].xid; + retcount++; + ret = realloc(ret, (retcount+1) * sizeof(*ret)); + ret[retcount] = 0; + } + } + pthread_mutex_unlock(&stasis_transaction_table_mutex); + return ret; +} +int* TlistActiveTransactions() { + return stasis_transaction_table_list_active(); +} +int TisActiveTransaction(int xid) { + if(xid < 0) { return 0; } + pthread_mutex_lock(&stasis_transaction_table_mutex); + int ret = xid != INVALID_XTABLE_XID && stasis_transaction_table[xid%MAX_TRANSACTIONS].xid == xid; + pthread_mutex_unlock(&stasis_transaction_table_mutex); + return ret; +} + +int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN) { + TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS]; + if(l->xid == xid) { + // rolling forward CLRs / NTAs makes prevLSN decrease. + assert(l->prevLSN >= prevLSN); + } else { + pthread_mutex_lock(&stasis_transaction_table_mutex); + assert(l->xid == INVALID_XTABLE_XID); + l->xid = xid; + l->recLSN = lsn; + stasis_transaction_table_active_count++; + pthread_mutex_unlock(&stasis_transaction_table_mutex); + } + l->prevLSN = lsn; + return 0; +} +int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn, + lsn_t prevLSN, + lsn_t recLSN) { + assert(stasis_transaction_table[xid%MAX_TRANSACTIONS].recLSN == recLSN); + return stasis_transaction_table_roll_forward(xid, lsn, prevLSN); +} +TransactionLog * stasis_transaction_table_begin(int * xid) { + int index = 0; + int xidCount_tmp; + + pthread_mutex_lock(&stasis_transaction_table_mutex); + + if( stasis_transaction_table_active_count == MAX_TRANSACTIONS ) { + pthread_mutex_unlock(&stasis_transaction_table_mutex); + *xid = LLADD_EXCEED_MAX_TRANSACTIONS; + return 0; + } else { + DEBUG("%s:%d activate in begin\n",__FILE__,__LINE__); + stasis_transaction_table_active_count++; + } + for(int i = 0; i < MAX_TRANSACTIONS; i++ ) { + stasis_transaction_table_xid_count++; + if( stasis_transaction_table[stasis_transaction_table_xid_count%MAX_TRANSACTIONS].xid == INVALID_XTABLE_XID ) { + index = stasis_transaction_table_xid_count%MAX_TRANSACTIONS; + break; + } + } + + xidCount_tmp = stasis_transaction_table_xid_count; + + stasis_transaction_table[index].xid = PENDING_XTABLE_XID; + + pthread_mutex_unlock(&stasis_transaction_table_mutex); + *xid = xidCount_tmp; + return &stasis_transaction_table[index]; +} +TransactionLog * stasis_transaction_table_get(int xid) { + return &stasis_transaction_table[xid % MAX_TRANSACTIONS]; +} +int stasis_transaction_table_commit(int xid) { + pthread_mutex_lock(&stasis_transaction_table_mutex); + + stasis_transaction_table[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID; + DEBUG("%s:%d deactivate %d\n",__FILE__,__LINE__,xid); + stasis_transaction_table_active_count--; + assert( stasis_transaction_table_active_count >= 0 ); + pthread_mutex_unlock(&stasis_transaction_table_mutex); + return 0; +} +int stasis_transaction_table_forget(int xid) { + assert(xid != INVALID_XTABLE_XID); + TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS]; + if(l->xid == xid) { + pthread_mutex_lock(&stasis_transaction_table_mutex); + l->xid = INVALID_XTABLE_XID; + l->prevLSN = -1; + l->recLSN = -1; + stasis_transaction_table_active_count--; + assert(stasis_transaction_table_active_count >= 0); + pthread_mutex_unlock(&stasis_transaction_table_mutex); + } else { + assert(l->xid == INVALID_XTABLE_XID); + } + return 0; +} diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index c4a9bc9..3bf6ed3 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -27,10 +28,6 @@ static int stasis_initted = 0; -TransactionLog stasis_transaction_table[MAX_TRANSACTIONS]; -static int stasis_transaction_table_num_active = 0; -static int stasis_transaction_table_xid_count = 0; - static stasis_log_t* stasis_log_file = 0; stasis_dirty_page_table_t * stasis_dirty_page_table = 0; static stasis_truncation_t * stasis_truncation = 0; @@ -42,25 +39,6 @@ void * stasis_runtime_buffer_manager() { return stasis_buffer_manager; } -/** - This mutex protects stasis_transaction_table, numActiveXactions and - xidCount. -*/ -static pthread_mutex_t stasis_transaction_table_mutex; - -typedef enum { - INVALID_XTABLE_XID = INVALID_XID, - PENDING_XTABLE_XID = -2 -} stasis_transaction_table_status; - -void stasis_transaction_table_init() { - memset(stasis_transaction_table, INVALID_XTABLE_XID, - sizeof(TransactionLog)*MAX_TRANSACTIONS); - for(int i = 0; i < MAX_TRANSACTIONS; i++) { - pthread_mutex_init(&stasis_transaction_table[i].mut,0); - } -} - void * stasis_runtime_dirty_page_table() { return stasis_dirty_page_table; } @@ -81,9 +59,7 @@ stasis_page_handle_t* stasis_page_handle_default_factory(stasis_log_t *log, stas } int Tinit() { - pthread_mutex_init(&stasis_transaction_table_mutex, NULL); stasis_initted = 1; - stasis_transaction_table_num_active = 0; compensations_init(); @@ -138,40 +114,21 @@ int Tinit() { int Tbegin() { - int i, index = 0; - int xidCount_tmp; + assert(stasis_initted); - assert(stasis_initted); + int xid; - pthread_mutex_lock(&stasis_transaction_table_mutex); + TransactionLog* newXact = stasis_transaction_table_begin(&xid); + if(newXact != 0) { + stasis_log_begin_transaction(stasis_log_file, xid, newXact); - if( stasis_transaction_table_num_active == MAX_TRANSACTIONS ) { - pthread_mutex_unlock(&stasis_transaction_table_mutex); - return LLADD_EXCEED_MAX_TRANSACTIONS; - } - else { - DEBUG("%s:%d activate in begin\n",__FILE__,__LINE__); - stasis_transaction_table_num_active++; - } - for( i = 0; i < MAX_TRANSACTIONS; i++ ) { - stasis_transaction_table_xid_count++; - if( stasis_transaction_table[stasis_transaction_table_xid_count%MAX_TRANSACTIONS].xid == INVALID_XTABLE_XID ) { - index = stasis_transaction_table_xid_count%MAX_TRANSACTIONS; - break; - } - } + if(globalLockManager.begin) { globalLockManager.begin(newXact->xid); } - xidCount_tmp = stasis_transaction_table_xid_count; - - stasis_transaction_table[index].xid = PENDING_XTABLE_XID; - - pthread_mutex_unlock(&stasis_transaction_table_mutex); - - stasis_log_begin_transaction(stasis_log_file, xidCount_tmp, &stasis_transaction_table[index]); - - if(globalLockManager.begin) { globalLockManager.begin(stasis_transaction_table[index].xid); } - - return stasis_transaction_table[index].xid; + return newXact->xid; + } else { + assert(xid == LLADD_EXCEED_MAX_TRANSACTIONS); + return xid; + } } compensated_function void Tupdate(int xid, pageid_t page, @@ -322,13 +279,7 @@ static inline int TcommitHelper(int xid, int force) { } - pthread_mutex_lock(&stasis_transaction_table_mutex); - - stasis_transaction_table[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID; - DEBUG("%s:%d deactivate %d\n",__FILE__,__LINE__,xid); - stasis_transaction_table_num_active--; - assert( stasis_transaction_table_num_active >= 0 ); - pthread_mutex_unlock(&stasis_transaction_table_mutex); + stasis_transaction_table_commit(xid); return 0; } @@ -376,18 +327,21 @@ int Tforget(int xid) { return 0; } int Tdeinit() { - int i; - for( i = 0; i < MAX_TRANSACTIONS; i++ ) { - if( stasis_transaction_table[i].xid != INVALID_XTABLE_XID ) { - if(!stasis_suppress_unclean_shutdown_warnings) { - fprintf(stderr, "WARNING: Tdeinit() is aborting transaction %d\n", - stasis_transaction_table[i].xid); - } - Tabort(stasis_transaction_table[i].xid); + int * active = stasis_transaction_table_list_active(); + int count = stasis_transaction_table_num_active(); + + for(int i = 0; i < count; i++) { + if(!stasis_suppress_unclean_shutdown_warnings) { + fprintf(stderr, "WARNING: Tdeinit() is aborting transaction %d\n", + stasis_transaction_table[i].xid); } + Tabort(active[i]); } - assert( stasis_transaction_table_num_active == 0 ); + assert( stasis_transaction_table_num_active() == 0 ); + + free(active); + stasis_truncation_deinit(stasis_truncation); TnaiveHashDeinit(); stasis_alloc_deinit(stasis_alloc); @@ -398,6 +352,7 @@ int Tdeinit() { stasis_log_group_force_t * group_force = stasis_log_file->group_force; stasis_log_file->close(stasis_log_file); if(group_force) { stasis_log_group_force_deinit(group_force); } + stasis_transaction_table_deinit(); stasis_dirty_page_table_deinit(stasis_dirty_page_table); stasis_initted = 0; @@ -418,7 +373,7 @@ int TuncleanShutdown() { // XXX: close_file? stasis_page_deinit(); stasis_log_file->close(stasis_log_file); - stasis_transaction_table_num_active = 0; + stasis_transaction_table_deinit(); stasis_dirty_page_table_deinit(stasis_dirty_page_table); // Reset it here so the warnings will appear if a new stasis @@ -427,99 +382,6 @@ int TuncleanShutdown() { return 0; } -void stasis_transaction_table_max_transaction_id_set(int xid) { - pthread_mutex_lock(&stasis_transaction_table_mutex); - stasis_transaction_table_xid_count = xid; - pthread_mutex_unlock(&stasis_transaction_table_mutex); -} -void stasis_transaction_table_active_transaction_count_set(int xid) { - pthread_mutex_lock(&stasis_transaction_table_mutex); - stasis_transaction_table_num_active = xid; - pthread_mutex_unlock(&stasis_transaction_table_mutex); -} - -lsn_t stasis_transaction_table_minRecLSN() { - lsn_t minRecLSN = LSN_T_MAX; - pthread_mutex_lock(&stasis_transaction_table_mutex); - for(int i = 0; i < MAX_TRANSACTIONS; i++) { - if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) { - lsn_t recLSN = stasis_transaction_table[i].recLSN; - if(recLSN != -1 && recLSN < minRecLSN) { - minRecLSN = recLSN; - } - } - } - pthread_mutex_unlock(&stasis_transaction_table_mutex); - return minRecLSN; -} - -int TactiveTransactionCount() { - return stasis_transaction_table_num_active; -} - -int* TlistActiveTransactions() { - pthread_mutex_lock(&stasis_transaction_table_mutex); - int * ret = malloc(sizeof(*ret)); - ret[0] = 0; - int retcount = 0; - for(int i = 0; i < MAX_TRANSACTIONS; i++) { - if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) { - ret[retcount] = stasis_transaction_table[i].xid; - retcount++; - ret = realloc(ret, (retcount+1) * sizeof(*ret)); - ret[retcount] = 0; - } - } - pthread_mutex_unlock(&stasis_transaction_table_mutex); - return ret; -} -int TisActiveTransaction(int xid) { - if(xid < 0) { return 0; } - pthread_mutex_lock(&stasis_transaction_table_mutex); - int ret = xid != INVALID_XTABLE_XID && stasis_transaction_table[xid%MAX_TRANSACTIONS].xid == xid; - pthread_mutex_unlock(&stasis_transaction_table_mutex); - return ret; -} - -int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN) { - TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS]; - if(l->xid == xid) { - // rolling forward CLRs / NTAs makes prevLSN decrease. - assert(l->prevLSN >= prevLSN); - } else { - pthread_mutex_lock(&stasis_transaction_table_mutex); - assert(l->xid == INVALID_XTABLE_XID); - l->xid = xid; - l->recLSN = lsn; - stasis_transaction_table_num_active++; - pthread_mutex_unlock(&stasis_transaction_table_mutex); - } - l->prevLSN = lsn; - return 0; -} -int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn, - lsn_t prevLSN, - lsn_t recLSN) { - assert(stasis_transaction_table[xid%MAX_TRANSACTIONS].recLSN == recLSN); - return stasis_transaction_table_roll_forward(xid, lsn, prevLSN); -} -int stasis_transaction_table_forget(int xid) { - assert(xid != INVALID_XTABLE_XID); - TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS]; - if(l->xid == xid) { - pthread_mutex_lock(&stasis_transaction_table_mutex); - l->xid = INVALID_XTABLE_XID; - l->prevLSN = -1; - l->recLSN = -1; - stasis_transaction_table_num_active--; - assert(stasis_transaction_table_num_active >= 0); - pthread_mutex_unlock(&stasis_transaction_table_mutex); - } else { - assert(l->xid == INVALID_XTABLE_XID); - } - return 0; -} - int TdurabilityLevel() { if(stasis_buffer_manager_factory == stasis_buffer_manager_mem_array_factory) { return VOLATILE; diff --git a/stasis/logger/logger2.h b/stasis/logger/logger2.h index c2a9b83..b7dd450 100644 --- a/stasis/logger/logger2.h +++ b/stasis/logger/logger2.h @@ -54,7 +54,6 @@ terms specified in this license. #include typedef struct stasis_log_t stasis_log_t; -typedef struct TransactionLog TransactionLog; typedef struct stasis_log_group_force_t stasis_log_group_force_t; @@ -66,16 +65,7 @@ typedef enum { #include #include #include -/** - Contains the state needed by the logging layer to perform - operations on a transaction. - */ -struct TransactionLog { - int xid; - lsn_t prevLSN; - lsn_t recLSN; - pthread_mutex_t mut; -}; +#include /** A callback function that allows logHandle's iterator to stop @@ -84,13 +74,6 @@ struct TransactionLog { */ typedef int (guard_fcn_t)(const LogEntry *, void *); - - -/** - XXX TransactionTable should be private to transactional2.c! -*/ -extern TransactionLog stasis_transaction_table[MAX_TRANSACTIONS]; - /** * Interface provided by Stasis log implementations. * diff --git a/stasis/transactionTable.h b/stasis/transactionTable.h new file mode 100644 index 0000000..20eeab6 --- /dev/null +++ b/stasis/transactionTable.h @@ -0,0 +1,73 @@ +/* + * transactionTable.h + * + * Created on: Oct 14, 2009 + * Author: sears + */ + +#ifndef TRANSACTIONTABLE_H_ +#define TRANSACTIONTABLE_H_ + +#include +typedef struct TransactionLog TransactionLog; + +/** + Contains the state needed by the logging layer to perform + operations on a transaction. + */ +struct TransactionLog { + int xid; + lsn_t prevLSN; + lsn_t recLSN; + pthread_mutex_t mut; +}; + +/** + XXX TransactionTable should be private to transactional2.c! +*/ +extern TransactionLog stasis_transaction_table[MAX_TRANSACTIONS]; + +/** + Initialize Stasis' transaction table. Called by Tinit() and unit + tests that wish to test portions of Stasis in isolation. + */ +void stasis_transaction_table_init(); +/** Free resources associated with the transaction table */ +void stasis_transaction_table_deinit(); +/** + * Used by recovery to prevent reuse of old transaction ids. + * + * Should not be used elsewhere. + * + * @param xid The highest transaction id issued so far. + */ +void stasis_transaction_table_max_transaction_id_set(int xid); +/** + * Used by test cases to mess with internal transaction table state. + * + * @param xid The new active transaction count. + */ +void stasis_transaction_table_active_transaction_count_set(int xid); + +int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN); +/** + @todo update Tprepare() to not write reclsn to log, then remove + this function. + */ +int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn, + lsn_t prevLSN, + lsn_t recLSN); +/** + This is used by log truncation. +*/ +lsn_t stasis_transaction_table_minRecLSN(); + +TransactionLog * stasis_transaction_table_begin(int * xid); +TransactionLog * stasis_transaction_table_get(int xid); +int stasis_transaction_table_commit(int xid); +int stasis_transaction_table_forget(int xid); + +int stasis_transaction_table_num_active(); +int* stasis_transaction_table_list_active(); + +#endif /* TRANSACTIONTABLE_H_ */ diff --git a/stasis/transactional.h b/stasis/transactional.h index 11f0ca4..07986ae 100644 --- a/stasis/transactional.h +++ b/stasis/transactional.h @@ -739,41 +739,6 @@ int TisActiveTransaction(int xid); */ int TactiveTransactionCount(); -/** - Initialize Stasis' transaction table. Called by Tinit() and unit - tests that wish to test portions of Stasis in isolation. - */ -void stasis_transaction_table_init(); - -/** - * Used by recovery to prevent reuse of old transaction ids. - * - * Should not be used elsewhere. - * - * @param xid The highest transaction id issued so far. - */ -void stasis_transaction_table_max_transaction_id_set(int xid); -/** - * Used by test cases to mess with internal transaction table state. - * - * @param xid The new active transaction count. - */ -void stasis_transaction_table_active_transaction_count_set(int xid); - -int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN); -/** - @todo update Tprepare() to not write reclsn to log, then remove - this function. - */ -int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn, - lsn_t prevLSN, - lsn_t recLSN); -int stasis_transaction_table_forget(int xid); - -/** - This is used by log truncation. -*/ -lsn_t stasis_transaction_table_minRecLSN(); /** * Called at the end of transactions aborted by recovery, after the transaction * has been completely rolled back (ie: all rollback entries are in the log's