more refactoring; no more globals in transactionTable.c

This commit is contained in:
Sears Russell 2009-10-14 21:22:50 +00:00
parent dbe3ecf0d0
commit 4565aff9de
16 changed files with 213 additions and 195 deletions

View file

@ -112,7 +112,7 @@ int main(int argc, char ** argv) {
} */
}
if(net) {
last_lsn = stasis_transaction_table[xid%MAX_TRANSACTIONS].prevLSN;
last_lsn = stasis_transaction_table_get(stasis_runtime_transaction_table(), xid)->prevLSN;
}
Tcommit(xid);
}

View file

@ -157,7 +157,7 @@ void * writeback_unit_of_work(void * ap) {
stasis_log_reordering_handle_t * rh
= stasis_log_reordering_handle_open(
&stasis_transaction_table[ua->xid%MAX_TRANSACTIONS],
stasis_transaction_table_get(stasis_runtime_transaction_table(), ua->xid),
stasis_log(),
(0.9*stasis_log_file_write_buffer_size)/ua->divisor,
//512*1024/ua->divisor, // 0.5 mb in log tail at once
@ -199,7 +199,7 @@ void * bg_unit_of_work(void * ap) {
bulk_worker_args * a = ua->a;
stasis_log_reordering_handle_t * rh
= stasis_log_reordering_handle_open(&stasis_transaction_table[ua->xid%MAX_TRANSACTIONS],
= stasis_log_reordering_handle_open(stasis_transaction_table_get(stasis_runtime_transaction_table(), ua->xid),
stasis_log(),
(stasis_log_file_write_buffer_size * 0.25)/ua->divisor,
//512*1024/ua->divisor, // 0.5 mb in log tail at once

View file

@ -52,7 +52,7 @@ terms specified in this license.
#include <stasis/logger/inMemoryLog.h>
#include <stasis/page.h>
static lsn_t stasis_log_write_common(stasis_log_t* log, TransactionLog * l, int type) {
static lsn_t stasis_log_write_common(stasis_log_t* log, stasis_transaction_table_entry_t * l, int type) {
LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type);
lsn_t ret;
@ -73,7 +73,7 @@ static lsn_t stasis_log_write_common(stasis_log_t* log, TransactionLog * l, int
return ret;
}
static lsn_t stasis_log_write_prepare(stasis_log_t* log, TransactionLog * l) {
static lsn_t stasis_log_write_prepare(stasis_log_t* log, stasis_transaction_table_entry_t * l) {
LogEntry * e = allocPrepareLogEntry(l->prevLSN, l->xid, l->recLSN);
lsn_t ret;
@ -96,7 +96,7 @@ static lsn_t stasis_log_write_prepare(stasis_log_t* log, TransactionLog * l) {
}
LogEntry * stasis_log_write_update(stasis_log_t* log, TransactionLog * l,
LogEntry * stasis_log_write_update(stasis_log_t* log, stasis_transaction_table_entry_t * l,
pageid_t page, unsigned int op,
const byte * arg, size_t arg_size) {
@ -113,12 +113,12 @@ LogEntry * stasis_log_write_update(stasis_log_t* log, TransactionLog * l,
return e;
}
LogEntry * stasis_log_begin_nta(stasis_log_t* log, TransactionLog * l, unsigned int op,
LogEntry * stasis_log_begin_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, unsigned int op,
const byte * arg, size_t arg_size) {
LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op, INVALID_PAGE, arg, arg_size);
return e;
}
lsn_t stasis_log_end_nta(stasis_log_t* log, TransactionLog * l, LogEntry * e) {
lsn_t stasis_log_end_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, LogEntry * e) {
log->write_entry(log, e);
pthread_mutex_lock(&l->mut);
if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; }
@ -149,7 +149,7 @@ lsn_t stasis_log_write_dummy_clr(stasis_log_t* log, int xid, lsn_t prevLSN) {
return ret;
}
void stasis_log_begin_transaction(stasis_log_t* log, int xid, TransactionLog* tl) {
void stasis_log_begin_transaction(stasis_log_t* log, int xid, stasis_transaction_table_entry_t* tl) {
tl->xid = xid;
DEBUG("Log Begin %d\n", xid);
@ -157,20 +157,20 @@ void stasis_log_begin_transaction(stasis_log_t* log, int xid, TransactionLog* tl
tl->recLSN = INVALID_LSN;
}
lsn_t stasis_log_abort_transaction(stasis_log_t* log, TransactionLog * l) {
lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l) {
return stasis_log_write_common(log, l, XABORT);
}
lsn_t stasis_log_end_aborted_transaction(stasis_log_t* log, TransactionLog * l) {
lsn_t stasis_log_end_aborted_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l) {
return stasis_log_write_common(log, l, XEND);
}
lsn_t stasis_log_prepare_transaction(stasis_log_t* log, TransactionLog * l) {
lsn_t stasis_log_prepare_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l) {
lsn_t lsn = stasis_log_write_prepare(log, l);
stasis_log_force(log, lsn, LOG_FORCE_COMMIT);
return lsn;
}
lsn_t stasis_log_commit_transaction(stasis_log_t* log, TransactionLog * l, int force) {
lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l, int force) {
lsn_t lsn = stasis_log_write_common(log, l, XCOMMIT);
if(force) {
stasis_log_force(log, lsn, LOG_FORCE_COMMIT);

View file

@ -75,7 +75,7 @@ void stasis_log_reordering_handle_close(stasis_log_reordering_handle_t * h) {
free(h);
}
stasis_log_reordering_handle_t *
stasis_log_reordering_handle_open(TransactionLog * l,
stasis_log_reordering_handle_open(stasis_transaction_table_entry_t * l,
stasis_log_t* log,
size_t chunk_len,
size_t max_len,

View file

@ -8,6 +8,9 @@
#include <pbl/pbl.h>
#include <stasis/recovery.h>
#include <stasis/transactionTable.h>
#include <stasis/bufferManager.h>
#include <stasis/lockManager.h>
@ -45,7 +48,7 @@ static pthread_mutex_t rollback_mutex = PTHREAD_MUTEX_INITIALIZER;
no longer reads the pages in, there's no longer any reason to build
the list of dirty pages.
*/
static void stasis_recovery_analysis(stasis_log_t* log) {
static void stasis_recovery_analysis(stasis_log_t* log, stasis_transaction_table_t * tbl) {
DEBUG("Recovery: Analysis\n");
@ -111,7 +114,7 @@ static void stasis_recovery_analysis(stasis_log_t* log) {
lsn_t* free_lsn = pblHtLookup(transactionLSN, &(e->xid), sizeof(int));
pblHtRemove(transactionLSN, &(e->xid), sizeof(int));
free(free_lsn);
stasis_transaction_table_forget(e->xid);
stasis_transaction_table_forget(tbl, e->xid);
}
break;
case UPDATELOG:
@ -148,7 +151,7 @@ static void stasis_recovery_analysis(stasis_log_t* log) {
freeLogEntry(e);
}
freeLogHandle(lh);
stasis_transaction_table_max_transaction_id_set(highestXid);
stasis_transaction_table_max_transaction_id_set(tbl, highestXid);
}
/**
@ -168,7 +171,7 @@ static void stasis_recovery_analysis(stasis_log_t* log) {
Y (NTA replaces physical undo)
*/
static void stasis_recovery_redo(stasis_log_t* log) {
static void stasis_recovery_redo(stasis_log_t* log, stasis_transaction_table_t * tbl) {
LogHandle* lh = getLogHandle(log);
const LogEntry * e;
@ -178,7 +181,7 @@ static void stasis_recovery_redo(stasis_log_t* log) {
// Is this log entry part of a transaction that needs to be redone?
if(pblHtLookup(transactionLSN, &(e->xid), sizeof(int)) != NULL) {
if(e->type != INTERNALLOG) {
stasis_transaction_table_roll_forward(e->xid, e->LSN, e->prevLSN);
stasis_transaction_table_roll_forward(tbl, e->xid, e->LSN, e->prevLSN);
}
// Check to see if this entry's action needs to be redone
switch(e->type) {
@ -219,7 +222,7 @@ static void stasis_recovery_redo(stasis_log_t* log) {
} break;
case XCOMMIT:
{
stasis_transaction_table_forget(e->xid);
stasis_transaction_table_forget(tbl, e->xid);
if(globalLockManager.commit)
globalLockManager.commit(e->xid);
@ -246,7 +249,7 @@ static void stasis_recovery_redo(stasis_log_t* log) {
freeLogHandle(lh);
}
static void stasis_recovery_undo(stasis_log_t* log, int recovery) {
static void stasis_recovery_undo(stasis_log_t* log, stasis_transaction_table_t * tbl, int recovery) {
LogHandle* lh;
DEBUG("Recovery: Undo\n");
@ -293,7 +296,7 @@ static void stasis_recovery_undo(stasis_log_t* log, int recovery) {
lsn_t clr_lsn = stasis_log_write_clr(log, e);
DEBUG("logged clr\n");
stasis_transaction_table_roll_forward(e->xid, e->LSN, e->prevLSN);
stasis_transaction_table_roll_forward(tbl, e->xid, e->LSN, e->prevLSN);
stasis_operation_undo(e, clr_lsn, p);
@ -331,7 +334,7 @@ static void stasis_recovery_undo(stasis_log_t* log, int recovery) {
prepared = 1;
stasis_transaction_table_roll_forward_with_reclsn
(e->xid, e->LSN, e->prevLSN, getPrepareRecLSN(e));
(tbl, e->xid, e->LSN, e->prevLSN, getPrepareRecLSN(e));
} else {
DEBUG("xact was aborted\n");
}
@ -355,15 +358,15 @@ static void stasis_recovery_undo(stasis_log_t* log, int recovery) {
freeLogHandle(lh);
}
}
void stasis_recovery_initiate(stasis_log_t* log, stasis_alloc_t * alloc) {
void stasis_recovery_initiate(stasis_log_t* log, stasis_transaction_table_t * tbl, stasis_alloc_t * alloc) {
transactionLSN = pblHtCreate();
DEBUG("Analysis started\n");
stasis_recovery_analysis(log);
stasis_recovery_analysis(log, tbl);
DEBUG("Redo started\n");
stasis_recovery_redo(log);
stasis_recovery_redo(log, tbl);
DEBUG("Undo started\n");
stasis_recovery_undo(log,1);
stasis_recovery_undo(log, tbl, 1);
stasis_alloc_post_init(alloc);
DEBUG("Recovery complete.\n");
@ -377,7 +380,7 @@ void stasis_recovery_initiate(stasis_log_t* log, stasis_alloc_t * alloc) {
}
void undoTrans(stasis_log_t* log, TransactionLog transaction) {
void undoTrans(stasis_log_t* log, stasis_transaction_table_t * tbl, stasis_transaction_table_entry_t transaction) {
pthread_mutex_lock(&rollback_mutex);
assert(!rollbackLSNs);
@ -389,7 +392,7 @@ void undoTrans(stasis_log_t* log, TransactionLog transaction) {
/* Nothing to undo. (Happens for read-only xacts.) */
}
stasis_recovery_undo(log, 0);
stasis_recovery_undo(log, tbl, 0);
if(rollbackLSNs) {
destroyList(&rollbackLSNs);
}

View file

@ -7,181 +7,189 @@
#include <stasis/common.h>
#include <stasis/constants.h>
#include <stasis/transactionTable.h>
#include <stasis/transactional.h>
#include <assert.h>
TransactionLog stasis_transaction_table[MAX_TRANSACTIONS];
static int stasis_transaction_table_active_count = 0;
static int stasis_transaction_table_xid_count = 0;
int stasis_transaction_table_num_active() {
return stasis_transaction_table_active_count;
}
/**
This mutex protects stasis_transaction_table, numActiveXactions and
xidCount.
*/
static pthread_mutex_t stasis_transaction_table_mutex;
struct stasis_transaction_table_t {
int active_count;
int xid_count;
/**
This mutex protects the rest of the struct
xidCount.
*/
pthread_mutex_t mut;
stasis_transaction_table_entry_t table[MAX_TRANSACTIONS];
};
typedef enum {
INVALID_XTABLE_XID = INVALID_XID,
PENDING_XTABLE_XID = -2
} stasis_transaction_table_status;
void stasis_transaction_table_init() {
pthread_mutex_init(&stasis_transaction_table_mutex, NULL);
stasis_transaction_table_active_count = 0;
int stasis_transaction_table_num_active(stasis_transaction_table_t *tbl) {
return tbl->active_count;
}
int stasis_transaction_table_is_active(stasis_transaction_table_t *tbl, int xid) {
return xid >= 0 && tbl->table[xid % MAX_TRANSACTIONS].xid == xid;
}
memset(stasis_transaction_table, INVALID_XTABLE_XID,
sizeof(TransactionLog)*MAX_TRANSACTIONS);
stasis_transaction_table_t * stasis_transaction_table_init() {
stasis_transaction_table_t * tbl = malloc(sizeof(*tbl));
pthread_mutex_init(&tbl->mut, NULL);
tbl->active_count = 0;
memset(tbl->table, INVALID_XTABLE_XID,
sizeof(stasis_transaction_table_entry_t)*MAX_TRANSACTIONS);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_init(&stasis_transaction_table[i].mut,0);
pthread_mutex_init(&tbl->table[i].mut,0);
}
return tbl;
}
void stasis_transaction_table_deinit() {
pthread_mutex_destroy(&stasis_transaction_table_mutex);
stasis_transaction_table_active_count = 0;
void stasis_transaction_table_deinit(stasis_transaction_table_t *tbl) {
pthread_mutex_destroy(&tbl->mut);
tbl->active_count = 0;
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
pthread_mutex_destroy(&stasis_transaction_table[i].mut);
pthread_mutex_destroy(&tbl->table[i].mut);
}
memset(stasis_transaction_table, INVALID_XTABLE_XID, sizeof(TransactionLog)*MAX_TRANSACTIONS);
free(tbl);
}
void stasis_transaction_table_max_transaction_id_set(int xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
stasis_transaction_table_xid_count = xid;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
void stasis_transaction_table_max_transaction_id_set(stasis_transaction_table_t *tbl, int xid) {
pthread_mutex_lock(&tbl->mut);
tbl->xid_count = xid;
pthread_mutex_unlock(&tbl->mut);
}
void stasis_transaction_table_active_transaction_count_set(int xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
stasis_transaction_table_active_count = xid;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
void stasis_transaction_table_active_transaction_count_set(stasis_transaction_table_t *tbl, int xid) {
pthread_mutex_lock(&tbl->mut);
tbl->active_count = xid;
pthread_mutex_unlock(&tbl->mut);
}
lsn_t stasis_transaction_table_minRecLSN() {
lsn_t stasis_transaction_table_minRecLSN(stasis_transaction_table_t *tbl) {
lsn_t minRecLSN = LSN_T_MAX;
pthread_mutex_lock(&stasis_transaction_table_mutex);
pthread_mutex_lock(&tbl->mut);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) {
lsn_t recLSN = stasis_transaction_table[i].recLSN;
if(tbl->table[i].xid != INVALID_XTABLE_XID) {
lsn_t recLSN = tbl->table[i].recLSN;
if(recLSN != -1 && recLSN < minRecLSN) {
minRecLSN = recLSN;
}
}
}
pthread_mutex_unlock(&stasis_transaction_table_mutex);
pthread_mutex_unlock(&tbl->mut);
return minRecLSN;
}
int TactiveTransactionCount() {
return stasis_transaction_table_active_count;
int TactiveTransactionCount(stasis_transaction_table_t *tbl) {
return tbl->active_count;
}
int* stasis_transaction_table_list_active() {
pthread_mutex_lock(&stasis_transaction_table_mutex);
int* stasis_transaction_table_list_active(stasis_transaction_table_t *tbl) {
pthread_mutex_lock(&tbl->mut);
int * ret = malloc(sizeof(*ret));
ret[0] = 0;
int retcount = 0;
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
if(stasis_transaction_table[i].xid != INVALID_XTABLE_XID) {
ret[retcount] = stasis_transaction_table[i].xid;
if(tbl->table[i].xid != INVALID_XTABLE_XID) {
ret[retcount] = tbl->table[i].xid;
retcount++;
ret = realloc(ret, (retcount+1) * sizeof(*ret));
ret[retcount] = 0;
}
}
pthread_mutex_unlock(&stasis_transaction_table_mutex);
pthread_mutex_unlock(&tbl->mut);
return ret;
}
int* TlistActiveTransactions() {
return stasis_transaction_table_list_active();
return stasis_transaction_table_list_active(stasis_runtime_transaction_table());
}
int TisActiveTransaction(int xid) {
if(xid < 0) { return 0; }
pthread_mutex_lock(&stasis_transaction_table_mutex);
int ret = xid != INVALID_XTABLE_XID && stasis_transaction_table[xid%MAX_TRANSACTIONS].xid == xid;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
return ret;
return stasis_transaction_table_is_active(stasis_runtime_transaction_table(), xid);
}
int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN) {
TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS];
int stasis_transaction_table_roll_forward(stasis_transaction_table_t *tbl, int xid, lsn_t lsn, lsn_t prevLSN) {
stasis_transaction_table_entry_t * l = &tbl->table[xid%MAX_TRANSACTIONS];
if(l->xid == xid) {
// rolling forward CLRs / NTAs makes prevLSN decrease.
assert(l->prevLSN >= prevLSN);
} else {
pthread_mutex_lock(&stasis_transaction_table_mutex);
pthread_mutex_lock(&tbl->mut);
assert(l->xid == INVALID_XTABLE_XID);
l->xid = xid;
l->recLSN = lsn;
stasis_transaction_table_active_count++;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
tbl->active_count++;
pthread_mutex_unlock(&tbl->mut);
}
l->prevLSN = lsn;
return 0;
}
int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn,
int stasis_transaction_table_roll_forward_with_reclsn(stasis_transaction_table_t *tbl, int xid, lsn_t lsn,
lsn_t prevLSN,
lsn_t recLSN) {
assert(stasis_transaction_table[xid%MAX_TRANSACTIONS].recLSN == recLSN);
return stasis_transaction_table_roll_forward(xid, lsn, prevLSN);
assert(tbl->table[xid%MAX_TRANSACTIONS].recLSN == recLSN);
return stasis_transaction_table_roll_forward(tbl, xid, lsn, prevLSN);
}
TransactionLog * stasis_transaction_table_begin(int * xid) {
stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transaction_table_t *tbl, int * xid) {
int index = 0;
int xidCount_tmp;
pthread_mutex_lock(&stasis_transaction_table_mutex);
pthread_mutex_lock(&tbl->mut);
if( stasis_transaction_table_active_count == MAX_TRANSACTIONS ) {
pthread_mutex_unlock(&stasis_transaction_table_mutex);
if( tbl->active_count == MAX_TRANSACTIONS ) {
pthread_mutex_unlock(&tbl->mut);
*xid = LLADD_EXCEED_MAX_TRANSACTIONS;
return 0;
} else {
DEBUG("%s:%d activate in begin\n",__FILE__,__LINE__);
stasis_transaction_table_active_count++;
tbl->active_count++;
}
for(int i = 0; i < MAX_TRANSACTIONS; i++ ) {
stasis_transaction_table_xid_count++;
if( stasis_transaction_table[stasis_transaction_table_xid_count%MAX_TRANSACTIONS].xid == INVALID_XTABLE_XID ) {
index = stasis_transaction_table_xid_count%MAX_TRANSACTIONS;
tbl->xid_count++;
if( tbl->table[tbl->xid_count%MAX_TRANSACTIONS].xid == INVALID_XTABLE_XID ) {
index = tbl->xid_count%MAX_TRANSACTIONS;
break;
}
}
xidCount_tmp = stasis_transaction_table_xid_count;
xidCount_tmp = tbl->xid_count;
stasis_transaction_table[index].xid = PENDING_XTABLE_XID;
tbl->table[index].xid = PENDING_XTABLE_XID;
pthread_mutex_unlock(&stasis_transaction_table_mutex);
pthread_mutex_unlock(&tbl->mut);
*xid = xidCount_tmp;
return &stasis_transaction_table[index];
return &tbl->table[index];
}
TransactionLog * stasis_transaction_table_get(int xid) {
return &stasis_transaction_table[xid % MAX_TRANSACTIONS];
stasis_transaction_table_entry_t * stasis_transaction_table_get(stasis_transaction_table_t *tbl, int xid) {
if(tbl->table[xid % MAX_TRANSACTIONS].xid == xid) {
return &tbl->table[xid % MAX_TRANSACTIONS];
} else {
return NULL;
}
}
int stasis_transaction_table_commit(int xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
int stasis_transaction_table_commit(stasis_transaction_table_t *tbl, int xid) {
pthread_mutex_lock(&tbl->mut);
stasis_transaction_table[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
tbl->table[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
DEBUG("%s:%d deactivate %d\n",__FILE__,__LINE__,xid);
stasis_transaction_table_active_count--;
assert( stasis_transaction_table_active_count >= 0 );
pthread_mutex_unlock(&stasis_transaction_table_mutex);
tbl->active_count--;
assert( tbl->active_count >= 0 );
pthread_mutex_unlock(&tbl->mut);
return 0;
}
int stasis_transaction_table_forget(int xid) {
int stasis_transaction_table_forget(stasis_transaction_table_t *tbl, int xid) {
assert(xid != INVALID_XTABLE_XID);
TransactionLog * l = &stasis_transaction_table[xid%MAX_TRANSACTIONS];
stasis_transaction_table_entry_t * l = &tbl->table[xid%MAX_TRANSACTIONS];
if(l->xid == xid) {
pthread_mutex_lock(&stasis_transaction_table_mutex);
pthread_mutex_lock(&tbl->mut);
l->xid = INVALID_XTABLE_XID;
l->prevLSN = -1;
l->recLSN = -1;
stasis_transaction_table_active_count--;
assert(stasis_transaction_table_active_count >= 0);
pthread_mutex_unlock(&stasis_transaction_table_mutex);
tbl->active_count--;
assert(tbl->active_count >= 0);
pthread_mutex_unlock(&tbl->mut);
} else {
assert(l->xid == INVALID_XTABLE_XID);
}

View file

@ -29,7 +29,8 @@
static int stasis_initted = 0;
static stasis_log_t* stasis_log_file = 0;
stasis_dirty_page_table_t * stasis_dirty_page_table = 0;
static stasis_dirty_page_table_t * stasis_dirty_page_table = 0;
static stasis_transaction_table_t * stasis_transaction_table;
static stasis_truncation_t * stasis_truncation = 0;
static stasis_alloc_t * stasis_alloc = 0;
static stasis_allocation_policy_t * stasis_allocation_policy = 0;
@ -38,10 +39,12 @@ static stasis_buffer_manager_t * stasis_buffer_manager = 0;
void * stasis_runtime_buffer_manager() {
return stasis_buffer_manager;
}
void * stasis_runtime_dirty_page_table() {
return stasis_dirty_page_table;
}
void * stasis_runtime_transaction_table() {
return stasis_transaction_table;
}
void * stasis_runtime_alloc_state() {
return stasis_alloc;
}
@ -63,7 +66,6 @@ int Tinit() {
compensations_init();
stasis_transaction_table_init();
stasis_operation_table_init();
stasis_log_file = 0;
@ -82,7 +84,9 @@ int Tinit() {
assert(stasis_log_file != NULL);
}
stasis_dirty_page_table = stasis_dirty_page_table_init();
stasis_transaction_table = stasis_transaction_table_init();
stasis_dirty_page_table = stasis_dirty_page_table_init();
stasis_page_init(stasis_dirty_page_table);
stasis_buffer_manager = stasis_buffer_manager_factory(stasis_log_file, stasis_dirty_page_table);
@ -101,8 +105,9 @@ int Tinit() {
setupLockManagerCallbacksNil();
//setupLockManagerCallbacksPage();
stasis_recovery_initiate(stasis_log_file, stasis_alloc);
stasis_truncation = stasis_truncation_init(stasis_dirty_page_table, stasis_buffer_manager, stasis_log_file);
stasis_recovery_initiate(stasis_log_file, stasis_transaction_table, stasis_alloc);
stasis_truncation = stasis_truncation_init(stasis_dirty_page_table, stasis_transaction_table,
stasis_buffer_manager, stasis_log_file);
if(stasis_truncation_automatic) {
// should this be before InitiateRecovery?
stasis_truncation_thread_start(stasis_truncation);
@ -118,7 +123,7 @@ int Tbegin() {
int xid;
TransactionLog* newXact = stasis_transaction_table_begin(&xid);
stasis_transaction_table_entry_t* newXact = stasis_transaction_table_begin(stasis_transaction_table, &xid);
if(newXact != 0) {
stasis_log_begin_transaction(stasis_log_file, xid, newXact);
@ -136,8 +141,8 @@ compensated_function void Tupdate(int xid, pageid_t page,
assert(stasis_initted);
assert(page != INVALID_PAGE);
LogEntry * e;
assert(xid >= 0 && stasis_transaction_table[xid % MAX_TRANSACTIONS].xid == xid);
stasis_transaction_table_entry_t * xact = stasis_transaction_table_get(stasis_transaction_table, xid);
assert(xact);
Page * p = loadPageForOperation(xid, page, op);
try {
@ -148,10 +153,9 @@ compensated_function void Tupdate(int xid, pageid_t page,
if(p) writelock(p->rwlatch,0);
e = stasis_log_write_update(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS],
page, op, dat, datlen);
e = stasis_log_write_update(stasis_log_file, xact, page, op, dat, datlen);
assert(stasis_transaction_table[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
assert(xact->prevLSN == e->LSN);
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
stasis_operation_do(e, p);
freeLogEntry(e);
@ -163,7 +167,7 @@ compensated_function void Tupdate(int xid, pageid_t page,
void TreorderableUpdate(int xid, void * hp, pageid_t page,
const void *dat, size_t datlen, int op) {
stasis_log_reordering_handle_t * h = (typeof(h))hp;
assert(xid >= 0 && stasis_transaction_table[xid % MAX_TRANSACTIONS].xid == xid);
assert(stasis_transaction_table_is_active(stasis_transaction_table, xid));
Page * p = loadPage(xid, page);
assert(p);
try {
@ -190,9 +194,8 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page,
}
lsn_t TwritebackUpdate(int xid, pageid_t page,
const void *dat, size_t datlen, int op) {
assert(xid >= 0 && stasis_transaction_table[xid % MAX_TRANSACTIONS].xid == xid);
LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, dat, datlen);
TransactionLog* l = &stasis_transaction_table[xid % MAX_TRANSACTIONS];
stasis_transaction_table_entry_t* l = stasis_transaction_table_get(stasis_transaction_table, xid);
stasis_log_file->write_entry(stasis_log_file, e);
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
@ -208,7 +211,7 @@ void TreorderableWritebackUpdate(int xid, void* hp,
pageid_t page, const void * dat,
size_t datlen, int op) {
stasis_log_reordering_handle_t* h = hp;
assert(xid >= 0 && stasis_transaction_table[xid % MAX_TRANSACTIONS].xid == xid);
assert(stasis_transaction_table_is_active(stasis_transaction_table, xid));
pthread_mutex_lock(&h->mut);
LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, dat, datlen);
stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(0, e));
@ -264,22 +267,18 @@ compensated_function void TreadRaw(int xid, recordid rid, void * dat) {
static inline int TcommitHelper(int xid, int force) {
lsn_t lsn;
assert(xid >= 0);
#ifdef DEBUGGING
pthread_mutex_lock(&stasis_transaction_table_mutex);
assert(stasis_transaction_table_num_active <= MAX_TRANSACTIONS);
pthread_mutex_unlock(&stasis_transaction_table_mutex);
#endif
if(stasis_transaction_table[xid % MAX_TRANSACTIONS].prevLSN != INVALID_LSN) {
stasis_transaction_table_entry_t * xact = stasis_transaction_table_get(stasis_transaction_table, xid);
if(xact->prevLSN != INVALID_LSN) {
lsn = stasis_log_commit_transaction(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS], force);
lsn = stasis_log_commit_transaction(stasis_log_file, xact, force);
if(globalLockManager.commit) { globalLockManager.commit(xid); }
stasis_alloc_committed(stasis_alloc, xid);
}
stasis_transaction_table_commit(xid);
stasis_transaction_table_commit(stasis_transaction_table, xid);
return 0;
}
@ -296,9 +295,9 @@ void TforceCommits() {
int Tprepare(int xid) {
assert(xid >= 0);
off_t i = xid % MAX_TRANSACTIONS;
assert(stasis_transaction_table[i].xid == xid);
stasis_log_prepare_transaction(stasis_log_file, &stasis_transaction_table[i]);
stasis_transaction_table_entry_t * xact = stasis_transaction_table_get(stasis_transaction_table, xid);
assert(xact);
stasis_log_prepare_transaction(stasis_log_file, xact);
return 0;
}
@ -306,13 +305,13 @@ int Tabort(int xid) {
lsn_t lsn;
assert(xid >= 0);
TransactionLog * t =&stasis_transaction_table[xid%MAX_TRANSACTIONS];
stasis_transaction_table_entry_t * t = stasis_transaction_table_get(stasis_transaction_table, xid);
assert(t->xid == xid);
lsn = stasis_log_abort_transaction(stasis_log_file, t);
/** @todo is the order of the next two calls important? */
undoTrans(stasis_log_file, *t);
undoTrans(stasis_log_file, stasis_transaction_table, *t); // XXX don't really need to pass the whole table in...
if(globalLockManager.abort) { globalLockManager.abort(xid); }
stasis_alloc_aborted(stasis_alloc, xid);
@ -320,25 +319,24 @@ int Tabort(int xid) {
return 0;
}
int Tforget(int xid) {
TransactionLog * t = &stasis_transaction_table[xid%MAX_TRANSACTIONS];
stasis_transaction_table_entry_t * t = stasis_transaction_table_get(stasis_transaction_table, xid);
assert(t->xid == xid);
stasis_log_end_aborted_transaction(stasis_log_file, t);
stasis_transaction_table_forget(t->xid);
stasis_transaction_table_forget(stasis_transaction_table, t->xid);
return 0;
}
int Tdeinit() {
int * active = stasis_transaction_table_list_active();
int count = stasis_transaction_table_num_active();
int * active = stasis_transaction_table_list_active(stasis_transaction_table);
int count = stasis_transaction_table_num_active(stasis_transaction_table);
for(int i = 0; i < count; i++) {
if(!stasis_suppress_unclean_shutdown_warnings) {
fprintf(stderr, "WARNING: Tdeinit() is aborting transaction %d\n",
stasis_transaction_table[i].xid);
fprintf(stderr, "WARNING: Tdeinit() is aborting transaction %d\n", active[i]);
}
Tabort(active[i]);
}
assert( stasis_transaction_table_num_active() == 0 );
assert( stasis_transaction_table_num_active(stasis_transaction_table) == 0 );
free(active);
@ -352,7 +350,7 @@ int Tdeinit() {
stasis_log_group_force_t * group_force = stasis_log_file->group_force;
stasis_log_file->close(stasis_log_file);
if(group_force) { stasis_log_group_force_deinit(group_force); }
stasis_transaction_table_deinit();
stasis_transaction_table_deinit(stasis_transaction_table);
stasis_dirty_page_table_deinit(stasis_dirty_page_table);
stasis_initted = 0;
@ -373,7 +371,7 @@ int TuncleanShutdown() {
// XXX: close_file?
stasis_page_deinit();
stasis_log_file->close(stasis_log_file);
stasis_transaction_table_deinit();
stasis_transaction_table_deinit(stasis_transaction_table);
stasis_dirty_page_table_deinit(stasis_dirty_page_table);
// Reset it here so the warnings will appear if a new stasis
@ -401,14 +399,15 @@ typedef struct {
} stasis_nta_handle;
int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) {
stasis_transaction_table_entry_t * xact = stasis_transaction_table_get(stasis_transaction_table, xid);
assert(xid >= 0);
void * e = stasis_log_begin_nta(stasis_log_file,
&stasis_transaction_table[xid % MAX_TRANSACTIONS],
xact,
op, dat, datSize);
// HACK: breaks encapsulation.
stasis_operation_do(e, NULL);
stasis_log_end_nta(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS], e);
stasis_log_end_nta(stasis_log_file, xact, e);
return 0;
}
@ -416,7 +415,7 @@ int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) {
void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) {
assert(xid >= 0);
void * ret = stasis_log_begin_nta(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS], op, dat, datSize);
void * ret = stasis_log_begin_nta(stasis_log_file, stasis_transaction_table_get(stasis_transaction_table, xid), op, dat, datSize);
DEBUG("Begin Nested Top Action e->LSN: %ld\n", e->LSN);
return ret;
}
@ -427,7 +426,7 @@ void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) {
*/
lsn_t TendNestedTopAction(int xid, void * handle) {
lsn_t ret = stasis_log_end_nta(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS], handle);
lsn_t ret = stasis_log_end_nta(stasis_log_file, stasis_transaction_table_get(stasis_transaction_table, xid), handle);
DEBUG("NestedTopAction CLR %d, LSN: %ld type: %ld (undoing: %ld, next to undo: %ld)\n", e->xid,
clrLSN, undoneLSN, *prevLSN);

View file

@ -11,6 +11,7 @@ struct stasis_truncation_t {
pthread_mutex_t shutdown_mutex;
pthread_cond_t shutdown_cond;
stasis_dirty_page_table_t * dirty_pages;
stasis_transaction_table_t * transaction_table;
stasis_buffer_manager_t * buffer_manager;
stasis_log_t * log;
};
@ -24,13 +25,15 @@ struct stasis_truncation_t {
#define TRUNCATE_INTERVAL 1
#define MIN_INCREMENTAL_TRUNCATION (1024 * 1024 * 25)
#endif
stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_buffer_manager_t *buffer_manager, stasis_log_t *log) {
stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_transaction_table_t * tbl,
stasis_buffer_manager_t *buffer_manager, stasis_log_t *log) {
stasis_truncation_t * ret = malloc(sizeof(*ret));
ret->initialized = 1;
ret->automaticallyTruncating = 0;
pthread_mutex_init(&ret->shutdown_mutex, 0);
pthread_cond_init(&ret->shutdown_cond, 0);
ret->dirty_pages = dpt;
ret->transaction_table = tbl;
ret->buffer_manager = buffer_manager;
ret->log = log;
return ret;
@ -92,7 +95,7 @@ int stasis_truncation_truncate(stasis_truncation_t* trunc, int force) {
//dirty pages.
lsn_t page_rec_lsn = stasis_dirty_page_table_minRecLSN(trunc->dirty_pages);
lsn_t xact_rec_lsn = stasis_transaction_table_minRecLSN();
lsn_t xact_rec_lsn = stasis_transaction_table_minRecLSN(trunc->transaction_table);
lsn_t flushed_lsn = trunc->log->first_unstable_lsn(trunc->log, LOG_FORCE_WAL);
lsn_t rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn;

View file

@ -266,7 +266,7 @@ void stasis_log_force(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode
Inform the logging layer that a new transaction has begun, and
obtain a handle.
*/
void stasis_log_begin_transaction(stasis_log_t* log, int xid, TransactionLog* l);
void stasis_log_begin_transaction(stasis_log_t* log, int xid, stasis_transaction_table_entry_t* l);
/**
Write a transaction PREPARE to the log tail. Blocks until the
@ -274,27 +274,27 @@ void stasis_log_begin_transaction(stasis_log_t* log, int xid, TransactionLog* l)
@return the lsn of the prepare log entry
*/
lsn_t stasis_log_prepare_transaction(stasis_log_t* log, TransactionLog * l);
lsn_t stasis_log_prepare_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l);
/**
Write a transaction COMMIT to the log tail. Blocks until the commit
record is stable.
@return the lsn of the commit log entry.
*/
lsn_t stasis_log_commit_transaction(stasis_log_t* log, TransactionLog * l, int force);
lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l, int force);
/**
Write a transaction ABORT to the log tail. Does not force the log.
@return the lsn of the abort log entry.
*/
lsn_t stasis_log_abort_transaction(stasis_log_t* log, TransactionLog * l);
lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l);
/**
Write a end transaction record. This entry tells recovery's undo
phase that it may safely ignore the transaction.
*/
lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, TransactionLog * l);
lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, stasis_transaction_table_entry_t * l);
/**
stasis_log_write_update writes an UPDATELOG log record to the log tail. It
@ -303,7 +303,7 @@ lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, TransactionLog * l)
state of the parameter l.
*/
LogEntry * stasis_log_write_update(stasis_log_t* log,
TransactionLog * l, pageid_t page, unsigned int operation,
stasis_transaction_table_entry_t * l, pageid_t page, unsigned int operation,
const byte * arg, size_t arg_size);
/**
@ -319,7 +319,7 @@ lsn_t stasis_log_write_clr(stasis_log_t* log, const LogEntry * e);
lsn_t stasis_log_write_dummy_clr(stasis_log_t* log, int xid, lsn_t prev_lsn);
LogEntry * stasis_log_begin_nta(stasis_log_t* log, TransactionLog * l, unsigned int op,
LogEntry * stasis_log_begin_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, unsigned int op,
const byte * arg, size_t arg_size);
lsn_t stasis_log_end_nta(stasis_log_t* log, TransactionLog * l, LogEntry * e);
lsn_t stasis_log_end_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, LogEntry * e);
#endif

View file

@ -13,7 +13,7 @@ typedef struct {
} stasis_log_reordering_op_t;
typedef struct stasis_log_reordering_handle_t {
TransactionLog *l;
stasis_transaction_table_entry_t *l;
stasis_log_t * log;
pthread_mutex_t mut;
pthread_cond_t done;
@ -34,7 +34,7 @@ typedef struct stasis_log_reordering_handle_t {
void stasis_log_reordering_handle_flush(stasis_log_reordering_handle_t * h);
void stasis_log_reordering_handle_close(stasis_log_reordering_handle_t * h);
stasis_log_reordering_handle_t *
stasis_log_reordering_handle_open(TransactionLog * l,
stasis_log_reordering_handle_open(stasis_transaction_table_entry_t * l,
stasis_log_t* log,
size_t chunk_len,
size_t max_len,

View file

@ -1,11 +1,15 @@
#ifndef __LLADD_RECOVERY2_H
#define __LLADD_RECOVERY2_H
#include <stasis/transactionTable.h>
#include <stasis/logger/logger2.h>
#include <stasis/operations/alloc.h>
void stasis_recovery_initiate(stasis_log_t* log, stasis_alloc_t * alloc);
/** This really doesn't belong in recovery.c, but there's so much code overlap, it doesn't make sense not to put it there. */
void undoTrans(stasis_log_t*log, TransactionLog transaction);
void stasis_recovery_initiate(stasis_log_t* log, stasis_transaction_table_t * tbl, stasis_alloc_t * alloc);
/** This really doesn't belong in recovery.c, but there's so much code overlap, it doesn't make sense not to put it there.
*
* XXX undoTrans should not take the entire transaction table as an argument. Instead, it should place its transaction argument directly into the list of transactions that undo processes.
* */
void undoTrans(stasis_log_t*log, stasis_transaction_table_t * tbl, stasis_transaction_table_entry_t transaction);
#endif

View file

@ -9,31 +9,27 @@
#define TRANSACTIONTABLE_H_
#include <stasis/common.h>
typedef struct TransactionLog TransactionLog;
typedef struct stasis_transaction_table_entry_t stasis_transaction_table_entry_t;
typedef struct stasis_transaction_table_t stasis_transaction_table_t;
/**
Contains the state needed by the logging layer to perform
operations on a transaction.
*/
struct TransactionLog {
struct stasis_transaction_table_entry_t {
int xid;
lsn_t prevLSN;
lsn_t recLSN;
pthread_mutex_t mut;
};
/**
XXX TransactionTable should be private to transactional2.c!
*/
extern TransactionLog stasis_transaction_table[MAX_TRANSACTIONS];
/**
Initialize Stasis' transaction table. Called by Tinit() and unit
tests that wish to test portions of Stasis in isolation.
*/
void stasis_transaction_table_init();
stasis_transaction_table_t* stasis_transaction_table_init();
/** Free resources associated with the transaction table */
void stasis_transaction_table_deinit();
void stasis_transaction_table_deinit(stasis_transaction_table_t*);
/**
* Used by recovery to prevent reuse of old transaction ids.
*
@ -41,33 +37,34 @@ void stasis_transaction_table_deinit();
*
* @param xid The highest transaction id issued so far.
*/
void stasis_transaction_table_max_transaction_id_set(int xid);
void stasis_transaction_table_max_transaction_id_set(stasis_transaction_table_t*,int xid);
/**
* Used by test cases to mess with internal transaction table state.
*
* @param xid The new active transaction count.
*/
void stasis_transaction_table_active_transaction_count_set(int xid);
void stasis_transaction_table_active_transaction_count_set(stasis_transaction_table_t*,int xid);
int stasis_transaction_table_roll_forward(int xid, lsn_t lsn, lsn_t prevLSN);
int stasis_transaction_table_roll_forward(stasis_transaction_table_t*,int xid, lsn_t lsn, lsn_t prevLSN);
/**
@todo update Tprepare() to not write reclsn to log, then remove
this function.
*/
int stasis_transaction_table_roll_forward_with_reclsn(int xid, lsn_t lsn,
int stasis_transaction_table_roll_forward_with_reclsn(stasis_transaction_table_t*,int xid, lsn_t lsn,
lsn_t prevLSN,
lsn_t recLSN);
/**
This is used by log truncation.
*/
lsn_t stasis_transaction_table_minRecLSN();
lsn_t stasis_transaction_table_minRecLSN(stasis_transaction_table_t*);
TransactionLog * stasis_transaction_table_begin(int * xid);
TransactionLog * stasis_transaction_table_get(int xid);
int stasis_transaction_table_commit(int xid);
int stasis_transaction_table_forget(int xid);
stasis_transaction_table_entry_t * stasis_transaction_table_begin(stasis_transaction_table_t*,int * xid);
stasis_transaction_table_entry_t * stasis_transaction_table_get(stasis_transaction_table_t*,int xid);
int stasis_transaction_table_commit(stasis_transaction_table_t*,int xid);
int stasis_transaction_table_forget(stasis_transaction_table_t*,int xid);
int stasis_transaction_table_num_active();
int* stasis_transaction_table_list_active();
int stasis_transaction_table_num_active(stasis_transaction_table_t*);
int* stasis_transaction_table_list_active(stasis_transaction_table_t*);
int stasis_transaction_table_is_active(stasis_transaction_table_t*, int xid);
#endif /* TRANSACTIONTABLE_H_ */

View file

@ -774,6 +774,8 @@ void * stasis_log(void);
*/
void * stasis_runtime_dirty_page_table();
void * stasis_runtime_transaction_table();
void * stasis_runtime_alloc_state();
void * stasis_runtime_buffer_manager();

View file

@ -63,9 +63,11 @@ typedef struct stasis_truncation_t stasis_truncation_t;
#include <stasis/logger/logger2.h>
#include <stasis/dirtyPageTable.h>
#include <stasis/transactionTable.h>
#include <stasis/bufferManager.h>
stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_buffer_manager_t * bufferManager, stasis_log_t * log);
stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_transaction_table_t * tbl,
stasis_buffer_manager_t * bufferManager, stasis_log_t * log);
void stasis_truncation_deinit(stasis_truncation_t * trunc);
/**

View file

@ -427,7 +427,7 @@ void reopenLogWorkload(int truncating) {
const int SYNC_POINT = 900;
stasis_log_t * stasis_log_file = 0;
stasis_transaction_table_active_transaction_count_set(0);
stasis_transaction_table_active_transaction_count_set(stasis_runtime_transaction_table(), 0);
if(LOG_TO_FILE == stasis_log_type) {
stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name,
@ -441,7 +441,7 @@ void reopenLogWorkload(int truncating) {
}
int xid = 1;
TransactionLog l;
stasis_transaction_table_entry_t l;
pthread_mutex_init(&l.mut,0);
stasis_log_begin_transaction(stasis_log_file, xid, &l);
lsn_t startLSN = 0;

View file

@ -642,7 +642,7 @@ START_TEST(operation_reorderable) {
stasis_log_reordering_handle_t * rh
= stasis_log_reordering_handle_open(
&stasis_transaction_table[xid[0]% MAX_TRANSACTIONS],
stasis_transaction_table_get(stasis_runtime_transaction_table(), xid[0]),
stasis_log(),
100, // bytes (far too low!)
10, // log entries