more refactoring; log handles now manage their log entries memory
This commit is contained in:
parent
a5788688c8
commit
1409a9eb27
18 changed files with 123 additions and 94 deletions
|
@ -95,10 +95,11 @@ int main(int argc, char ** argv) {
|
|||
if(log_mode) {
|
||||
lsn_t prevLSN = -1;
|
||||
byte * arg = calloc(PAGE_SIZE, 1);
|
||||
LogEntry * e = allocUpdateLogEntry(prevLSN, -1, OPERATION_NOOP,
|
||||
stasis_log_t * l = stasis_log();
|
||||
|
||||
LogEntry * e = allocUpdateLogEntry(l, prevLSN, -1, OPERATION_NOOP,
|
||||
0, PAGE_SIZE);
|
||||
memcpy(stasis_log_entry_update_args_ptr(e), arg, PAGE_SIZE);
|
||||
stasis_log_t * l = stasis_log();
|
||||
for(long i = 0; i < page_count; i++) {
|
||||
void * h;
|
||||
LogEntry * e2 = l->reserve_entry(l, sizeofLogEntry(l, e), &h);
|
||||
|
@ -107,7 +108,7 @@ int main(int argc, char ** argv) {
|
|||
memcpy(e2, e, sizeofLogEntry(l, e));
|
||||
l->entry_done(l, e2, h);
|
||||
}
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(l, e);
|
||||
free(arg);
|
||||
} else {
|
||||
if(stake) {
|
||||
|
|
|
@ -46,7 +46,7 @@ terms specified in this license.
|
|||
|
||||
#include <assert.h>
|
||||
|
||||
LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) {
|
||||
LogEntry * allocCommonLogEntry(stasis_log_t* log, lsn_t prevLSN, int xid, unsigned int type) {
|
||||
LogEntry * ret = calloc(1,sizeof(struct __raw_log_entry));
|
||||
ret->LSN = -1;
|
||||
ret->prevLSN = prevLSN;
|
||||
|
@ -54,7 +54,7 @@ LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) {
|
|||
ret->type = type;
|
||||
return ret;
|
||||
}
|
||||
LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN) {
|
||||
LogEntry * allocPrepareLogEntry(stasis_log_t* log, lsn_t prevLSN, int xid, lsn_t recLSN) {
|
||||
LogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)+sizeof(lsn_t));
|
||||
ret->LSN = -1;
|
||||
ret->prevLSN = prevLSN;
|
||||
|
@ -86,7 +86,7 @@ lsn_t getPrepareRecLSN(const LogEntry *e) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
|
||||
LogEntry * allocUpdateLogEntry(stasis_log_t* log, lsn_t prevLSN, int xid,
|
||||
unsigned int op, pageid_t page,
|
||||
unsigned int arg_size) {
|
||||
/** Use calloc since the struct might not be packed in memory;
|
||||
|
@ -107,7 +107,7 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
|
|||
return ret;
|
||||
}
|
||||
|
||||
LogEntry * allocCLRLogEntry(const LogEntry * old_e) {
|
||||
LogEntry * allocCLRLogEntry(stasis_log_t* log, const LogEntry * old_e) {
|
||||
CLRLogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)+sizeofLogEntry(0, old_e));
|
||||
|
||||
ret->LSN = -1;
|
||||
|
@ -119,18 +119,18 @@ LogEntry * allocCLRLogEntry(const LogEntry * old_e) {
|
|||
|
||||
return (LogEntry*)ret;
|
||||
}
|
||||
void freeLogEntry(const LogEntry* e) {
|
||||
void freeLogEntry(stasis_log_t* log, const LogEntry* e) {
|
||||
free((void*)e);
|
||||
}
|
||||
|
||||
|
||||
lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e) {
|
||||
lsn_t sizeofLogEntry(stasis_log_t * log, const LogEntry * e) {
|
||||
switch (e->type) {
|
||||
case CLRLOG:
|
||||
{
|
||||
const LogEntry * contents = getCLRCompensated((const CLRLogEntry*) e);
|
||||
assert(contents->type != CLRLOG);
|
||||
return sizeof(struct __raw_log_entry) + sizeofLogEntry(lh, contents);
|
||||
return sizeof(struct __raw_log_entry) + sizeofLogEntry(log, contents);
|
||||
}
|
||||
case UPDATELOG:
|
||||
{
|
||||
|
@ -138,8 +138,8 @@ lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e) {
|
|||
sizeof(UpdateLogEntry) + e->update.arg_size;
|
||||
}
|
||||
case INTERNALLOG:
|
||||
assert(lh);
|
||||
return lh->sizeof_internal_entry(lh,e);
|
||||
assert(log);
|
||||
return log->sizeof_internal_entry(log,e);
|
||||
case XPREPARE:
|
||||
return sizeof(struct __raw_log_entry)+sizeof(lsn_t);
|
||||
default:
|
||||
|
|
|
@ -46,6 +46,8 @@ struct LogHandle {
|
|||
lsn_t next_offset;
|
||||
/** The LSN of the log entry that we would return if previous is called. */
|
||||
lsn_t prev_offset;
|
||||
/** The last LogEntry this iterator returned */
|
||||
const LogEntry * last;
|
||||
/** The log this iterator traverses. */
|
||||
stasis_log_t* log;
|
||||
};
|
||||
|
@ -69,27 +71,33 @@ LogHandle* getLSNHandle(stasis_log_t * log, lsn_t lsn) {
|
|||
LogHandle* ret = malloc(sizeof(*ret));
|
||||
ret->next_offset = lsn;
|
||||
ret->prev_offset = lsn;
|
||||
ret->last = 0;
|
||||
ret->log = log;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void freeLogHandle(LogHandle* lh) {
|
||||
if(lh->last) { freeLogEntry(lh->log, lh->last); }
|
||||
free(lh);
|
||||
}
|
||||
const LogEntry * nextInLog(LogHandle * h) {
|
||||
if(h->last) { freeLogEntry(h->log, h->last); }
|
||||
const LogEntry * ret = h->log->read_entry(h->log,h->next_offset);
|
||||
if(ret != NULL) {
|
||||
set_offsets(h, ret);
|
||||
}
|
||||
h->last = ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
const LogEntry * previousInTransaction(LogHandle * h) {
|
||||
const LogEntry * ret = NULL;
|
||||
if(h->last) { freeLogEntry(h->log, h->last); }
|
||||
if(h->prev_offset > 0) {
|
||||
ret = h->log->read_entry(h->log, h->prev_offset);
|
||||
set_offsets(h, ret);
|
||||
}
|
||||
h->last = ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ terms specified in this license.
|
|||
#include <stasis/page.h>
|
||||
|
||||
static lsn_t stasis_log_write_common(stasis_log_t* log, stasis_transaction_table_entry_t * l, int type) {
|
||||
LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type);
|
||||
LogEntry * e = allocCommonLogEntry(log, l->prevLSN, l->xid, type);
|
||||
lsn_t ret;
|
||||
|
||||
log->write_entry(log, e);
|
||||
|
@ -68,13 +68,13 @@ static lsn_t stasis_log_write_common(stasis_log_t* log, stasis_transaction_table
|
|||
|
||||
ret = e->LSN;
|
||||
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(log, e);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static lsn_t stasis_log_write_prepare(stasis_log_t* log, stasis_transaction_table_entry_t * l) {
|
||||
LogEntry * e = allocPrepareLogEntry(l->prevLSN, l->xid, l->recLSN);
|
||||
LogEntry * e = allocPrepareLogEntry(log, l->prevLSN, l->xid, l->recLSN);
|
||||
lsn_t ret;
|
||||
|
||||
DEBUG("Log prepare xid = %d prevlsn = %lld reclsn = %lld, %lld\n",
|
||||
|
@ -90,7 +90,7 @@ static lsn_t stasis_log_write_prepare(stasis_log_t* log, stasis_transaction_tabl
|
|||
|
||||
ret = e->LSN;
|
||||
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(log, e);
|
||||
|
||||
return ret;
|
||||
|
||||
|
@ -100,7 +100,7 @@ LogEntry * stasis_log_write_update(stasis_log_t* log, stasis_transaction_table_e
|
|||
pageid_t page, unsigned int op,
|
||||
const byte * arg, size_t arg_size) {
|
||||
|
||||
LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op,
|
||||
LogEntry * e = allocUpdateLogEntry(log, l->prevLSN, l->xid, op,
|
||||
page, arg_size);
|
||||
memcpy(stasis_log_entry_update_args_ptr(e), arg, arg_size);
|
||||
log->write_entry(log, e);
|
||||
|
@ -115,7 +115,7 @@ LogEntry * stasis_log_write_update(stasis_log_t* log, stasis_transaction_table_e
|
|||
|
||||
LogEntry * stasis_log_begin_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, unsigned int op,
|
||||
const byte * arg, size_t arg_size) {
|
||||
LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op, INVALID_PAGE, arg_size);
|
||||
LogEntry * e = allocUpdateLogEntry(log, l->prevLSN, l->xid, op, INVALID_PAGE, arg_size);
|
||||
memcpy(stasis_log_entry_update_args_ptr(e), arg, arg_size);
|
||||
return e;
|
||||
}
|
||||
|
@ -125,28 +125,28 @@ lsn_t stasis_log_end_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l
|
|||
if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; }
|
||||
lsn_t ret = l->prevLSN = e->LSN;
|
||||
// pthread_mutex_unlock(&l->mut);
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(log, e);
|
||||
return ret;
|
||||
}
|
||||
|
||||
lsn_t stasis_log_write_clr(stasis_log_t* log, const LogEntry * old_e) {
|
||||
LogEntry * e = allocCLRLogEntry(old_e);
|
||||
LogEntry * e = allocCLRLogEntry(log, old_e);
|
||||
log->write_entry(log, e);
|
||||
|
||||
DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid,
|
||||
e->LSN, LSN, prevLSN);
|
||||
lsn_t ret = e->LSN;
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(log, e);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
lsn_t stasis_log_write_dummy_clr(stasis_log_t* log, int xid, lsn_t prevLSN) {
|
||||
// XXX waste of log bandwidth.
|
||||
const LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP,
|
||||
const LogEntry * e = allocUpdateLogEntry(log, prevLSN, xid, OPERATION_NOOP,
|
||||
INVALID_PAGE, 0);
|
||||
lsn_t ret = stasis_log_write_clr(log, e);
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(log, e);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -244,13 +244,13 @@ static inline void log_crc_reset(stasis_log_safe_writes_state* sw) {
|
|||
static inline void log_crc_update(stasis_log_t* log, const LogEntry * e, unsigned int * crc) {
|
||||
*crc = stasis_crc32(e, sizeofLogEntry(log, e), *crc);
|
||||
}
|
||||
static LogEntry* log_crc_dummy_entry() {
|
||||
LogEntry* ret = allocCommonLogEntry(0, -1, INTERNALLOG);
|
||||
static LogEntry* log_crc_dummy_entry(stasis_log_t *log) {
|
||||
LogEntry* ret = allocCommonLogEntry(log, 0, -1, INTERNALLOG);
|
||||
assert(ret->prevLSN == 0);
|
||||
return ret;
|
||||
}
|
||||
static LogEntry* log_crc_entry(unsigned int crc) {
|
||||
LogEntry* ret = allocCommonLogEntry(crc, -1, INTERNALLOG);
|
||||
static LogEntry* log_crc_entry(stasis_log_t *log, unsigned int crc) {
|
||||
LogEntry* ret = allocCommonLogEntry(log, crc, -1, INTERNALLOG);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -279,13 +279,13 @@ static inline lsn_t log_crc_next_lsn(stasis_log_t* log, lsn_t ret) {
|
|||
(unsigned int) le->prevLSN, crc, le->LSN);
|
||||
// The log wasn't successfully forced to this point; discard
|
||||
// everything after the last CRC.
|
||||
freeLogEntry(le);
|
||||
freeLogEntry(log, le);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
log_crc_update(log, le, &crc);
|
||||
}
|
||||
freeLogEntry(le);
|
||||
freeLogEntry(log, le);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -411,7 +411,7 @@ static void syncLog_LogWriter(stasis_log_t * log,
|
|||
newFlushedLSN = sw->nextAvailableLSN;
|
||||
pthread_mutex_unlock(&sw->nextAvailableLSN_mutex);
|
||||
|
||||
LogEntry* crc_entry = log_crc_entry(sw->crc);
|
||||
LogEntry* crc_entry = log_crc_entry(log, sw->crc);
|
||||
writeLogEntryUnlocked(log, crc_entry);
|
||||
free(crc_entry);
|
||||
// Reset log_crc to zero each time a crc entry is written.
|
||||
|
@ -631,28 +631,24 @@ static int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
|
|||
}
|
||||
nextLSN = nextEntry_LogWriter(log, le);
|
||||
|
||||
LogEntry *firstCRC = 0;
|
||||
// zero out crc of first entry during copy
|
||||
if(firstInternalEntry && le->type == INTERNALLOG) {
|
||||
LogEntry * firstCRC = malloc(size);
|
||||
firstCRC = malloc(size);
|
||||
memcpy(firstCRC, le, size);
|
||||
freeLogEntry(le);
|
||||
firstCRC->prevLSN = 0;
|
||||
le = firstCRC;
|
||||
firstInternalEntry = 0;
|
||||
}
|
||||
|
||||
lengthOfCopiedLog += (size + sizeof(lsn_t));
|
||||
|
||||
myFwrite(&size, sizeof(lsn_t), tmpLog);
|
||||
myFwrite(le, size, tmpLog);
|
||||
if(firstInternalEntry && le->type == INTERNALLOG) {
|
||||
free((void*)le); // remove const qualifier + free
|
||||
firstInternalEntry = 0;
|
||||
} else {
|
||||
freeLogEntry(le);
|
||||
}
|
||||
if(firstCRC) { free(firstCRC); }
|
||||
}
|
||||
freeLogHandle(lh);
|
||||
LogEntry * crc_entry = log_crc_dummy_entry();
|
||||
LogEntry * crc_entry = log_crc_dummy_entry(log);
|
||||
|
||||
pthread_mutex_lock(&sw->nextAvailableLSN_mutex);
|
||||
crc_entry->LSN = sw->nextAvailableLSN;
|
||||
|
|
|
@ -76,9 +76,9 @@ int TpageSetRange(int xid, pageid_t page, int offset, const void * memAddr, int
|
|||
This calls loadPage and releasePage directly, and bypasses the
|
||||
logger.
|
||||
*/
|
||||
compensated_function void pageOperationsInit() {
|
||||
compensated_function void pageOperationsInit(stasis_log_t *log) {
|
||||
|
||||
regionsInit();
|
||||
regionsInit(log);
|
||||
|
||||
boundary_tag t;
|
||||
recordid rid = {0, 0, sizeof(boundary_tag)};
|
||||
|
|
|
@ -134,7 +134,7 @@ static void TdeallocBoundaryTag(int xid, pageid_t page) {
|
|||
|
||||
}
|
||||
|
||||
void regionsInit() {
|
||||
void regionsInit(stasis_log_t *log) {
|
||||
Page * p = loadPage(-1, 0);
|
||||
|
||||
holding_mutex = pthread_self();
|
||||
|
@ -155,13 +155,13 @@ void regionsInit() {
|
|||
// recordid rid = {0,0,sizeof(boundary_tag)};
|
||||
|
||||
// hack; allocate a fake log entry; pass it into ourselves.
|
||||
LogEntry * e = allocUpdateLogEntry(0,0,OPERATION_ALLOC_BOUNDARY_TAG,
|
||||
LogEntry * e = allocUpdateLogEntry(log, 0,0,OPERATION_ALLOC_BOUNDARY_TAG,
|
||||
p->id, sizeof(boundary_tag));
|
||||
memcpy(stasis_log_entry_update_args_ptr(e), &t, sizeof(boundary_tag));
|
||||
writelock(p->rwlatch,0);
|
||||
op_alloc_boundary_tag(e,p);
|
||||
unlock(p->rwlatch);
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(log, e);
|
||||
}
|
||||
holding_mutex = 0;
|
||||
releasePage(p);
|
||||
|
|
|
@ -133,7 +133,6 @@ static void stasis_recovery_analysis(stasis_log_t* log, stasis_transaction_table
|
|||
default:
|
||||
abort();
|
||||
}
|
||||
freeLogEntry(e);
|
||||
}
|
||||
freeLogHandle(lh);
|
||||
}
|
||||
|
@ -228,7 +227,6 @@ static void stasis_recovery_redo(stasis_log_t* log, stasis_transaction_table_t *
|
|||
abort();
|
||||
}
|
||||
} // end switch
|
||||
freeLogEntry(e);
|
||||
} // end loop
|
||||
freeLogHandle(lh);
|
||||
}
|
||||
|
@ -329,7 +327,6 @@ static void stasis_recovery_undo(stasis_log_t* log, stasis_transaction_table_t *
|
|||
abort();
|
||||
}
|
||||
}
|
||||
freeLogEntry(e);
|
||||
}
|
||||
if(!prepared) {
|
||||
// Log an XEND, remove transaction from stasis_transaction_table.
|
||||
|
|
|
@ -94,7 +94,7 @@ int Tinit() {
|
|||
stasis_buffer_manager = stasis_buffer_manager_factory(stasis_log_file, stasis_dirty_page_table);
|
||||
|
||||
stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table, stasis_buffer_manager); // xxx circular dependency.
|
||||
pageOperationsInit();
|
||||
pageOperationsInit(stasis_log_file);
|
||||
stasis_allocation_policy = stasis_allocation_policy_init();
|
||||
stasis_alloc = stasis_alloc_init(stasis_transaction_table, stasis_allocation_policy);
|
||||
|
||||
|
@ -160,7 +160,7 @@ compensated_function void Tupdate(int xid, pageid_t page,
|
|||
assert(xact->prevLSN == e->LSN);
|
||||
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
|
||||
stasis_operation_do(e, p);
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(stasis_log_file, e);
|
||||
|
||||
if(p) unlock(p->rwlatch);
|
||||
if(p) releasePage(p);
|
||||
|
@ -180,7 +180,7 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page,
|
|||
|
||||
pthread_mutex_lock(&h->mut);
|
||||
|
||||
LogEntry * e = allocUpdateLogEntry(-1, h->l->xid, op,
|
||||
LogEntry * e = allocUpdateLogEntry(h->log, -1, h->l->xid, op,
|
||||
p->id, datlen);
|
||||
|
||||
memcpy(stasis_log_entry_update_args_ptr(e), dat, datlen);
|
||||
|
@ -193,11 +193,11 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page,
|
|||
unlock(p->rwlatch);
|
||||
pthread_mutex_unlock(&h->mut);
|
||||
// page will be released by the log handle...
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(stasis_log_file, e);
|
||||
}
|
||||
lsn_t TwritebackUpdate(int xid, pageid_t page,
|
||||
const void *dat, size_t datlen, int op) {
|
||||
LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, datlen);
|
||||
LogEntry * e = allocUpdateLogEntry(stasis_log_file, -1, xid, op, page, datlen);
|
||||
memcpy(stasis_log_entry_update_args_ptr(e), dat, datlen);
|
||||
|
||||
stasis_transaction_table_entry_t* l = stasis_transaction_table_get(stasis_transaction_table, xid);
|
||||
|
@ -206,7 +206,7 @@ lsn_t TwritebackUpdate(int xid, pageid_t page,
|
|||
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
|
||||
l->prevLSN = e->LSN;
|
||||
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(stasis_log_file, e);
|
||||
return l->prevLSN;
|
||||
}
|
||||
/** DANGER: you need to set the LSN's on the pages that you want to write back,
|
||||
|
@ -218,7 +218,7 @@ void TreorderableWritebackUpdate(int xid, void* hp,
|
|||
stasis_log_reordering_handle_t* h = hp;
|
||||
assert(stasis_transaction_table_is_active(stasis_transaction_table, xid));
|
||||
pthread_mutex_lock(&h->mut);
|
||||
LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, datlen);
|
||||
LogEntry * e = allocUpdateLogEntry(stasis_log_file, -1, xid, op, page, datlen);
|
||||
memcpy(stasis_log_entry_update_args_ptr(e), dat, datlen);
|
||||
stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(0, e));
|
||||
pthread_mutex_unlock(&h->mut);
|
||||
|
|
|
@ -94,9 +94,9 @@ struct LogEntry {
|
|||
|
||||
@return a LogEntry that should be freed with free().
|
||||
*/
|
||||
LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type);
|
||||
LogEntry * allocCommonLogEntry(stasis_log_t *log, lsn_t prevLSN, int xid, unsigned int type);
|
||||
|
||||
LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN);
|
||||
LogEntry * allocPrepareLogEntry(stasis_log_t *log, lsn_t prevLSN, int xid, lsn_t recLSN);
|
||||
/**
|
||||
Allocate a log entry associated with an operation implemention. This
|
||||
is usually called inside of Tupdate().
|
||||
|
@ -104,7 +104,7 @@ LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN);
|
|||
@return a LogEntry that should be freed with free().
|
||||
|
||||
*/
|
||||
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
|
||||
LogEntry * allocUpdateLogEntry(stasis_log_t *log, lsn_t prevLSN, int xid,
|
||||
unsigned int op, pageid_t page, unsigned int arg_size);
|
||||
|
||||
/**
|
||||
|
@ -112,20 +112,18 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
|
|||
entries are undone. This moves undo operations into the redo
|
||||
phase, by recording the inverse of the original operation, and sets
|
||||
prevLSN to the prevLSN of old_e.
|
||||
|
||||
@return a LogEntry that should be freed with free().
|
||||
*/
|
||||
LogEntry * allocCLRLogEntry(const LogEntry * e);
|
||||
LogEntry * allocCLRLogEntry(stasis_log_t *log, const LogEntry * e);
|
||||
/**
|
||||
@param e a log entry returned from one of the alloc???LogEntry functions.
|
||||
*/
|
||||
void freeLogEntry(const LogEntry * e);
|
||||
void freeLogEntry(stasis_log_t *log, const LogEntry * e);
|
||||
/**
|
||||
@param lh The log handle the entry will be stored in. (Needed because some log entries are of type INTERNALLOG) May be NULL if e is not of type INTERNALLOG.
|
||||
@param e A log entry of any type.
|
||||
@return the length, in bytes, of e.
|
||||
*/
|
||||
lsn_t sizeofLogEntry(stasis_log_t * lh, const LogEntry * e);
|
||||
lsn_t sizeofLogEntry(stasis_log_t * log, const LogEntry * e);
|
||||
/**
|
||||
* @return the operation's arguments, or NULL if there are no arguments.
|
||||
*/
|
||||
|
|
|
@ -58,6 +58,9 @@ typedef struct LogHandle LogHandle;
|
|||
in a particular transaction. They follow the prevLSN field,
|
||||
skipping any undo entries that have been marked complete.
|
||||
|
||||
The memory that contains the logEntry objects is managed by the LogHandle.
|
||||
Callers should not free() or otherwise dispose of memory returned by a LogHandle.
|
||||
|
||||
@see logWriter.h For write access to the log.
|
||||
@see logger.h For the api provided by log implementations.
|
||||
*/
|
||||
|
|
|
@ -79,6 +79,6 @@ stasis_operation_impl stasis_op_impl_page_initialize();
|
|||
|
||||
stasis_operation_impl stasis_op_impl_fixed_page_alloc();
|
||||
|
||||
compensated_function void pageOperationsInit();
|
||||
compensated_function void pageOperationsInit(stasis_log_t *log);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -26,7 +26,7 @@ typedef struct boundary_tag {
|
|||
#define REGION_OCCUPIED (REGION_BASE + 2)
|
||||
#define REGION_CONDEMNED (REGION_BASE + 3)
|
||||
|
||||
void regionsInit();
|
||||
void regionsInit(stasis_log_t *log);
|
||||
|
||||
pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocaionManager);
|
||||
void TregionDealloc(int xid, pageid_t firstPage);
|
||||
|
|
|
@ -49,13 +49,16 @@ terms specified in this license.
|
|||
|
||||
START_TEST(rawLogEntryAlloc)
|
||||
{
|
||||
LogEntry * log = allocCommonLogEntry(200, 1, XABORT);
|
||||
Tinit();
|
||||
stasis_log_t *l = stasis_log();
|
||||
LogEntry * log = allocCommonLogEntry(log, 200, 1, XABORT);
|
||||
assert(log->LSN == -1);
|
||||
assert(log->prevLSN == 200);
|
||||
assert(log->xid == 1);
|
||||
assert(log->type == XABORT);
|
||||
assert(sizeofLogEntry(0, log) == sizeof(struct __raw_log_entry));
|
||||
free(log);
|
||||
freeLogEntry(l, log);
|
||||
Tdeinit();
|
||||
}
|
||||
END_TEST
|
||||
|
||||
|
@ -74,8 +77,9 @@ START_TEST(updateLogEntryAlloc)
|
|||
LogEntry * log;
|
||||
|
||||
Tinit(); /* Needed because it sets up the operations table. */
|
||||
stasis_log_t *l = stasis_log();
|
||||
|
||||
log = allocUpdateLogEntry(200, 1, OPERATION_SET,
|
||||
log = allocUpdateLogEntry(l, 200, 1, OPERATION_SET,
|
||||
rid.page, 3*sizeof(char));
|
||||
memcpy(stasis_log_entry_update_args_ptr(log), args, 3*sizeof(char));
|
||||
assert(log->LSN == -1);
|
||||
|
@ -103,9 +107,12 @@ END_TEST
|
|||
|
||||
START_TEST(updateLogEntryAllocNoExtras)
|
||||
{
|
||||
Tinit();
|
||||
|
||||
recordid rid = { 3 , 4, sizeof(int)*3 };
|
||||
|
||||
LogEntry * log = allocUpdateLogEntry(200, 1, OPERATION_SET,
|
||||
stasis_log_t *l = stasis_log();
|
||||
LogEntry * log = allocUpdateLogEntry(l, 200, 1, OPERATION_SET,
|
||||
rid.page, 0);
|
||||
assert(log->LSN == -1);
|
||||
assert(log->prevLSN == 200);
|
||||
|
@ -120,6 +127,8 @@ START_TEST(updateLogEntryAllocNoExtras)
|
|||
|
||||
assert(sizeofLogEntry(0, log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 0 * (sizeof(int)+sizeof(char))));
|
||||
free(log);
|
||||
|
||||
Tdeinit();
|
||||
}
|
||||
END_TEST
|
||||
|
||||
|
|
|
@ -73,7 +73,7 @@ static stasis_log_t * setup_log() {
|
|||
int first = 1;
|
||||
stasis_log_t * stasis_log_file = stasis_log();
|
||||
for(i = 0 ; i < 1000; i++) {
|
||||
LogEntry * e = allocCommonLogEntry(prevLSN, xid, XBEGIN);
|
||||
LogEntry * e = allocCommonLogEntry(stasis_log_file, prevLSN, xid, XBEGIN);
|
||||
const LogEntry * f;
|
||||
recordid rid;
|
||||
byte * args = (byte*)"Test 123.";
|
||||
|
@ -100,23 +100,23 @@ static stasis_log_t * setup_log() {
|
|||
fail_unless(sizeofLogEntry(0, e) == sizeofLogEntry(0, f), "Log entry changed size!!");
|
||||
fail_unless(0 == memcmp(e,f,sizeofLogEntry(0, e)), "Log entries did not agree!!");
|
||||
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(f);
|
||||
freeLogEntry(stasis_log_file, e);
|
||||
freeLogEntry(stasis_log_file, f);
|
||||
|
||||
e = allocUpdateLogEntry(prevLSN, xid, 1, rid.page, args_size);
|
||||
e = allocUpdateLogEntry(stasis_log_file, prevLSN, xid, 1, rid.page, args_size);
|
||||
memcpy(stasis_log_entry_update_args_ptr(e), args, args_size);
|
||||
stasis_log_file->write_entry(stasis_log_file,e);
|
||||
prevLSN = e->prevLSN;
|
||||
|
||||
// LogEntry * g = allocCLRLogEntry(100, 1, 200, rid, 0); //prevLSN);
|
||||
LogEntry * g = allocCLRLogEntry(e); // XXX will probably break
|
||||
LogEntry * g = allocCLRLogEntry(stasis_log_file, e); // XXX will probably break
|
||||
g->prevLSN = firstLSN;
|
||||
stasis_log_file->write_entry(stasis_log_file,g);
|
||||
assert (g->type == CLRLOG);
|
||||
prevLSN = g->LSN;
|
||||
|
||||
freeLogEntry (e);
|
||||
freeLogEntry (g);
|
||||
freeLogEntry (stasis_log_file, e);
|
||||
freeLogEntry (stasis_log_file, g);
|
||||
}
|
||||
return stasis_log_file;
|
||||
}
|
||||
|
@ -148,7 +148,6 @@ static void loggerTest(int logType) {
|
|||
h = getLogHandle(stasis_log_file);
|
||||
|
||||
while((e = nextInLog(h))) {
|
||||
freeLogEntry(e);
|
||||
i++;
|
||||
assert(i < 4000);
|
||||
}
|
||||
|
@ -181,15 +180,17 @@ static void logHandleColdReverseIterator(int logType) {
|
|||
|
||||
|
||||
while(((e = nextInLog(lh)) && (i < 100)) ) {
|
||||
freeLogEntry(e);
|
||||
i++;
|
||||
}
|
||||
|
||||
lsn_t lsn = e->LSN;
|
||||
|
||||
freeLogHandle(lh);
|
||||
|
||||
i = 0;
|
||||
lh = getLSNHandle(stasis_log_file, e->LSN);
|
||||
lh = getLSNHandle(stasis_log_file, lsn);
|
||||
while((e = previousInTransaction(lh))) {
|
||||
i++;
|
||||
freeLogEntry(e);
|
||||
}
|
||||
freeLogHandle(lh);
|
||||
assert(i <= 4); /* We should almost immediately hit a clr that goes to the beginning of the log... */
|
||||
|
@ -224,13 +225,26 @@ static void loggerTruncate(int logType) {
|
|||
le = nextInLog(lh);
|
||||
}
|
||||
|
||||
LogEntry * copy = malloc(sizeofLogEntry(stasis_log_file, le));
|
||||
memcpy(copy, le, sizeofLogEntry(stasis_log_file, le));
|
||||
le = copy;
|
||||
|
||||
le2 = nextInLog(lh);
|
||||
|
||||
copy = malloc(sizeofLogEntry(stasis_log_file, le2));
|
||||
memcpy(copy, le2, sizeofLogEntry(stasis_log_file, le2));
|
||||
le2 = copy;
|
||||
|
||||
i = 0;
|
||||
while(i < 23) {
|
||||
i++;
|
||||
le3 = nextInLog(lh);
|
||||
}
|
||||
|
||||
copy = malloc(sizeofLogEntry(stasis_log_file, le3));
|
||||
memcpy(copy, le3, sizeofLogEntry(stasis_log_file, le3));
|
||||
le3 = copy;
|
||||
|
||||
stasis_log_file->truncate(stasis_log_file, le->LSN);
|
||||
|
||||
tmp = stasis_log_file->read_entry(stasis_log_file, le->LSN);
|
||||
|
@ -238,33 +252,32 @@ static void loggerTruncate(int logType) {
|
|||
fail_unless(NULL != tmp, NULL);
|
||||
fail_unless(tmp->LSN == le->LSN, NULL);
|
||||
|
||||
freeLogEntry(tmp);
|
||||
freeLogEntry(stasis_log_file, tmp);
|
||||
tmp = stasis_log_file->read_entry(stasis_log_file, le2->LSN);
|
||||
|
||||
fail_unless(NULL != tmp, NULL);
|
||||
fail_unless(tmp->LSN == le2->LSN, NULL);
|
||||
|
||||
freeLogEntry(tmp);
|
||||
freeLogEntry(stasis_log_file, tmp);
|
||||
tmp = stasis_log_file->read_entry(stasis_log_file, le3->LSN);
|
||||
|
||||
fail_unless(NULL != tmp, NULL);
|
||||
fail_unless(tmp->LSN == le3->LSN, NULL);
|
||||
|
||||
freeLogEntry(tmp);
|
||||
freeLogEntry(stasis_log_file, tmp);
|
||||
freeLogHandle(lh);
|
||||
lh = getLogHandle(stasis_log_file);
|
||||
|
||||
i = 0;
|
||||
|
||||
freeLogEntry(le);
|
||||
freeLogEntry(le2);
|
||||
freeLogEntry(le3);
|
||||
free((void*)le);
|
||||
free((void*)le2);
|
||||
free((void*)le3);
|
||||
|
||||
while((le = nextInLog(lh))) {
|
||||
if(le->type != INTERNALLOG) {
|
||||
i++;
|
||||
}
|
||||
freeLogEntry(le);
|
||||
}
|
||||
assert(i == (3000 - 234 + 1));
|
||||
freeLogHandle(lh);
|
||||
|
@ -300,7 +313,7 @@ static void* worker_thread(void * arg) {
|
|||
stasis_log_t * stasis_log_file = stasis_log();
|
||||
|
||||
while(i < ENTRIES_PER_THREAD) {
|
||||
LogEntry * le = allocCommonLogEntry(-1, -1, XBEGIN);
|
||||
LogEntry * le = allocCommonLogEntry(stasis_log_file, -1, -1, XBEGIN);
|
||||
int threshold;
|
||||
long entry;
|
||||
int needToTruncate = 0;
|
||||
|
@ -361,7 +374,7 @@ static void* worker_thread(void * arg) {
|
|||
pthread_mutex_unlock(&random_mutex);
|
||||
} else {
|
||||
assert(e->xid == entry+key);
|
||||
freeLogEntry(e);
|
||||
freeLogEntry(stasis_log_file, e);
|
||||
}
|
||||
} else {
|
||||
pthread_mutex_unlock(&random_mutex);
|
||||
|
@ -372,7 +385,7 @@ static void* worker_thread(void * arg) {
|
|||
|
||||
/* Try to interleave requests as much as possible */
|
||||
sched_yield();
|
||||
freeLogEntry(le);
|
||||
freeLogEntry(stasis_log_file, le);
|
||||
}
|
||||
|
||||
|
||||
|
@ -536,6 +549,11 @@ void reopenLogWorkload(int truncating) {
|
|||
freeLogHandle(h);
|
||||
assert(i == (ENTRY_COUNT * 2));
|
||||
|
||||
for(int i = 0; i < ENTRY_COUNT; i++) {
|
||||
freeLogEntry(stasis_log_file, entries[i]);
|
||||
freeLogEntry(stasis_log_file, entries2[i]);
|
||||
}
|
||||
|
||||
stasis_truncation_automatic = 1;
|
||||
stasis_log_file->close(stasis_log_file);
|
||||
}
|
||||
|
|
|
@ -180,14 +180,14 @@ START_TEST(multiplexTest) {
|
|||
for(i = 0; i < NUM_INSERTS; i++) {
|
||||
|
||||
(*(lsn_t*)(arg+1)) = i;
|
||||
LogEntry * e = allocUpdateLogEntry(-1, -1, OPERATION_LINEAR_HASH_INSERT, INVALID_PAGE,
|
||||
LogEntry * e = allocUpdateLogEntry(stasis_log(), -1, -1, OPERATION_LINEAR_HASH_INSERT, INVALID_PAGE,
|
||||
sizeof(linearHash_remove_arg) + sizeof(lsn_t) + sizeof(char));
|
||||
memcpy(stasis_log_entry_update_args_ptr(e), arg, sizeof(linearHash_remove_arg) + sizeof(lsn_t) + sizeof(char));
|
||||
|
||||
ThashInsert(xid, hash, (byte*)&i, sizeof(lsn_t), (byte*)e, sizeofLogEntry(0, e));
|
||||
|
||||
|
||||
free(e);
|
||||
freeLogEntry(stasis_log(), e);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ START_TEST(operation_physical_do_undo) {
|
|||
|
||||
|
||||
// XXX fails; set log format has changed
|
||||
setToTwo = allocUpdateLogEntry(-1, xid, OPERATION_SET, rid.page,
|
||||
setToTwo = allocUpdateLogEntry(stasis_log(), -1, xid, OPERATION_SET, rid.page,
|
||||
sizeof(slotid_t) + sizeof(int64_t) + 2 * sizeof(int));
|
||||
memcpy(stasis_log_entry_update_args_ptr(setToTwo), arg, sizeof(slotid_t) + sizeof(int64_t) + 2 * sizeof(int));
|
||||
|
||||
|
@ -181,7 +181,7 @@ START_TEST(operation_physical_do_undo) {
|
|||
// XXX This is a hack to put some stuff in the log. Otherwise, Tdeinit() fails.
|
||||
for(int i = 0; i < 10; i++)
|
||||
((stasis_log_t*)stasis_log())->write_entry(stasis_log(),
|
||||
allocCommonLogEntry(-1, -1, -1));
|
||||
allocCommonLogEntry(stasis_log(), -1, -1, -1));
|
||||
|
||||
/** @todo need to re-think check_operations. The test is pretty broken. */
|
||||
Tcommit(xid);
|
||||
|
|
|
@ -82,7 +82,6 @@ int main() {
|
|||
|
||||
free(s);
|
||||
}
|
||||
freeLogEntry(le);
|
||||
}
|
||||
freeLogHandle(lh);
|
||||
|
||||
|
|
Loading…
Reference in a new issue