Changed CLR log format so that it is identical to update entries. The old format pointed back to an earlier

entry in the log, complicating truncation significantly.  The new format copies log entries into their CLRs,
and is somewhat redundant, as redo and undo information is stored in the CLR, but the undo information will
never be used...
This commit is contained in:
Sears Russell 2007-04-03 09:18:45 +00:00
parent 4f47613fe5
commit 362b1036de
13 changed files with 270 additions and 271 deletions

View file

@ -48,14 +48,8 @@ terms specified in this license.
BEGIN_C_DECLS
/**
@file
@file structs and memory managment routines for log entries
Next generation logger api.
@todo Was getting some memory over-runs from the fact that I didn't
know the exact length of a raw log entry. This seems to be fixed
now.
@todo Is there a better way to deal with sizeof() and log entries?
@todo Other than some typedefs, is there anything in logEntry that belongs in the API?
@ -65,17 +59,10 @@ BEGIN_C_DECLS
$Id$
*/
typedef struct {
lsn_t thisUpdateLSN;
recordid rid;
lsn_t undoNextLSN;
} CLRLogEntry;
typedef struct {
unsigned int funcID : 8;
recordid rid;
unsigned int argSize;
/* int invertible; */ /* no longer needed */
/* Implicit members:
args; @ ((byte*)ule) + sizeof(UpdateLogEntry)
preImage; @ ((byte*)ule) + sizeof(UpdateLogEntry) + ule.argSize */
@ -88,30 +75,27 @@ struct __raw_log_entry {
unsigned int type;
};
/*#define sizeofRawLogEntry (sizeof(lsn_t)*2+sizeof(int)+4)*/
typedef struct {
lsn_t LSN;
lsn_t prevLSN;
int xid;
unsigned int type;
union {
UpdateLogEntry update;
CLRLogEntry clr;
} contents;
UpdateLogEntry update;
} LogEntry;
/**
All of these return a pointer to a single malloced region that should be freed.
*/
Allocate a log entry that does not contain any extra payload
information. (Eg: Tbegin, Tcommit, etc.)
/**
Allocate a log entry that does not contain any extra payload information. (Eg: Tbegin, Tcommit, etc.)
@return a LogEntry that should be freed with free().
*/
LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type);
/**
Allocate a log entry associated with an operation implemention. This
is usually called inside of Tupdate().
@return a LogEntry that should be freed with free().
*/
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
unsigned int operation, recordid rid,
@ -121,25 +105,36 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
Alloc a deferred log entry. This is just like allocUpdateLogEntry(), except
the log entry's type will be DEFERLOG instead UPDATELOG. This is usually
called inside of Tdefer().
@return a LogEntry that should be freed with free().
*/
LogEntry * allocDeferredLogEntry(lsn_t prevLSN, int xid,
unsigned int operation, recordid rid,
const byte * args, unsigned int argSize,
const byte * preImage);
/**
Allocate a CLR entry. These are written during recovery to
indicate that the stable copy of the store file reflects the state
of the database after an operation has successfuly been
redone/undone.
*/
LogEntry * allocCLRLogEntry (lsn_t prevLSN, int xid,
lsn_t thisUpdateLSN, recordid rid, lsn_t undoNextLSN);
Allocate a CLR entry. These are written during recovery as log
entries are undone. This moves undo operations into the redo
phase, by recording the inverse of the original operation, and sets
prevLSN to the prevLSN of old_e.
long sizeofLogEntry(const LogEntry * log);
const byte * getUpdateArgs(const LogEntry * log);
const byte * getUpdatePreImage(const LogEntry * log);
@return a LogEntry that should be freed with free().
*/
LogEntry * allocCLRLogEntry(const LogEntry * old_e);
/**
@return the length, in bytes, of e.
*/
long sizeofLogEntry(const LogEntry * e);
/**
@return the operation's arguments.
*/
const byte * getUpdateArgs(const LogEntry * e);
/**
@return the undo information for operations that use record-based
phsysical undo.
*/
const byte * getUpdatePreImage(const LogEntry * e);
END_C_DECLS

View file

@ -63,8 +63,6 @@ terms specified in this license.
typedef int (guard_fcn_t)(const LogEntry *, void *);
typedef struct {
/** The LSN of the last log entry returned.*/
/* lsn_t file_offset; */ /* Unneeded? */
/** The LSN of the log entry that we would return if next is called. */
lsn_t next_offset;
/** The LSN of the log entry that we would return if previous is called. */
@ -93,7 +91,8 @@ typedef struct {
(eg: gcc ... -DUSE_LOGGER=LOG_TO_FOO)
@see constants.h for a list of recognized log implementations. (The constants are named LOG_TO_*)
@see constants.h for a list of recognized log implementations.
(The constants are named LOG_TO_*)
*/
extern int loggerType;
@ -130,7 +129,8 @@ lsn_t LogNextEntry(const LogEntry * e);
TransactionLog LogTransBegin(int xid);
/**
Write a transaction COMMIT to the log tail. Blocks until the commit record is stable.
Write a transaction COMMIT to the log tail. Blocks until the commit
record is stable.
@return The lsn of the commit log entry.
*/
@ -148,7 +148,7 @@ lsn_t LogTransAbort(TransactionLog * l);
its operation argument to the extent necessary for allocating and laying out
the log entry. Finally, it updates the state of the parameter l.
*/
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
const byte * args);
/**
LogDeferred writes a DEFERLOG log record to the log tail
@ -156,14 +156,15 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
@see LogUpdate is analagous to this function, but wrutes UPDATELOG entries
instead.
*/
LogEntry * LogDeferred(TransactionLog * l, Page * p, recordid rid, int operation,
const byte * args);
LogEntry * LogDeferred(TransactionLog * l, Page * p, recordid rid,
int operation, const byte * args);
/**
Any LogEntry that is returned by a function in logger2.h or
logHandle.h should be freed using this function.
@param e The log entry to be freed. (The "const" here is a hack that allows LogReadLSN to return a const *.
@param e The log entry to be freed. (The "const" here is a hack
that allows LogReadLSN to return a const *.
*/
void FreeLogEntry(const LogEntry * e);
@ -173,10 +174,12 @@ void FreeLogEntry(const LogEntry * e);
record the completion of undo operations, amongst other things.
@return the lsn of the CLR entry that was written to the log.
(Needed so that the lsn slot of the page in question can be
updated.)
(Needed so that the lsn slot of the page in question can be
updated.)
*/
lsn_t LogCLR(int xid, lsn_t LSN, recordid rid, lsn_t prevLSN);
lsn_t LogCLR(const LogEntry * e);
lsn_t LogDummyCLR(int xid, lsn_t prevLSN);
/**
Write a end transaction record @see XEND
@ -191,7 +194,8 @@ void LogEnd (TransactionLog * l);
long LoggerSizeOfInternalLogEntry(const LogEntry * e);
/**
For internal use only... This would be static, but it is called by the test cases.
For internal use only... This would be static, but it is called by
the test cases.
*/
void LogWrite(LogEntry * e);

View file

@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
@ -42,39 +42,47 @@ terms specified in this license.
#include <config.h>
#include <lladd/common.h>
#include <page.h> // For physical_slot_length()
#include <lladd/transactional.h>
#include <assert.h>
#include <lladd/operations.h>
#include <lladd/logger/logger2.h>
#include "page.h" // For physical_slot_length()
#include <lladd/logger/logger2.h> // needed for LoggerSizeOfInternalLogEntry()
#include <lladd/logger/logEntry.h>
LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) {
LogEntry * ret = malloc(sizeof(struct __raw_log_entry));
ret->LSN = -1;
ret->prevLSN = prevLSN;
ret->xid = xid;
ret->type = type;
//printf ("logEntry.c: Log entry length is %ld\n", sizeof(struct __raw_log_entry));
return ret;
}
const byte * getUpdateArgs(const LogEntry * ret) {
assert(ret->type == UPDATELOG || ret->type == DEFERLOG);
if(ret->contents.update.argSize == 0) {
assert(ret->type == UPDATELOG ||
ret->type == DEFERLOG ||
ret->type == CLRLOG);
if(ret->update.argSize == 0) {
return NULL;
} else {
return ((byte*)ret) + sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry);
return ((byte*)ret) +
sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry);
}
}
const byte * getUpdatePreImage(const LogEntry * ret) {
assert(ret->type == UPDATELOG || ret->type == DEFERLOG);
if(operationsTable[ret->contents.update.funcID].undo != NO_INVERSE &&
operationsTable[ret->contents.update.funcID].undo != NO_INVERSE_WHOLE_PAGE) {
assert(ret->type == UPDATELOG ||
ret->type == DEFERLOG ||
ret->type == CLRLOG);
if(operationsTable[ret->update.funcID].undo != NO_INVERSE &&
operationsTable[ret->update.funcID].undo != NO_INVERSE_WHOLE_PAGE) {
return NULL;
} else {
return ((byte*)ret) + sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + ret->contents.update.argSize;
return ((byte*)ret) +
sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry) +
ret->update.argSize;
}
}
@ -88,50 +96,55 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
/** Use calloc since the struct might not be packed in memory;
otherwise, we'd leak uninitialized bytes to the log. */
LogEntry * ret = calloc(1, sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + argSize +
((!invertible) ? physical_slot_length(rid.size) : 0) + (whole_page_phys ? PAGE_SIZE : 0));
LogEntry * ret = calloc(1, sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry) + argSize +
((!invertible) ? physical_slot_length(rid.size)
: 0) +
(whole_page_phys ? PAGE_SIZE
: 0));
ret->LSN = -1;
ret->prevLSN = prevLSN;
ret->xid = xid;
ret->type = UPDATELOG;
ret->contents.update.funcID = funcID;
ret->contents.update.rid = rid;
ret->contents.update.argSize = argSize;
ret->update.funcID = funcID;
ret->update.rid = rid;
ret->update.argSize = argSize;
if(argSize) {
memcpy((void*)getUpdateArgs(ret), args, argSize);
}
if(!invertible) {
memcpy((void*)getUpdatePreImage(ret), preImage, physical_slot_length(rid.size));
memcpy((void*)getUpdatePreImage(ret), preImage,
physical_slot_length(rid.size));
}
if(whole_page_phys) {
memcpy((void*)getUpdatePreImage(ret), preImage, PAGE_SIZE);
memcpy((void*)getUpdatePreImage(ret), preImage,
PAGE_SIZE);
}
return ret;
}
LogEntry * allocDeferredLogEntry(lsn_t prevLSN, int xid,
unsigned int funcID, recordid rid,
const byte * args, unsigned int argSize,
const byte * preImage) {
LogEntry * ret = allocUpdateLogEntry(prevLSN, xid, funcID, rid, args, argSize,
preImage);
LogEntry * ret = allocUpdateLogEntry(prevLSN, xid, funcID, rid,
args, argSize, preImage);
ret->type = DEFERLOG;
return ret;
}
LogEntry * allocCLRLogEntry (lsn_t prevLSN, int xid,
lsn_t thisUpdateLSN, recordid rid, lsn_t undoNextLSN) {
LogEntry * ret = malloc(sizeof(struct __raw_log_entry) + sizeof(CLRLogEntry));
ret->LSN = -1;
ret->prevLSN = prevLSN;
ret->xid = xid;
ret->type = CLRLOG;
LogEntry * allocCLRLogEntry(const LogEntry * old_e) {
ret->contents.clr.thisUpdateLSN = thisUpdateLSN;
ret->contents.clr.rid = rid;
ret->contents.clr.undoNextLSN = undoNextLSN;
// Could handle other types, but we should never encounter them here.
assert(old_e->type == UPDATELOG);
LogEntry * ret = malloc(sizeofLogEntry(old_e));
memcpy(ret, old_e, sizeofLogEntry(old_e));
ret->LSN = -1;
// prevLSN is OK already
// xid is OK already
ret->type = CLRLOG;
// update is also OK
return ret;
}
@ -141,12 +154,15 @@ LogEntry * allocCLRLogEntry (lsn_t prevLSN, int xid,
long sizeofLogEntry(const LogEntry * log) {
switch (log->type) {
case CLRLOG:
return sizeof(struct __raw_log_entry) + sizeof(CLRLogEntry);
case UPDATELOG:
case DEFERLOG:
return sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + log->contents.update.argSize +
((operationsTable[log->contents.update.funcID].undo == NO_INVERSE) ? physical_slot_length(log->contents.update.rid.size) : 0) +
((operationsTable[log->contents.update.funcID].undo == NO_INVERSE_WHOLE_PAGE) ? PAGE_SIZE : 0) ;
case DEFERLOG: {
int undoType = operationsTable[log->update.funcID].undo;
return sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry) + log->update.argSize +
((undoType == NO_INVERSE) ? physical_slot_length(log->update.rid.size)
: 0) +
((undoType == NO_INVERSE_WHOLE_PAGE) ? PAGE_SIZE : 0);
}
case INTERNALLOG:
return LoggerSizeOfInternalLogEntry(log);
default:

View file

@ -40,10 +40,12 @@ permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#include "logHandle.h"
#include <config.h>
#include <stdlib.h>
#include "logHandle.h"
/**
Sets the next and prev field of h, but does not set h.file_offset.
That should probably be set before calling this function.
@ -117,7 +119,6 @@ const LogEntry * previousInTransaction(LogHandle * h) {
*/
static void set_offsets(LogHandle * h, const LogEntry * e) {
h->next_offset = LogNextEntry(e);
h->prev_offset = (e->type==CLRLOG) ? e->contents.clr.undoNextLSN : e->prevLSN ;
h->prev_offset = e->prevLSN;
}

View file

@ -47,16 +47,16 @@ terms specified in this license.
@todo Switch logger2 to use function pointers?
*/
#include <stdio.h>
#include <assert.h>
#include <config.h>
#include <lladd/common.h>
#include <stdio.h>
#include <assert.h>
#include <lladd/logger/logger2.h>
#include "logWriter.h"
#include "inMemoryLog.h"
#include "page.h"
#ifdef USE_LOGGER
@ -75,7 +75,8 @@ long LoggerSizeOfInternalLogEntry(const LogEntry * e) {
} else if (loggerType == LOG_TO_MEMORY) {
return sizeofInternalLogEntry_InMemoryLog(e);
} else {
abort(); // we dont have an appropriate implementation, or weren't initialized...
// we dont have an appropriate implementation, or weren't initialized...
abort();
}
}
@ -352,16 +353,22 @@ LogEntry * LogDeferred(TransactionLog * l, Page * p, recordid rid, int operation
return LogAction(l, p, rid, operation, args, 1); // 1 -> deferred
}
lsn_t LogCLR(int xid, lsn_t LSN, recordid rid, lsn_t prevLSN) {
lsn_t ret;
LogEntry * e = allocCLRLogEntry(-1, xid, LSN, rid, prevLSN);
lsn_t LogCLR(const LogEntry * old_e) {
LogEntry * e = allocCLRLogEntry(old_e);
LogWrite(e);
DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid,
e->LSN, LSN, prevLSN);
ret = e->LSN;
lsn_t ret = e->LSN;
FreeLogEntry(e);
return ret;
}
lsn_t LogDummyCLR(int xid, lsn_t prevLSN) {
LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP,
NULLRID, NULL, 0, 0);
lsn_t ret = LogCLR(e);
FreeLogEntry(e);
return ret;
}

View file

@ -130,7 +130,7 @@ void multiplexHashLogByKey(byte * key,
// We don't care what the key is. It's probably an LSN.
const LogEntry * log = (const LogEntry*) value;
const byte * updateArgs = getUpdateArgs(log); // assume the log is a logical update entry.
switch(log->contents.update.funcID) {
switch(log->update.funcID) {
// If you really want to know why insert takes
// linearHash_remove_arg entries and vice versa, look at

View file

@ -39,39 +39,37 @@ authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#include <lladd/operations.h>
#include <config.h>
#include <lladd/common.h>
#include <lladd/logger/logger2.h>
#include <lladd/bufferManager.h>
#include <assert.h>
#include <string.h>
/** @todo questionable include */
#include <stdio.h>
#include <lladd/operations.h>
#include <lladd/logger/logger2.h>
#include <lladd/bufferManager.h>
#include "page.h"
#include <stdio.h>
Operation operationsTable[MAX_OPERATIONS];
void doUpdate(const LogEntry * e, Page * p) {
DEBUG("OPERATION update arg length %d, lsn = %ld\n",
e->contents.update.argSize, e->LSN);
DEBUG("OPERATION update arg length %d, lsn = %ld\n", e->contents.update.argSize, e->LSN);
operationsTable[e->contents.update.funcID].run(e->xid, p, e->LSN, e->contents.update.rid, getUpdateArgs(e));
operationsTable[e->update.funcID].run(e->xid, p, e->LSN,
e->update.rid, getUpdateArgs(e));
}
/**
@todo redoUpdate()'s CLR handling is messy, at best; broken at worst.
*/
void redoUpdate(const LogEntry * e) {
if(e->type == UPDATELOG) {
/* lsn_t pageLSN = readLSN(e->contents.update.rid.page); */
recordid rid = e->contents.update.rid;
recordid rid = e->update.rid;
Page * p;
lsn_t pageLSN;
try {
if(operationsTable[e->contents.update.funcID].sizeofData == SIZEIS_PAGEID) {
if(operationsTable[e->update.funcID].sizeofData == SIZEIS_PAGEID) {
p = NULL;
pageLSN = 0;
} else {
@ -81,28 +79,29 @@ void redoUpdate(const LogEntry * e) {
} end;
if(e->LSN > pageLSN) {
DEBUG("OPERATION Redo, %ld > %ld {%d %d %ld}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size);
// doUpdate(e, p);
// Need to check the id field to find out what the _REDO_ action is for this log type.
// contrast with doUpdate(), which doesn't use the .id field.
operationsTable[operationsTable[e->contents.update.funcID].id]
.run(e->xid, p, e->LSN, e->contents.update.rid, getUpdateArgs(e));
DEBUG("OPERATION Redo, %ld > %ld {%d %d %ld}\n",
e->LSN, pageLSN, rid.page, rid.slot, rid.size);
// Need to check the id field to find out what the _REDO_ action
// is for this log type. contrast with doUpdate(), which
// doesn't use the .id field.
operationsTable[operationsTable[e->update.funcID].id]
.run(e->xid, p, e->LSN, e->update.rid, getUpdateArgs(e));
} else {
DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %ld}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size);
DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %ld}\n",
e->LSN, pageLSN, rid.page, rid.slot, rid.size);
}
if(p) {
releasePage(p);
}
} else if(e->type == CLRLOG) {
const LogEntry * f = LogReadLSN(e->contents.clr.thisUpdateLSN);
recordid rid = f->contents.update.rid;
recordid rid = e->update.rid;
Page * p = NULL;
lsn_t pageLSN;
int isNullRid = !memcmp(&rid, &NULLRID, sizeof(recordid));
if(!isNullRid) {
if(operationsTable[f->contents.update.funcID].sizeofData == SIZEIS_PAGEID) {
if(operationsTable[e->update.funcID].sizeofData == SIZEIS_PAGEID) {
p = NULL;
pageLSN = 0;
} else {
@ -112,68 +111,72 @@ void redoUpdate(const LogEntry * e) {
} end;
}
}
//assert(rid.page == e->contents.update.rid.page); /* @todo Should this always hold? */
/* See if the page contains the result of the undo that this CLR
is supposed to perform. If it doesn't, or this was a logical
operation, then undo the original operation. */
if(isNullRid || f->LSN > pageLSN) {
DEBUG("OPERATION Undoing for clr, %ld {%d %d %ld}\n", f->LSN, rid.page, rid.slot, rid.size);
undoUpdate(f, p, e->LSN);
/* See if the page contains the result of the undo that this CLR
is supposed to perform. If it doesn't, or this was a logical
operation, then undo the original operation. */
if(isNullRid || e->LSN > pageLSN) {
DEBUG("OPERATION Undoing for clr, %ld {%d %d %ld}\n",
e->LSN, rid.page, rid.slot, rid.size);
undoUpdate(e, p, e->LSN);
} else {
DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %ld}\n", f->LSN, rid.page, rid.slot, rid.size);
DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %ld}\n",
e->LSN, rid.page, rid.slot, rid.size);
}
if(p) {
releasePage(p);
}
FreeLogEntry(f);
} else {
assert(0);
abort();
}
}
void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
int undo = operationsTable[e->contents.update.funcID].undo;
DEBUG("OPERATION FuncID %d Undo op %d LSN %ld\n",e->contents.update.funcID, undo, clr_lsn);
int undo = operationsTable[e->update.funcID].undo;
DEBUG("OPERATION FuncID %d Undo op %d LSN %ld\n",
e->update.funcID, undo, clr_lsn);
#ifdef DEBUGGING
recordid rid = e->contents.update.rid;
recordid rid = e->update.rid;
#endif
/* lsn_t page_lsn = readLSN(e->contents.update.rid.page); */
lsn_t page_lsn = -1;
if(p) {
page_lsn = pageReadLSN(p);
}
if(e->LSN <= page_lsn || !p) {
/* Actually execute the undo */
// Actually execute the undo
if(undo == NO_INVERSE) {
/* Physical undo */
assert(p); // Must be provided wiht a page in order to perform a physical undo!
DEBUG("OPERATION %d Physical undo, %ld {%d %d %ld}\n", undo, e->LSN, e->contents.update.rid.page, e->contents.update.rid.slot, e->contents.update.rid.size);
writeRecord(e->xid, p, clr_lsn, e->contents.update.rid, getUpdatePreImage(e));
DEBUG("OPERATION %d Physical undo, %ld {%d %d %ld}\n", undo, e->LSN,
e->update.rid.page, e->contents.rid.slot, e->update.rid.size);
assert(p);
writeRecord(e->xid, p, clr_lsn, e->update.rid, getUpdatePreImage(e));
} else if(undo == NO_INVERSE_WHOLE_PAGE) {
DEBUG("OPERATION %d Whole page physical undo, %ld {%d}\n", undo, e->LSN,
e->update.rid.page);
assert(p);
DEBUG("OPERATION %d Whole page physical undo, %ld {%d}\n", undo, e->LSN, e->contents.update.rid.page);
memcpy(p->memAddr, getUpdatePreImage(e), PAGE_SIZE);
pageWriteLSN(e->xid, p, clr_lsn);
} else {
/* @see doUpdate() */
/* printf("Logical undo"); fflush(NULL); */
DEBUG("OPERATION %d Logical undo, %ld {%d %d %ld}\n", undo, e->LSN, e->contents.update.rid.page, e->contents.update.rid.slot, e->contents.update.rid.size);
operationsTable[undo].run(e->xid, p, clr_lsn, e->contents.update.rid, getUpdateArgs(e));
DEBUG("OPERATION %d Logical undo, %ld {%d %d %ld}\n", undo, e->LSN,
e->update.rid.page, e->update.rid.slot, e->update.rid.size);
operationsTable[undo].run(e->xid, p, clr_lsn, e->update.rid,
getUpdateArgs(e));
}
} else {
DEBUG("OPERATION %d Skipping undo, %ld {%d %d %ld}\n", undo, e->LSN, e->contents.update.rid.page, e->contents.update.rid.slot, e->contents.update.rid.size);
DEBUG("OPERATION %d Skipping undo, %ld {%d %d %ld}\n", undo, e->LSN,
e->update.rid.page, e->update.rid.slot, e->update.rid.size);
}
/* printf("Undo done."); fflush(NULL); */
}

View file

@ -56,13 +56,9 @@ terms specified in this license.
/** @todo Remove extern declaration of transactional_2_mutex from nestedTopActions.c */
extern pthread_mutex_t transactional_2_mutex;
/*#include <lladd/bufferManager.h>*/
extern TransactionLog XactionTable[];
pblHashTable_t * nestedTopActions = NULL;
/** @todo this really should be set somewhere globally. */
void initNestedTopActions() {
nestedTopActions = pblHtCreate();
@ -93,11 +89,7 @@ void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) {
/**
Call this function at the end of a nested top action.
@return the lsn of the CLR. Most users (everyone?) will ignore this.
@todo LogCLR()'s API is useless. Make it private, and implement a better
public version. (Then rewrite TendNestedTopAction)
*/
lsn_t TendNestedTopAction(int xid, void * handle) {
@ -110,12 +102,9 @@ lsn_t TendNestedTopAction(int xid, void * handle) {
}
assert(xid >= 0);
// This action wasn't really undone -- This is a nested top action!
lsn_t undoneLSN = XactionTable[xid % MAX_TRANSACTIONS].prevLSN;
recordid undoneRID = NULLRID; // Not correct, but this field is unused anyway. ;)
// Write a CLR.
lsn_t clrLSN = LogCLR(xid, undoneLSN, undoneRID, *prevLSN);
lsn_t clrLSN = LogDummyCLR(xid, *prevLSN);
// Ensure that the next action in this transaction points to the CLR.
XactionTable[xid % MAX_TRANSACTIONS].prevLSN = clrLSN;

View file

@ -48,8 +48,10 @@ terms specified in this license.
#include <lladd/operations.h>
#include <lladd/logger/logger2.h>
#include <stdlib.h>
#include <assert.h>
recordid prepare_bogus_rec = { 0, 0, 0};
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
@ -88,7 +90,7 @@ int prepareGuard(const LogEntry * e, void * state) {
PrepareGuardState * pgs = state;
int ret = pgs->continueIterating;
if(e->type == UPDATELOG) {
if(e->contents.update.funcID == OPERATION_PREPARE) {
if(e->update.funcID == OPERATION_PREPARE) {
pgs->continueIterating = 0;
pgs->prevLSN = e->prevLSN;
}
@ -102,7 +104,8 @@ int prepareGuard(const LogEntry * e, void * state) {
return ret;
}
/** @todo When fleshing out the logHandle's prepareAction interface, figure out what the return value should mean... */
/** @todo When fleshing out the logHandle's prepareAction interface,
figure out what the return value should mean... */
int prepareAction(void * state) {
PrepareGuardState * pgs = state;
int ret;

View file

@ -1,35 +1,30 @@
/**
Replacement for recovery.c
A lot of refactoring has been done to simplify the contents of recovery.c
Hopefully, this file will be nice and clean. :)
@file Implements three phase recovery
*/
#include <config.h>
#include <lladd/common.h>
#include <lladd/recovery.h>
#include <pbl/pbl.h>
#include "linkedlist.h"
#include "logger/logHandle.h"
#include <lladd/bufferManager.h>
#include <lladd/lockManager.h>
#include "page.h" // Needed for pageReadLSN.
#include <lladd/transactional.h>
#include <stdio.h>
#include <assert.h>
#include <pbl/pbl.h>
/** @todo This include is an artifact of our lack of infrastructure to support log iterator guards. */
#include <lladd/recovery.h>
#include <lladd/bufferManager.h>
#include <lladd/lockManager.h>
/** @todo Add better log iterator guard support and remove this include.*/
#include <lladd/operations/prepare.h>
#include "logger/logHandle.h"
/** @todo Get rid of linkedlist.[ch] */
#include "linkedlist.h"
#include "page.h" // Needed for pageReadLSN.
static pblHashTable_t * transactionLSN;
static LinkedList * rollbackLSNs = NULL;
/** @todo There is no real reason to have this mutex (which prevents
@ -37,8 +32,6 @@ static LinkedList * rollbackLSNs = NULL;
from concurrent modifications. */
static pthread_mutex_t rollback_mutex = PTHREAD_MUTEX_INITIALIZER;
/**
Determines which transactions committed, and which need to be redone.
@ -54,7 +47,6 @@ static pthread_mutex_t rollback_mutex = PTHREAD_MUTEX_INITIALIZER;
the pages from being read later during recovery. Since this function
no longer reads the pages in, there's no longer any reason to build
the list of dirty pages.
*/
static void Analysis () {
@ -68,13 +60,12 @@ static void Analysis () {
int highestXid = 0;
/** @todo loadCheckPoint() - Jump forward in the log to the last
checkpoint. (Maybe getLogHandle should do this automatically,
checkpoint. (getLogHandle should do this automatically,
since the log will be truncated on checkpoint anyway.) */
while((e = nextInLog(&lh))) {
lsn_t * xactLSN = (lsn_t*)pblHtLookup(transactionLSN, &(e->xid), sizeof(int));
/* recordid rid = e->contents.update.rid; */
if(highestXid < e->xid) {
highestXid = e->xid;
@ -125,15 +116,13 @@ static void Analysis () {
pblHtRemove(transactionLSN, &(e->xid), sizeof(int));
break;
case UPDATELOG:
// XXX we should treat CLR's like REDO's, but things don't work
// that way yet.
case CLRLOG:
/*
If the last record we see for a transaction is an update or clr,
then the transaction must not have committed, so it must need
to be rolled back.
Add it to the appropriate list
Add it to the list
*/
DEBUG("Adding %ld\n", e->LSN);
@ -141,14 +130,13 @@ static void Analysis () {
addSortedVal(&rollbackLSNs, e->LSN);
break;
case XABORT:
/* If the last record we see for a transaction is an abort, then
the transaction didn't commit, and must be rolled back.
*/
// If the last record we see for a transaction is an abort, then
// the transaction didn't commit, and must be rolled back.
DEBUG("Adding %ld\n", e->LSN);
addSortedVal(&rollbackLSNs, e->LSN);
break;
case INTERNALLOG:
/* Created by the logger, just ignore it. */
// Created by the logger, just ignore it
// Make sure the log entry doesn't interfere with real xacts.
assert(e->xid == INVALID_XID);
break;
@ -165,8 +153,7 @@ static void Redo() {
const LogEntry * e;
while((e = nextInLog(&lh))) {
/* Check to see if this log entry is part of a transaction that needs to be redone. */
// Is this log entry part of a transaction that needs to be redone?
if(pblHtLookup(transactionLSN, &(e->xid), sizeof(int)) != NULL) {
// Check to see if this entry's action needs to be redone
switch(e->type) {
@ -181,7 +168,7 @@ static void Redo() {
} break;
case DEFERLOG:
{
//XXX deferred_push(e);
// XXX deferred_push(e);
} break;
case XCOMMIT:
{
@ -204,9 +191,8 @@ static void Redo() {
}
}
}
/**
XXX
@todo CLR handling seems to be broken for logical operations!
/**
@todo Guards shouldn't be hardcoded in Undo()
*/
static void Undo(int recovery) {
LogHandle lh;
@ -219,72 +205,66 @@ static void Undo(int recovery) {
prepare_guard_state = getPrepareGuardState();
DEBUG("Undoing LSN %ld\n", (long int)rollback);
if(recovery) {
/** @todo shouldn't be hardcoded here! */
lh = getGuardedHandle(rollback, &prepareGuard, prepare_guard_state);
} else {
/** @todo probably want guards that are run during normal operation. */
lh = getLSNHandle(rollback);
}
/* printf("e->prev_offset: %ld\n", e->prevLSN);
printf("prev_offset: %ld\n", lh.prev_offset); */
int thisXid = -1;
while((e = previousInTransaction(&lh))) {
thisXid = e->xid;
lsn_t this_lsn, clr_lsn;
/* printf("."); fflush(NULL); */
switch(e->type) {
case UPDATELOG:
{
/* Need write lock for undo.. (Why??) */
if(e->contents.update.rid.size != -1) {
// If the rid is valid, load the page for undoUpdate.
// undoUpdate checks the LSN before applying physical undos
Page * p = loadPage(thisXid, e->contents.update.rid.page);
Page * p = NULL;
if(e->update.rid.size != -1) {
p = loadPage(thisXid, e->update.rid.page);
// If this fails, something is wrong with redo or normal operation.
this_lsn= pageReadLSN(p);
/* Sanity check. If this fails, something is wrong with the
redo phase or normal operation. */
assert(e->LSN <= this_lsn);
/* Need to log a clr here. */
clr_lsn = LogCLR(e->xid, e->LSN, e->contents.update.rid, e->prevLSN);
/* Undo update is a no-op if the page does not reflect this
update, but it will write the new clr_lsn if necessary. */
undoUpdate(e, p, clr_lsn);
releasePage(p);
} else {
} else {
// The log entry is not associated with a particular page.
// (Therefore, it must be an idempotent logical log entry.)
clr_lsn = LogCLR(e->xid, e->LSN, e->contents.update.rid,
e->prevLSN);
undoUpdate(e, NULL, clr_lsn);
}
clr_lsn = LogCLR(e);
undoUpdate(e, p, clr_lsn);
if(p) {
releasePage(p);
}
break;
}
case DEFERLOG:
// The transaction is aborting, so it never committed. Therefore
// actions deferred to commit have never been applied; ignore this
// log entry.
break;
case CLRLOG:
/* Don't need to do anything special to handle CLR's.
Iterator will correctly jump to clr's previous undo record. */
break;
break;
case CLRLOG:
// Don't undo CLRs; they were undone during Redo
break;
case XABORT:
/* Since XABORT is a no-op, we can silentlt ignore it. (XABORT
records may be passed in by undoTrans.)*/
break;
// Since XABORT is a no-op, we can silentlt ignore it. XABORT
// records may be passed in by undoTrans.
break;
case XCOMMIT:
// Should never abort a transaction that contains a commit record
abort();
default:
printf
("Unknown log type to undo (TYPE=%d,XID= %d,LSN=%lld), skipping...\n",
e->type, e->xid, e->LSN);
break;
fflush(NULL);
abort();
}
FreeLogEntry(e);
}

View file

@ -62,7 +62,7 @@ START_TEST(rawLogEntryAlloc)
}
END_TEST
START_TEST(clrLogEntryAlloc)
/*START_TEST(clrLogEntryAlloc)
{
recordid rid = { 3, 4, 5 };
LogEntry * log = allocCLRLogEntry(200, 1, 7, rid, 8);
@ -81,7 +81,7 @@ START_TEST(clrLogEntryAlloc)
free(log);
}
END_TEST
END_TEST */
/** @test
@ -108,12 +108,12 @@ START_TEST(updateLogEntryAlloc)
assert(log->xid == 1);
assert(log->type == UPDATELOG);
assert(log->contents.update.funcID == OPERATION_SET);
assert(log->update.funcID == OPERATION_SET);
/* assert(log->contents.update.invertible == 0); */
assert(log->contents.update.rid.page == 3);
assert(log->contents.update.rid.slot == 4);
assert(log->contents.update.rid.size == 3*sizeof(int));
assert(log->contents.update.argSize == 3*sizeof(char));
assert(log->update.rid.page == 3);
assert(log->update.rid.slot == 4);
assert(log->update.rid.size == 3*sizeof(int));
assert(log->update.argSize == 3*sizeof(char));
assert(getUpdateArgs(log) != NULL);
assert(args[0] == ((char*)getUpdateArgs(log))[0]);
@ -148,12 +148,12 @@ START_TEST(updateLogEntryAllocNoExtras)
assert(log->xid == 1);
assert(log->type == UPDATELOG);
assert(log->contents.update.funcID == OPERATION_LHINSERT);
assert(log->update.funcID == OPERATION_LHINSERT);
/* assert(log->contents.update.invertible == 1); */
assert(log->contents.update.rid.page == 3);
assert(log->contents.update.rid.slot == 4);
assert(log->contents.update.rid.size == 3*sizeof(int));
assert(log->contents.update.argSize == 0);
assert(log->update.rid.page == 3);
assert(log->update.rid.slot == 4);
assert(log->update.rid.size == 3*sizeof(int));
assert(log->update.argSize == 0);
assert(getUpdateArgs(log) == NULL);
preImageCpy = (int*)getUpdatePreImage(log);
@ -175,7 +175,7 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */
tcase_add_test(tc, rawLogEntryAlloc);
tcase_add_test(tc, clrLogEntryAlloc);
// tcase_add_test(tc, clrLogEntryAlloc);
tcase_add_test(tc, updateLogEntryAlloc);
tcase_add_test(tc, updateLogEntryAllocNoExtras);

View file

@ -97,7 +97,8 @@ static void setup_log() {
LogWrite(e);
prevLSN = e->prevLSN;
LogEntry * g = allocCLRLogEntry(100, 1, 200, rid, 0); //prevLSN);
// LogEntry * g = allocCLRLogEntry(100, 1, 200, rid, 0); //prevLSN);
LogEntry * g = allocCLRLogEntry(e); // XXX will probably break
LogWrite(g);
assert (g->type == CLRLOG);

View file

@ -12,9 +12,9 @@ static char * logEntryToString(const LogEntry * le) {
switch(le->type) {
case UPDATELOG:
{
recordid rid = le->contents.clr.rid;
recordid rid = le->update.rid;
asprintf(&ret, "UPDATE\tlsn=%9lld\tprevlsn=%9lld\txid=%4d\trid={%8d %5d %5lld}\tfuncId=%3d\targSize=%9d\n", le->LSN, le->prevLSN, le->xid,
rid.page, rid.slot, (long long int)rid.size, le->contents.update.funcID, le->contents.update.argSize );
rid.page, rid.slot, (long long int)rid.size, le->update.funcID, le->update.argSize );
}
break;
@ -42,9 +42,9 @@ static char * logEntryToString(const LogEntry * le) {
break;
case CLRLOG:
{
recordid rid = le->contents.clr.rid;
asprintf(&ret, "CLR \tlsn=%9lld\tprevlsn=%9lld\txid=%4d\trid={%8d %5d %5lld}\tthisUpdateLSN=%9lld\tundoNextLSN=%9lld\n", le->LSN, le->prevLSN, le->xid,
rid.page, rid.slot, (long long int) rid.size, le->contents.clr.thisUpdateLSN, le->contents.clr.undoNextLSN );
recordid rid = le->update.rid;
asprintf(&ret, "CLR \tlsn=%9lld\tprevlsn=%9lld\txid=%4d\trid={%8d %5d %5lld}\n", le->LSN, le->prevLSN, le->xid,
rid.page, rid.slot, (long long int) rid.size );
}
break;
}