add deferred, end of transaction callbacks. addresses issue 15

This commit is contained in:
Sears Russell 2009-12-15 06:30:37 +00:00
parent 5e2c02149c
commit 7ff80f37ef
7 changed files with 133 additions and 40 deletions

View file

@ -157,11 +157,17 @@ void stasis_log_begin_transaction(stasis_log_t* log, int xid, stasis_transaction
tl->recLSN = INVALID_LSN;
}
lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l) {
return stasis_log_write_common(log, l, XABORT);
lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_t *table, stasis_transaction_table_entry_t * l) {
stasis_transaction_table_invoke_callbacks(table, l, PRE_COMMIT);
lsn_t ret = stasis_log_write_common(log, l, XABORT);
// rest of callbacks happen after rollback completes, in end_aborted_transaction.
return ret;
}
lsn_t stasis_log_end_aborted_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l) {
return stasis_log_write_common(log, l, XEND);
lsn_t stasis_log_end_aborted_transaction(stasis_log_t* log, stasis_transaction_table_t *table, stasis_transaction_table_entry_t * l) {
lsn_t ret = stasis_log_write_common(log, l, XEND);
stasis_transaction_table_invoke_callbacks(table, l, AT_COMMIT);
stasis_transaction_table_invoke_callbacks(table, l, POST_COMMIT);
return ret;
}
lsn_t stasis_log_prepare_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l) {
lsn_t lsn = stasis_log_write_prepare(log, l);
@ -170,11 +176,14 @@ lsn_t stasis_log_prepare_transaction(stasis_log_t* log, stasis_transaction_table
}
lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l, int force) {
lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_t *table, stasis_transaction_table_entry_t * l, int force) {
stasis_transaction_table_invoke_callbacks(table, l, PRE_COMMIT);
lsn_t lsn = stasis_log_write_common(log, l, XCOMMIT);
stasis_transaction_table_invoke_callbacks(table, l, AT_COMMIT);
if(force) {
stasis_log_force(log, lsn, LOG_FORCE_COMMIT);
}
stasis_transaction_table_invoke_callbacks(table, l, POST_COMMIT);
return lsn;
}

View file

@ -96,6 +96,8 @@ typedef struct {
struct stasis_alloc_t {
pthread_mutex_t mut;
pageid_t lastFreepage;
int callback_id;
stasis_transaction_table_t * xact_table;
stasis_allocation_policy_t * allocPolicy;
};
@ -122,7 +124,6 @@ static int op_alloc(const LogEntry* e, Page* p) {
// otherwise, no preimage
assert(e->update.arg_size == sizeof(alloc_arg));
}
return ret;
}
@ -203,14 +204,25 @@ stasis_operation_impl stasis_op_impl_realloc() {
return o;
}
static void stasis_alloc_register_old_regions();
stasis_alloc_t* stasis_alloc_init(stasis_allocation_policy_t * allocPolicy) {
int stasis_alloc_callback(int xid, void * arg) {
stasis_alloc_t * alloc = arg;
pthread_mutex_lock(&alloc->mut);
stasis_allocation_policy_transaction_completed(alloc->allocPolicy, xid);
pthread_mutex_unlock(&alloc->mut);
return 0;
}
stasis_alloc_t* stasis_alloc_init(stasis_transaction_table_t * tbl, stasis_allocation_policy_t * allocPolicy) {
stasis_alloc_t * alloc = malloc(sizeof(*alloc));
alloc->lastFreepage = PAGEID_T_MAX;
alloc->allocPolicy = allocPolicy;
pthread_mutex_init(&alloc->mut, 0);
alloc->callback_id = stasis_transaction_table_register_callback(tbl, stasis_alloc_callback, AT_COMMIT);
alloc->xact_table = tbl;
return alloc;
}
static void stasis_alloc_register_old_regions();
void stasis_alloc_post_init(stasis_alloc_t * alloc) {
stasis_alloc_register_old_regions(alloc);
}
@ -343,18 +355,10 @@ recordid Talloc(int xid, unsigned long size) {
releasePage(p);
pthread_mutex_unlock(&alloc->mut);
return rid; // TODO return NULLRID on error
}
stasis_transaction_table_set_argument(alloc->xact_table, xid, alloc->callback_id,
AT_COMMIT, alloc);
void stasis_alloc_aborted(stasis_alloc_t* alloc, int xid) {
pthread_mutex_lock(&alloc->mut);
stasis_allocation_policy_transaction_completed(alloc->allocPolicy, xid);
pthread_mutex_unlock(&alloc->mut);
}
void stasis_alloc_committed(stasis_alloc_t* alloc, int xid) {
pthread_mutex_lock(&alloc->mut);
stasis_allocation_policy_transaction_completed(alloc->allocPolicy, xid);
pthread_mutex_unlock(&alloc->mut);
return rid; // TODO return NULLRID on error
}
recordid TallocFromPage(int xid, pageid_t page, unsigned long size) {
@ -397,6 +401,8 @@ recordid TallocFromPage(int xid, pageid_t page, unsigned long size) {
releasePage(p);
pthread_mutex_unlock(&alloc->mut);
stasis_transaction_table_set_argument(alloc->xact_table, xid, alloc->callback_id,
AT_COMMIT, alloc);
return rid;
}
@ -458,6 +464,9 @@ void Tdealloc(int xid, recordid rid) {
free(preimage);
stasis_transaction_table_set_argument(alloc->xact_table, xid, alloc->callback_id,
AT_COMMIT, alloc);
}
int TrecordType(int xid, recordid rid) {

View file

@ -24,6 +24,8 @@ struct stasis_transaction_table_t {
*/
pthread_key_t key;
stasis_transaction_table_entry_t table[MAX_TRANSACTIONS];
stasis_transaction_table_callback_t * commitCallbacks[3];
int commitCallbackCount[3];
};
static inline int test_and_set_entry(stasis_transaction_table_entry_t* e, int old, int new) {
@ -121,6 +123,53 @@ int stasis_transaction_table_is_active(stasis_transaction_table_t *tbl, int xid)
return xid >= 0 && tbl->table[xid].xid == xid;
}
int stasis_transaction_table_register_callback(stasis_transaction_table_t *tbl,
stasis_transaction_table_callback_t cb,
stasis_transaction_table_callback_type_t type) {
assert(type >= 0 && type < 3);
stasis_transaction_table_callback_t **list = &tbl->commitCallbacks[type];
int *count = &tbl->commitCallbackCount[type];
*list = realloc(*list, (1+*count) * sizeof(*list[0]));
*list[*count] = cb;
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
void *** args;
args = &tbl->table[i].commitArgs[type];
*args = realloc(*args, (1+*count) * sizeof(*args[0]));
*args[*count] = 0;
}
*count = 1 + *count;
return 0;
}
int stasis_transaction_table_invoke_callbacks(stasis_transaction_table_t *tbl,
stasis_transaction_table_entry_t * entry,
stasis_transaction_table_callback_type_t type) {
assert(type >= 0 && type < 3);
stasis_transaction_table_callback_t *list = tbl->commitCallbacks[type];
int count = tbl->commitCallbackCount[type];
void **args = entry->commitArgs[type];
int ret = 0;
for(int i = 0; i < count; i++) {
if(args[i]) {
ret = list[i](entry->xid, args[i]) || ret;
args[i] = 0;
}
}
return ret;
}
int stasis_transaction_table_set_argument(stasis_transaction_table_t *tbl, int xid, int callback_id,
stasis_transaction_table_callback_type_t type, void *arg) {
assert(type >= 0 && type < 3);
int count = tbl->commitCallbackCount[type];
void ** args = tbl->table[xid].commitArgs[type];
assert(count > callback_id);
args[callback_id] = arg;
return 0;
}
int* stasis_transaction_table_list_active(stasis_transaction_table_t *tbl, int *count) {
int * ret = malloc(sizeof(*ret));
ret[0] = INVALID_XID;
@ -147,11 +196,19 @@ stasis_transaction_table_t * stasis_transaction_table_init() {
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
tbl->table[i].xid = INVALID_XTABLE_XID;
tbl->table[i].commitArgs[0] = 0;
tbl->table[i].commitArgs[1] = 0;
tbl->table[i].commitArgs[2] = 0;
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_init(&(tbl->table[i].mut),0);
#endif
}
for(int i = 0; i < 3; i++) {
tbl->commitCallbacks[i] = 0;
tbl->commitCallbackCount[i] = 0;
}
DEBUG("initted xact table!\n");
pthread_key_create(&tbl->key, stasis_transaction_table_thread_destructor);
@ -162,11 +219,19 @@ stasis_transaction_table_t * stasis_transaction_table_init() {
void stasis_transaction_table_deinit(stasis_transaction_table_t *tbl) {
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_destroy(&tbl->mut);
#endif
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_destroy(&tbl->table[i].mut);
}
#endif
for(int j = 0; j < 3; j++) {
if(tbl->table[i].commitArgs[j]) { free(tbl->table[i].commitArgs[j]); }
}
}
for(int j = 0; j < 3; j++) {
if(tbl->commitCallbacks[j]) { free(tbl->commitCallbacks[j]); }
}
pthread_key_delete(tbl->key);
free(tbl);

View file

@ -96,7 +96,7 @@ int Tinit() {
stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table, stasis_buffer_manager); // xxx circular dependency.
pageOperationsInit();
stasis_allocation_policy = stasis_allocation_policy_init();
stasis_alloc = stasis_alloc_init(stasis_allocation_policy);
stasis_alloc = stasis_alloc_init(stasis_transaction_table, stasis_allocation_policy);
TnaiveHashInit();
LinearHashNTAInit();
@ -272,12 +272,8 @@ static inline int TcommitHelper(int xid, int force) {
stasis_transaction_table_entry_t * xact = stasis_transaction_table_get(stasis_transaction_table, xid);
if(xact->prevLSN != INVALID_LSN) {
lsn = stasis_log_commit_transaction(stasis_log_file, xact, force);
lsn = stasis_log_commit_transaction(stasis_log_file, stasis_transaction_table, xact, force);
if(globalLockManager.commit) { globalLockManager.commit(xid); }
stasis_alloc_committed(stasis_alloc, xid);
}
stasis_transaction_table_commit(stasis_transaction_table, xid);
@ -310,20 +306,18 @@ int Tabort(int xid) {
stasis_transaction_table_entry_t * t = stasis_transaction_table_get(stasis_transaction_table, xid);
assert(t->xid == xid);
lsn = stasis_log_abort_transaction(stasis_log_file, t);
lsn = stasis_log_abort_transaction(stasis_log_file, stasis_transaction_table, t);
/** @todo is the order of the next two calls important? */
undoTrans(stasis_log_file, stasis_transaction_table, *t); // XXX don't really need to pass the whole table in...
if(globalLockManager.abort) { globalLockManager.abort(xid); }
stasis_alloc_aborted(stasis_alloc, xid);
return 0;
}
int Tforget(int xid) {
stasis_transaction_table_entry_t * t = stasis_transaction_table_get(stasis_transaction_table, xid);
assert(t->xid == xid);
stasis_log_end_aborted_transaction(stasis_log_file, t);
stasis_log_end_aborted_transaction(stasis_log_file, stasis_transaction_table, t);
stasis_transaction_table_forget(stasis_transaction_table, t->xid);
return 0;
}

View file

@ -281,20 +281,20 @@ lsn_t stasis_log_prepare_transaction(stasis_log_t* log, stasis_transaction_table
@return the lsn of the commit log entry.
*/
lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l, int force);
lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_t * tbl, stasis_transaction_table_entry_t * l, int force);
/**
Write a transaction ABORT to the log tail. Does not force the log.
@return the lsn of the abort log entry.
*/
lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l);
lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_t * tbl, stasis_transaction_table_entry_t * l);
/**
Write a end transaction record. This entry tells recovery's undo
phase that it may safely ignore the transaction.
*/
lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, stasis_transaction_table_entry_t * l);
lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, stasis_transaction_table_t *tbl, stasis_transaction_table_entry_t * l);
/**
stasis_log_write_update writes an UPDATELOG log record to the log tail. It

View file

@ -20,10 +20,7 @@ stasis_operation_impl stasis_op_impl_realloc();
typedef struct stasis_alloc_t stasis_alloc_t;
void stasis_alloc_aborted(stasis_alloc_t* alloc, int xid);
void stasis_alloc_committed(stasis_alloc_t* alloc, int xid);
stasis_alloc_t* stasis_alloc_init(stasis_allocation_policy_t * allocPolicy);
stasis_alloc_t* stasis_alloc_init(stasis_transaction_table_t * tbl, stasis_allocation_policy_t * allocPolicy);
void stasis_alloc_post_init(stasis_alloc_t* alloc);
void stasis_alloc_deinit(stasis_alloc_t* alloc);
/**

View file

@ -10,6 +10,9 @@
#include <stasis/common.h>
typedef int (*stasis_transaction_table_callback_t)(int, void*);
typedef struct stasis_transaction_table_callback_list_t stasis_transaction_table_callback_list_t;
typedef struct stasis_transaction_table_entry_t stasis_transaction_table_entry_t;
typedef struct stasis_transaction_table_t stasis_transaction_table_t;
@ -21,6 +24,7 @@ struct stasis_transaction_table_entry_t {
int xid;
lsn_t prevLSN;
lsn_t recLSN;
void ** commitArgs[3];
#ifndef HAVE_GCC_ATOMICS
pthread_mutex_t mut;
#endif
@ -55,4 +59,19 @@ int stasis_transaction_table_num_active_threads(stasis_transaction_table_t*);
int* stasis_transaction_table_list_active(stasis_transaction_table_t*, int *count);
int stasis_transaction_table_is_active(stasis_transaction_table_t*, int xid);
typedef enum {
PRE_COMMIT = 0,
AT_COMMIT = 1,
POST_COMMIT = 2
} stasis_transaction_table_callback_type_t;
int stasis_transaction_table_register_callback(stasis_transaction_table_t *tbl,
stasis_transaction_table_callback_t cb,
stasis_transaction_table_callback_type_t type);
int stasis_transaction_table_invoke_callbacks(stasis_transaction_table_t *tbl,
stasis_transaction_table_entry_t *entry,
stasis_transaction_table_callback_type_t type);
int stasis_transaction_table_set_argument(stasis_transaction_table_t *tbl, int xid, int callback_id,
stasis_transaction_table_callback_type_t type, void *arg);
#endif /* TRANSACTIONTABLE_H_ */