port recovery from pbl hash api to lhtable api

This commit is contained in:
Sears Russell 2011-06-12 11:03:48 +00:00
parent 312f8182f9
commit 80ab0d684c

View file

@ -6,8 +6,7 @@
*/ */
#include <stasis/common.h> #include <stasis/common.h>
#include <pbl/pbl.h> #include <stasis/lhtable.h>
#include <stasis/recovery.h> #include <stasis/recovery.h>
#include <stasis/transactionTable.h> #include <stasis/transactionTable.h>
@ -26,7 +25,8 @@
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
static pblHashTable_t * transactionLSN; //static pblHashTable_t * transactionLSN;
static struct lhtable * transactionLSN;
static LinkedList * rollbackLSNs = NULL; static LinkedList * rollbackLSNs = NULL;
/** @todo There is no real reason to have this mutex (which prevents /** @todo There is no real reason to have this mutex (which prevents
concurrent aborts), except that we need to protect rollbackLSNs's concurrent aborts), except that we need to protect rollbackLSNs's
@ -59,7 +59,7 @@ static void stasis_recovery_analysis(stasis_log_t* log, stasis_transaction_table
while((e = nextInLog(lh))) { while((e = nextInLog(lh))) {
lsn_t * xactLSN = (lsn_t*)pblHtLookup(transactionLSN, &(e->xid), sizeof(int)); lsn_t * xactLSN = (lsn_t*)lhfind(transactionLSN, &(e->xid), sizeof(int));
/** Track LSN's in two data structures: /** Track LSN's in two data structures:
- map: xid -> max LSN - map: xid -> max LSN
@ -68,8 +68,7 @@ static void stasis_recovery_analysis(stasis_log_t* log, stasis_transaction_table
if(xactLSN == NULL) { if(xactLSN == NULL) {
xactLSN = malloc(sizeof(lsn_t)); xactLSN = malloc(sizeof(lsn_t));
pblHtInsert(transactionLSN, &(e->xid), sizeof(int), xactLSN); lhinsert(transactionLSN, &(e->xid), sizeof(int), xactLSN);
} else { } else {
/* We've seen this xact before, and must have put a value in /* We've seen this xact before, and must have put a value in
rollbackLSNs for it. That value is now stale, so remove rollbackLSNs for it. That value is now stale, so remove
@ -344,7 +343,7 @@ static void stasis_recovery_undo(stasis_log_t* log, stasis_transaction_table_t *
} }
void stasis_recovery_initiate(stasis_log_t* log, stasis_transaction_table_t * tbl, stasis_alloc_t * alloc) { void stasis_recovery_initiate(stasis_log_t* log, stasis_transaction_table_t * tbl, stasis_alloc_t * alloc) {
stasis_buffer_manager_set_redo_mode(1); stasis_buffer_manager_set_redo_mode(1);
transactionLSN = pblHtCreate(); transactionLSN = lhcreate(64);
DEBUG("Analysis started\n"); DEBUG("Analysis started\n");
stasis_recovery_analysis(log, tbl); stasis_recovery_analysis(log, tbl);
DEBUG("Redo started\n"); DEBUG("Redo started\n");
@ -356,10 +355,14 @@ void stasis_recovery_initiate(stasis_log_t* log, stasis_transaction_table_t * tb
DEBUG("Recovery complete.\n"); DEBUG("Recovery complete.\n");
stasis_transaction_post_recovery(tbl); stasis_transaction_post_recovery(tbl);
for(void * it = pblHtFirst(transactionLSN); it; it = pblHtNext(transactionLSN)) { struct lhlist ll;
free(pblHtCurrent(transactionLSN)); const struct lhpair_t * next;
lhopenlist(transactionLSN, &ll);
while(( next = lhreadlist(&ll) )) {
free(next->value);
} }
pblHtDelete(transactionLSN); lhcloselist(&ll);
lhdestroy(transactionLSN);
destroyList(&rollbackLSNs); destroyList(&rollbackLSNs);
assert(rollbackLSNs==0); assert(rollbackLSNs==0);