diff --git a/src/stasis/logger/logger2.c b/src/stasis/logger/logger2.c index 2452e1c..477d518 100644 --- a/src/stasis/logger/logger2.c +++ b/src/stasis/logger/logger2.c @@ -157,11 +157,17 @@ void stasis_log_begin_transaction(stasis_log_t* log, int xid, stasis_transaction tl->recLSN = INVALID_LSN; } -lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l) { - return stasis_log_write_common(log, l, XABORT); +lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_t *table, stasis_transaction_table_entry_t * l) { + stasis_transaction_table_invoke_callbacks(table, l, PRE_COMMIT); + lsn_t ret = stasis_log_write_common(log, l, XABORT); + // rest of callbacks happen after rollback completes, in end_aborted_transaction. + return ret; } -lsn_t stasis_log_end_aborted_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l) { - return stasis_log_write_common(log, l, XEND); +lsn_t stasis_log_end_aborted_transaction(stasis_log_t* log, stasis_transaction_table_t *table, stasis_transaction_table_entry_t * l) { + lsn_t ret = stasis_log_write_common(log, l, XEND); + stasis_transaction_table_invoke_callbacks(table, l, AT_COMMIT); + stasis_transaction_table_invoke_callbacks(table, l, POST_COMMIT); + return ret; } lsn_t stasis_log_prepare_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l) { lsn_t lsn = stasis_log_write_prepare(log, l); @@ -170,11 +176,14 @@ lsn_t stasis_log_prepare_transaction(stasis_log_t* log, stasis_transaction_table } -lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l, int force) { +lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_t *table, stasis_transaction_table_entry_t * l, int force) { + stasis_transaction_table_invoke_callbacks(table, l, PRE_COMMIT); lsn_t lsn = stasis_log_write_common(log, l, XCOMMIT); + stasis_transaction_table_invoke_callbacks(table, l, AT_COMMIT); if(force) { stasis_log_force(log, lsn, LOG_FORCE_COMMIT); } + stasis_transaction_table_invoke_callbacks(table, l, POST_COMMIT); return lsn; } diff --git a/src/stasis/operations/alloc.c b/src/stasis/operations/alloc.c index f5001d4..4742fa2 100644 --- a/src/stasis/operations/alloc.c +++ b/src/stasis/operations/alloc.c @@ -94,9 +94,11 @@ typedef struct { } alloc_arg; struct stasis_alloc_t { - pthread_mutex_t mut; - pageid_t lastFreepage; - stasis_allocation_policy_t * allocPolicy; + pthread_mutex_t mut; + pageid_t lastFreepage; + int callback_id; + stasis_transaction_table_t * xact_table; + stasis_allocation_policy_t * allocPolicy; }; static int op_alloc(const LogEntry* e, Page* p) { @@ -122,7 +124,6 @@ static int op_alloc(const LogEntry* e, Page* p) { // otherwise, no preimage assert(e->update.arg_size == sizeof(alloc_arg)); } - return ret; } @@ -203,14 +204,25 @@ stasis_operation_impl stasis_op_impl_realloc() { return o; } -static void stasis_alloc_register_old_regions(); -stasis_alloc_t* stasis_alloc_init(stasis_allocation_policy_t * allocPolicy) { +int stasis_alloc_callback(int xid, void * arg) { + stasis_alloc_t * alloc = arg; + pthread_mutex_lock(&alloc->mut); + stasis_allocation_policy_transaction_completed(alloc->allocPolicy, xid); + pthread_mutex_unlock(&alloc->mut); + return 0; +} + +stasis_alloc_t* stasis_alloc_init(stasis_transaction_table_t * tbl, stasis_allocation_policy_t * allocPolicy) { stasis_alloc_t * alloc = malloc(sizeof(*alloc)); alloc->lastFreepage = PAGEID_T_MAX; alloc->allocPolicy = allocPolicy; pthread_mutex_init(&alloc->mut, 0); + alloc->callback_id = stasis_transaction_table_register_callback(tbl, stasis_alloc_callback, AT_COMMIT); + alloc->xact_table = tbl; return alloc; } + +static void stasis_alloc_register_old_regions(); void stasis_alloc_post_init(stasis_alloc_t * alloc) { stasis_alloc_register_old_regions(alloc); } @@ -343,18 +355,10 @@ recordid Talloc(int xid, unsigned long size) { releasePage(p); pthread_mutex_unlock(&alloc->mut); - return rid; // TODO return NULLRID on error -} + stasis_transaction_table_set_argument(alloc->xact_table, xid, alloc->callback_id, + AT_COMMIT, alloc); -void stasis_alloc_aborted(stasis_alloc_t* alloc, int xid) { - pthread_mutex_lock(&alloc->mut); - stasis_allocation_policy_transaction_completed(alloc->allocPolicy, xid); - pthread_mutex_unlock(&alloc->mut); -} -void stasis_alloc_committed(stasis_alloc_t* alloc, int xid) { - pthread_mutex_lock(&alloc->mut); - stasis_allocation_policy_transaction_completed(alloc->allocPolicy, xid); - pthread_mutex_unlock(&alloc->mut); + return rid; // TODO return NULLRID on error } recordid TallocFromPage(int xid, pageid_t page, unsigned long size) { @@ -397,6 +401,8 @@ recordid TallocFromPage(int xid, pageid_t page, unsigned long size) { releasePage(p); pthread_mutex_unlock(&alloc->mut); + stasis_transaction_table_set_argument(alloc->xact_table, xid, alloc->callback_id, + AT_COMMIT, alloc); return rid; } @@ -458,6 +464,9 @@ void Tdealloc(int xid, recordid rid) { free(preimage); + stasis_transaction_table_set_argument(alloc->xact_table, xid, alloc->callback_id, + AT_COMMIT, alloc); + } int TrecordType(int xid, recordid rid) { diff --git a/src/stasis/transactionTable.c b/src/stasis/transactionTable.c index e09e188..07e8e75 100644 --- a/src/stasis/transactionTable.c +++ b/src/stasis/transactionTable.c @@ -24,6 +24,8 @@ struct stasis_transaction_table_t { */ pthread_key_t key; stasis_transaction_table_entry_t table[MAX_TRANSACTIONS]; + stasis_transaction_table_callback_t * commitCallbacks[3]; + int commitCallbackCount[3]; }; static inline int test_and_set_entry(stasis_transaction_table_entry_t* e, int old, int new) { @@ -121,6 +123,53 @@ int stasis_transaction_table_is_active(stasis_transaction_table_t *tbl, int xid) return xid >= 0 && tbl->table[xid].xid == xid; } +int stasis_transaction_table_register_callback(stasis_transaction_table_t *tbl, + stasis_transaction_table_callback_t cb, + stasis_transaction_table_callback_type_t type) { + assert(type >= 0 && type < 3); + stasis_transaction_table_callback_t **list = &tbl->commitCallbacks[type]; + int *count = &tbl->commitCallbackCount[type]; + + *list = realloc(*list, (1+*count) * sizeof(*list[0])); + *list[*count] = cb; + for(int i = 0; i < MAX_TRANSACTIONS; i++) { + void *** args; + args = &tbl->table[i].commitArgs[type]; + *args = realloc(*args, (1+*count) * sizeof(*args[0])); + *args[*count] = 0; + } + *count = 1 + *count; + + return 0; +} + +int stasis_transaction_table_invoke_callbacks(stasis_transaction_table_t *tbl, + stasis_transaction_table_entry_t * entry, + stasis_transaction_table_callback_type_t type) { + assert(type >= 0 && type < 3); + stasis_transaction_table_callback_t *list = tbl->commitCallbacks[type]; + int count = tbl->commitCallbackCount[type]; + void **args = entry->commitArgs[type]; + + int ret = 0; + for(int i = 0; i < count; i++) { + if(args[i]) { + ret = list[i](entry->xid, args[i]) || ret; + args[i] = 0; + } + } + return ret; +} +int stasis_transaction_table_set_argument(stasis_transaction_table_t *tbl, int xid, int callback_id, + stasis_transaction_table_callback_type_t type, void *arg) { + assert(type >= 0 && type < 3); + int count = tbl->commitCallbackCount[type]; + void ** args = tbl->table[xid].commitArgs[type]; + assert(count > callback_id); + args[callback_id] = arg; + return 0; +} + int* stasis_transaction_table_list_active(stasis_transaction_table_t *tbl, int *count) { int * ret = malloc(sizeof(*ret)); ret[0] = INVALID_XID; @@ -147,11 +196,19 @@ stasis_transaction_table_t * stasis_transaction_table_init() { for(int i = 0; i < MAX_TRANSACTIONS; i++) { tbl->table[i].xid = INVALID_XTABLE_XID; + tbl->table[i].commitArgs[0] = 0; + tbl->table[i].commitArgs[1] = 0; + tbl->table[i].commitArgs[2] = 0; #ifndef HAVE_GCC_ATOMICS pthread_mutex_init(&(tbl->table[i].mut),0); #endif } + for(int i = 0; i < 3; i++) { + tbl->commitCallbacks[i] = 0; + tbl->commitCallbackCount[i] = 0; + } + DEBUG("initted xact table!\n"); pthread_key_create(&tbl->key, stasis_transaction_table_thread_destructor); @@ -162,11 +219,19 @@ stasis_transaction_table_t * stasis_transaction_table_init() { void stasis_transaction_table_deinit(stasis_transaction_table_t *tbl) { #ifndef HAVE_GCC_ATOMICS pthread_mutex_destroy(&tbl->mut); +#endif for(int i = 0; i < MAX_TRANSACTIONS; i++) { +#ifndef HAVE_GCC_ATOMICS pthread_mutex_destroy(&tbl->table[i].mut); - } #endif + for(int j = 0; j < 3; j++) { + if(tbl->table[i].commitArgs[j]) { free(tbl->table[i].commitArgs[j]); } + } + } + for(int j = 0; j < 3; j++) { + if(tbl->commitCallbacks[j]) { free(tbl->commitCallbacks[j]); } + } pthread_key_delete(tbl->key); free(tbl); diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 0d59291..b672697 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -96,7 +96,7 @@ int Tinit() { stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table, stasis_buffer_manager); // xxx circular dependency. pageOperationsInit(); stasis_allocation_policy = stasis_allocation_policy_init(); - stasis_alloc = stasis_alloc_init(stasis_allocation_policy); + stasis_alloc = stasis_alloc_init(stasis_transaction_table, stasis_allocation_policy); TnaiveHashInit(); LinearHashNTAInit(); @@ -272,12 +272,8 @@ static inline int TcommitHelper(int xid, int force) { stasis_transaction_table_entry_t * xact = stasis_transaction_table_get(stasis_transaction_table, xid); if(xact->prevLSN != INVALID_LSN) { - - lsn = stasis_log_commit_transaction(stasis_log_file, xact, force); + lsn = stasis_log_commit_transaction(stasis_log_file, stasis_transaction_table, xact, force); if(globalLockManager.commit) { globalLockManager.commit(xid); } - - stasis_alloc_committed(stasis_alloc, xid); - } stasis_transaction_table_commit(stasis_transaction_table, xid); @@ -310,20 +306,18 @@ int Tabort(int xid) { stasis_transaction_table_entry_t * t = stasis_transaction_table_get(stasis_transaction_table, xid); assert(t->xid == xid); - lsn = stasis_log_abort_transaction(stasis_log_file, t); + lsn = stasis_log_abort_transaction(stasis_log_file, stasis_transaction_table, t); /** @todo is the order of the next two calls important? */ undoTrans(stasis_log_file, stasis_transaction_table, *t); // XXX don't really need to pass the whole table in... if(globalLockManager.abort) { globalLockManager.abort(xid); } - stasis_alloc_aborted(stasis_alloc, xid); - return 0; } int Tforget(int xid) { stasis_transaction_table_entry_t * t = stasis_transaction_table_get(stasis_transaction_table, xid); assert(t->xid == xid); - stasis_log_end_aborted_transaction(stasis_log_file, t); + stasis_log_end_aborted_transaction(stasis_log_file, stasis_transaction_table, t); stasis_transaction_table_forget(stasis_transaction_table, t->xid); return 0; } diff --git a/stasis/logger/logger2.h b/stasis/logger/logger2.h index abebe6f..6a9566e 100644 --- a/stasis/logger/logger2.h +++ b/stasis/logger/logger2.h @@ -281,20 +281,20 @@ lsn_t stasis_log_prepare_transaction(stasis_log_t* log, stasis_transaction_table @return the lsn of the commit log entry. */ -lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l, int force); +lsn_t stasis_log_commit_transaction(stasis_log_t* log, stasis_transaction_table_t * tbl, stasis_transaction_table_entry_t * l, int force); /** Write a transaction ABORT to the log tail. Does not force the log. @return the lsn of the abort log entry. */ -lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_entry_t * l); +lsn_t stasis_log_abort_transaction(stasis_log_t* log, stasis_transaction_table_t * tbl, stasis_transaction_table_entry_t * l); /** Write a end transaction record. This entry tells recovery's undo phase that it may safely ignore the transaction. */ -lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, stasis_transaction_table_entry_t * l); +lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, stasis_transaction_table_t *tbl, stasis_transaction_table_entry_t * l); /** stasis_log_write_update writes an UPDATELOG log record to the log tail. It diff --git a/stasis/operations/alloc.h b/stasis/operations/alloc.h index 243ab6d..a5faee3 100644 --- a/stasis/operations/alloc.h +++ b/stasis/operations/alloc.h @@ -20,10 +20,7 @@ stasis_operation_impl stasis_op_impl_realloc(); typedef struct stasis_alloc_t stasis_alloc_t; -void stasis_alloc_aborted(stasis_alloc_t* alloc, int xid); -void stasis_alloc_committed(stasis_alloc_t* alloc, int xid); - -stasis_alloc_t* stasis_alloc_init(stasis_allocation_policy_t * allocPolicy); +stasis_alloc_t* stasis_alloc_init(stasis_transaction_table_t * tbl, stasis_allocation_policy_t * allocPolicy); void stasis_alloc_post_init(stasis_alloc_t* alloc); void stasis_alloc_deinit(stasis_alloc_t* alloc); /** diff --git a/stasis/transactionTable.h b/stasis/transactionTable.h index 73d0b38..a9ceb0c 100644 --- a/stasis/transactionTable.h +++ b/stasis/transactionTable.h @@ -10,6 +10,9 @@ #include +typedef int (*stasis_transaction_table_callback_t)(int, void*); + +typedef struct stasis_transaction_table_callback_list_t stasis_transaction_table_callback_list_t; typedef struct stasis_transaction_table_entry_t stasis_transaction_table_entry_t; typedef struct stasis_transaction_table_t stasis_transaction_table_t; @@ -21,6 +24,7 @@ struct stasis_transaction_table_entry_t { int xid; lsn_t prevLSN; lsn_t recLSN; + void ** commitArgs[3]; #ifndef HAVE_GCC_ATOMICS pthread_mutex_t mut; #endif @@ -55,4 +59,19 @@ int stasis_transaction_table_num_active_threads(stasis_transaction_table_t*); int* stasis_transaction_table_list_active(stasis_transaction_table_t*, int *count); int stasis_transaction_table_is_active(stasis_transaction_table_t*, int xid); +typedef enum { + PRE_COMMIT = 0, + AT_COMMIT = 1, + POST_COMMIT = 2 +} stasis_transaction_table_callback_type_t; + +int stasis_transaction_table_register_callback(stasis_transaction_table_t *tbl, + stasis_transaction_table_callback_t cb, + stasis_transaction_table_callback_type_t type); +int stasis_transaction_table_invoke_callbacks(stasis_transaction_table_t *tbl, + stasis_transaction_table_entry_t *entry, + stasis_transaction_table_callback_type_t type); +int stasis_transaction_table_set_argument(stasis_transaction_table_t *tbl, int xid, int callback_id, + stasis_transaction_table_callback_type_t type, void *arg); + #endif /* TRANSACTIONTABLE_H_ */