Implemented log truncation.

This commit is contained in:
Sears Russell 2006-04-14 03:45:26 +00:00
parent 11c311bc91
commit ef43d5e4ce
22 changed files with 145 additions and 92 deletions

View file

@ -80,6 +80,7 @@ typedef struct {
typedef struct {
int xid;
lsn_t prevLSN;
lsn_t recLSN;
LogHandle lh;
} TransactionLog;

View file

@ -6,7 +6,7 @@
#define RO 0
#define RW 1
Page * getPage(int pageid, int locktype);
//Page * getPage(int pageid, int locktype);
/**
Implements lladd's caching policy. Looks up pageid in the cache.
If pageid doesn't exist, then allocate a new slot for it. If

View file

@ -378,6 +378,10 @@ void Trevive(int xid, long lsn);
*/
void TsetXIDCount(int xid);
/**
This is used by log truncation.
*/
lsn_t transactions_minRecLSN();
END_C_DECLS

View file

@ -4,7 +4,8 @@ lib_LIBRARIES=liblladd.a
#liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a
# removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c
liblladd_a_SOURCES=crc32.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \
pageFile.c pageCache.c page.c bufferPool.c blobManager.c recovery2.c transactional2.c \
pageFile.c pageCache.c page.c bufferPool.c blobManager.c recovery2.c truncation.c \
transactional2.c \
lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\
logger/logEntry.c logger/logWriter.c logger/inMemoryLog.c logger/logHandle.c logger/logger2.c \
logger/logMemory.c \

View file

@ -59,7 +59,7 @@ terms specified in this license.
#include <lladd/pageCache.h>
#include "pageFile.h"
#include <pbl/pbl.h>
#include <lladd/truncation.h>
static pblHashTable_t *activePages; /* page lookup */
/*static Page * activePagePtrs[MAX_BUFFER_SIZE];*/
@ -107,6 +107,7 @@ void bufDeinit() {
pblHtRemove( activePages, 0, 0 );
DEBUG("+");
pageWrite(p);
// dirtyPages_remove(p);
}
@ -150,7 +151,7 @@ int bufTransAbort(int xid, lsn_t lsn) {
return 0;
}
Page * getPage(int pageid, int locktype) {
static Page * getPage(int pageid, int locktype) {
Page * ret;
int spin = 0;
@ -232,6 +233,7 @@ Page * getPage(int pageid, int locktype) {
assert(ret != dummy_page);
if(ret->id != -1) {
pageWrite(ret);
// dirtyPages_remove(ret);
}
pageFree(ret, pageid);

View file

@ -53,6 +53,8 @@ terms specified in this license.
#include <lladd/bufferPool.h>
#include <assert.h>
#include <lladd/truncation.h>
/* TODO: Combine with buffer size... */
static int nextPage = 0;
static pthread_mutex_t pageMallocMutex;

View file

@ -237,7 +237,7 @@ int openLogWriter() {
int writeLogEntry(LogEntry * e) {
const long size = sizeofLogEntry(e);
const lsn_t size = sizeofLogEntry(e);
pthread_mutex_lock(&log_write_mutex);
@ -250,7 +250,7 @@ int writeLogEntry(LogEntry * e) {
lh = getLSNHandle(nextAvailableLSN);
while((le = nextInLog(&lh))) {
nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(long);;
nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);;
FreeLogEntry(le);
}
}
@ -260,9 +260,9 @@ int writeLogEntry(LogEntry * e) {
//printf ("\nLSN: %ld\n", e->LSN);
//fflush(stdout);
nextAvailableLSN += (size + sizeof(long));
nextAvailableLSN += (size + sizeof(lsn_t));
size_t nmemb = fwrite(&size, sizeof(long), 1, log);
size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, log);
if(nmemb != 1) {
if(feof(log)) { abort(); /* feof makes no sense here */ }
if(ferror(log)) {
@ -292,9 +292,10 @@ int writeLogEntry(LogEntry * e) {
void syncLog() {
lsn_t newFlushedLSN;
newFlushedLSN = ftell(log);
pthread_mutex_lock(&log_read_mutex);
newFlushedLSN = ftell(log) + global_offset;
pthread_mutex_unlock(&log_read_mutex);
// Wait to set the static variable until after the flush returns.
// Since we opened the logfile with O_SYNC, fflush() is sufficient.
@ -334,15 +335,15 @@ void closeLogWriter() {
void deleteLogWriter() {
remove(LOG_FILE);
}
long debug_lsn = -1;
lsn_t debug_lsn = -1;
static LogEntry * readLogEntry() {
LogEntry * ret = 0;
long size;
long entrySize;
lsn_t size;
lsn_t entrySize;
ssize_t bytesRead = read(roLogFD, &size, sizeof(long));
ssize_t bytesRead = read(roLogFD, &size, sizeof(lsn_t));
if(bytesRead != sizeof(long)) {
if(bytesRead != sizeof(lsn_t)) {
if(bytesRead == 0) {
// fprintf(stderr, "eof on log entry size\n");
// fflush(stderr);
@ -352,7 +353,7 @@ static LogEntry * readLogEntry() {
abort();
return (LogEntry*)LLADD_IO_ERROR;
} else {
fprintf(stderr, "short read from log. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", sizeof(long), bytesRead);
fprintf(stderr, "short read from log. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", sizeof(lsn_t), bytesRead);
fflush(stderr);
abort(); // really abort here. This code should attempt to piece together short log reads...
}
@ -376,8 +377,8 @@ static LogEntry * readLogEntry() {
} else {
printf("short read from log w/ lsn %ld. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", debug_lsn, size, bytesRead);
fflush(stderr);
long newSize = size - bytesRead;
long newBytesRead = read (roLogFD, ((byte*)ret)+bytesRead, newSize);
lsn_t newSize = size - bytesRead;
lsn_t newBytesRead = read (roLogFD, ((byte*)ret)+bytesRead, newSize);
printf("\nattempt to read again produced newBytesRead = %ld, newSize was %ld\n", newBytesRead, newSize);
fflush(stderr);
abort();
@ -396,16 +397,16 @@ LogEntry * readLSNEntry(lsn_t LSN) {
/** Because we use two file descriptors to access the log, we need
to flush the log write buffer before concluding we're at EOF. */
if(flushedLSN() <= LSN) {
if(flushedLSN() <= LSN && LSN < nextAvailableLSN) {
// fprintf(stderr, "Syncing log flushed = %d, requested = %d\n", flushedLSN(), LSN);
syncLog();
assert(flushedLSN() >= LSN);
// fprintf(stderr, "Synced log flushed = %d, requested = %d\n", flushedLSN(), LSN);
}
pthread_mutex_lock(&log_read_mutex);
assert(global_offset <= LSN);
debug_lsn = LSN;
off_t newPosition = lseek(roLogFD, LSN - global_offset, SEEK_SET);
@ -418,6 +419,7 @@ LogEntry * readLSNEntry(lsn_t LSN) {
}
ret = readLogEntry();
assert(ret || LSN >= nextAvailableLSN);
pthread_mutex_unlock(&log_read_mutex);
@ -431,7 +433,7 @@ int truncateLog(lsn_t LSN) {
const LogEntry * le;
LogHandle lh;
long size;
lsn_t size;
pthread_mutex_lock(&truncateLog_mutex);
@ -468,15 +470,18 @@ int truncateLog(lsn_t LSN) {
*/
pthread_mutex_lock(&log_write_mutex);
lh = getLSNHandle(LSN);
fflush(log);
lh = getLSNHandle(LSN);
lsn_t lengthOfCopiedLog = 0;
while((le = nextInLog(&lh))) {
size = sizeofLogEntry(le);
lengthOfCopiedLog += (size + sizeof(lsn_t));
myFwrite(&size, sizeof(lsn_t), tmpLog);
myFwrite(le, size, tmpLog);
FreeLogEntry(le);
}
assert(nextAvailableLSN == LSN + lengthOfCopiedLog);
fflush(tmpLog);
#ifdef HAVE_FDATASYNC
fdatasync(fileno(tmpLog));
@ -539,6 +544,7 @@ int truncateLog(lsn_t LSN) {
global_offset = LSN - sizeof(lsn_t);
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);
pthread_mutex_unlock(&truncateLog_mutex);
@ -549,5 +555,8 @@ int truncateLog(lsn_t LSN) {
lsn_t firstLogEntry() {
assert(log);
return global_offset + sizeof(lsn_t);
pthread_mutex_lock(&log_read_mutex); // for global offset...
lsn_t ret = global_offset + sizeof(lsn_t);
pthread_mutex_unlock(&log_read_mutex);
return ret;
}

View file

@ -51,7 +51,7 @@ terms specified in this license.
#include <stdio.h>
#include <assert.h>
int loggerType = LOG_TO_MEMORY;
int loggerType = LOG_TO_FILE;
void genericLogWrite(LogEntry * e) {
if(loggerType == LOG_TO_FILE) {
@ -65,6 +65,7 @@ void genericLogWrite(LogEntry * e) {
}
int LogInit(int logType) {
assert(logType == loggerType);
if(LOG_TO_FILE == logType) {
openLogWriter();
} else if(LOG_TO_MEMORY == logType) {
@ -100,7 +101,14 @@ void LogForce(lsn_t lsn) {
abort();
}
void LogTruncate(lsn_t lsn) {
truncateLog(lsn);
if(LOG_TO_FILE == loggerType) {
truncateLog(lsn);
} else if(LOG_TO_MEMORY == loggerType) {
abort();
} else {
abort();
}
}
lsn_t LogFlushedLSN() {
@ -142,13 +150,13 @@ void FreeLogEntry(const LogEntry * e) {
abort();
}
TransactionLog LogTransBegin(int xid) {
TransactionLog tl;
tl.xid = xid;
DEBUG("Log Begin %d\n", xid);
tl.prevLSN = -1;
tl.recLSN = -1;
return tl;
}
@ -158,6 +166,7 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) {
genericLogWrite(e);
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
@ -281,7 +290,7 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
if(preImage) {
free(preImage);
}
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
return e;
}

View file

@ -89,13 +89,16 @@ terms specified in this license.
#include "page/slotted.h"
#include "page/fixed.h"
#include <lladd/bufferPool.h>
#include <lladd/truncation.h>
void pageWriteLSN(int xid, Page * page, lsn_t lsn) {
// if(!page->dirty) { // This assert belongs here, but would cause some hacked up unit tests to fail...
// assert(page->LSN < lsn);
// }
if(page->LSN < lsn) {
page->LSN = lsn;
*lsn_ptr(page) = page->LSN;
}
page->dirty = 1;
dirtyPages_add(page);
return;
}
@ -119,7 +122,6 @@ lsn_t pageReadLSN(const Page * page) {
void pageInit() {
bufferPoolInit();
slottedPageInit();
}
void pageDeInit() {

View file

@ -192,12 +192,21 @@ void pageInit();
void pageDeInit();
/**
* assumes that the page is already loaded in memory. It takes
* as a parameter a Page. The Page struct contains the new LSN and the page
* number to which the new LSN must be written to.
* assumes that the page is already loaded in memory. It takes as a
* parameter a Page. The Page struct contains the new LSN and the
* page number to which the new LSN must be written to. Furthermore,
* this function updates the dirtyPages table, if necessary. The
* dirtyPages table is needed for log truncation. (If the page->id is
* null, this function assumes the page is not in the buffer pool, and
* does not update dirtyPages. Similarly, if the page is already
* dirty, there is no need to udpate dirtyPages.
*
* @param page You must have a writelock on page before calling this function.
* @param lsn The new lsn of the page. If the new lsn is less than the page's current lsn, then the page's lsn will not be changed.
* @param page You must have a writelock on page before calling this
* function.
*
* @param lsn The new lsn of the page. If the new lsn is less than
* the page's current lsn, then the page's lsn will not be changed.
* If the page is clean, the new lsn must be greater than the old lsn.
*/
void pageWriteLSN(int xid, Page * page, lsn_t lsn);

View file

@ -1,5 +1,6 @@
#include "page/raw.h"
#include <lladd/logger/logger2.h>
#include <lladd/truncation.h>
/**
@todo Should rawPageInferMetadata set a page type in the Page
struct?
@ -15,7 +16,8 @@ byte* rawPageGetData(int xid, Page * p) {
void rawPageSetData(int xid, lsn_t lsn, Page * p) {
writelock(p->rwlatch, 255);
rawPageWriteLSN(xid, p, lsn);
p->dirty = 1;
// p->dirty = 1;
dirtyPages_add(p);
unlock(p->rwlatch);
return;
}

View file

@ -9,7 +9,7 @@
#include "pageFile.h"
#include <assert.h>
#include <lladd/logger/logger2.h>
#include <lladd/truncation.h>
#include <sys/types.h>
#include <sys/stat.h>
@ -119,6 +119,7 @@ void pageWrite(Page * ret) {
}
ret->dirty = 0;
dirtyPages_remove(ret);
pthread_mutex_unlock(&stable_mutex);
}

View file

@ -215,7 +215,8 @@ static void Undo(int recovery) {
/* Need write lock for undo.. (Why??) */
if(e->contents.update.rid.size != -1) {
Page * p = getPage(e->contents.update.rid.page, RO);
// Page * p = getPage(e->contents.update.rid.page, RO);
Page * p = loadPage(thisXid, e->contents.update.rid.page);
this_lsn= pageReadLSN(p); /* e->contents.update.rid.page); */

View file

@ -11,7 +11,7 @@
#include "page.h"
#include <lladd/logger/logger2.h>
#include <lladd/truncation.h>
#include <stdio.h>
#include <assert.h>
#include "page/indirect.h"
@ -92,15 +92,17 @@ void setupOperationsTable() {
int Tinit() {
pthread_mutex_init(&transactional_2_mutex, NULL);
numActiveXactions = 0;
setupOperationsTable();
dirtyPagesInit();
bufInit();
LogInit(loggerType);
try_ret(compensation_error()) {
pageOperationsInit();
} end_ret(compensation_error());
@ -116,6 +118,10 @@ int Tinit() {
InitiateRecovery();
truncationInit();
if(lladd_enableAutoTruncation) {
autoTruncate(); // should this be before InitiateRecovery?
}
return 0;
}
@ -208,20 +214,6 @@ compensated_function void Tupdate(int xid, recordid rid, const void *dat, int op
/** @todo For logical undo logs, grabbing a lock makes no sense! */
begin_action(releasePage, p) {
TupdateHelper(xid, rid, dat, op, p);
/* if(globalLockManager.writeLockPage) {
globalLockManager.writeLockPage(xid, rid.page);
}
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
} en d_action;
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
doUpdate(e, p);
releasePage(p);*/
} compensate;
}
@ -349,10 +341,11 @@ int Tdeinit() {
}
}
assert( numActiveXactions == 0 );
truncationDeinit();
ThashDeinit();
bufDeinit();
LogDeinit();
dirtyPagesDeinit();
return 0;
}
@ -384,3 +377,19 @@ void TsetXIDCount(int xid) {
xidCount = xid;
pthread_mutex_unlock(&transactional_2_mutex);
}
lsn_t transactions_minRecLSN() {
lsn_t minRecLSN = LogFlushedLSN();
pthread_mutex_lock(&transactional_2_mutex);
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
if(XactionTable[i].xid != INVALID_XTABLE_XID) {
lsn_t recLSN = XactionTable[i].recLSN;
if(recLSN != -1 && recLSN < minRecLSN) {
minRecLSN = recLSN;
}
}
}
pthread_mutex_unlock(&transactional_2_mutex);
return minRecLSN;
}

View file

@ -432,7 +432,7 @@ START_TEST(recoverBlob__crash) {
Tread(xid, rid, &j);
fail_unless(!memcmp(j,k,ARRAY_SIZE), NULL);
truncationDeinit();
simulateBufferManagerCrash();
closeLogWriter();
numActiveXactions = 0;
@ -519,7 +519,7 @@ START_TEST (recoverBlob__multiple_xacts) {
Tset(xid1, rid1,&k);
/*simulate crash */
truncationDeinit();
simulateBufferManagerCrash();
closeLogWriter();
numActiveXactions = 0;

View file

@ -207,7 +207,7 @@ START_TEST(indirectAccessDirect) {
}
Tcommit(xid);
Tdeinit();
} END_TEST
START_TEST(indirectAccessIndirect) {

View file

@ -81,7 +81,7 @@ START_TEST(linearHashNTAtest)
assert(val2->size == val2->slot * NUM_ENTRIES);
free(val2);
}
if(1) { return; }
// if(1) { return; }
Tcommit(xid);
printf("\n"); fflush(stdout);

View file

@ -53,25 +53,26 @@ terms specified in this license.
#include <sched.h>
#include <assert.h>
#include "../check_includes.h"
#include <lladd/truncation.h>
#define LOG_NAME "check_logWriter.log"
static int logType = LOG_TO_MEMORY;
//static int logType = LOG_TO_MEMORY;
static void setup_log() {
int i;
lsn_t prevLSN = -1;
int xid = 42;
deleteLogWriter();
lladd_enableAutoTruncation = 0;
Tinit();
LogDeinit();
deleteLogWriter();
// LogDeinit();
//deleteLogWriter();
// openLogWriter();
LogInit(logType);
// LogInit(logType);
for(i = 0 ; i < 1000; i++) {
LogEntry * e = allocCommonLogEntry(prevLSN, xid, XBEGIN);
@ -110,6 +111,7 @@ static void setup_log() {
FreeLogEntry (e);
FreeLogEntry (g);
}
// truncationDeinit();
}
/**
@ -140,9 +142,9 @@ START_TEST(logWriterTest)
setup_log();
// syncLog();
//closeLogWriter();
LogDeinit();
// LogDeinit();
// openLogWriter();
LogInit(logType);
// LogInit(logType);
h = getLogHandle();
/* LogReadLSN(sizeof(lsn_t)); */
@ -268,18 +270,21 @@ START_TEST(logWriterTruncate) {
} END_TEST
#define ENTRIES_PER_THREAD 2000
#define ENTRIES_PER_THREAD 200
pthread_mutex_t random_mutex;
lsn_t truncated_to = 4;
//pthread_mutex_t truncated_to_mutex = PTHREAD_MUTEX_INITIALIZER;
static void* worker_thread(void * arg) {
long key = *(int*)arg;
long i = 0;
int truncated_to = 4;
int lsns[ENTRIES_PER_THREAD];
lsn_t lsns[ENTRIES_PER_THREAD];
/* fail_unless(NULL != le, NULL); */
@ -289,6 +294,7 @@ static void* worker_thread(void * arg) {
int threshold;
long entry;
int needToTruncate = 0;
lsn_t myTruncVal = 0;
pthread_mutex_lock(&random_mutex);
threshold = (int) (2000.0*random()/(RAND_MAX+1.0));
@ -299,28 +305,20 @@ static void* worker_thread(void * arg) {
needToTruncate = 1;
if(lsns[i - 10] > truncated_to) {
truncated_to = lsns[i - 10];
myTruncVal = truncated_to;
}
}
}
pthread_mutex_unlock(&random_mutex);
/* fail_unless(threshold <= 100, NULL); */
if(needToTruncate) {
LogTruncate(myTruncVal);
assert(LogTruncationPoint() >= myTruncVal);
}
if(threshold < 3) {
if(i > 10) {
/* Truncate the log .15% of the time; result in a bit over 100 truncates per test run.*/
/* fail_unless(1, NULL); */
/*truncateLog(lsns[i - 10]);*/
//truncated_to = i - 10;
}
/* fail_unless(1, NULL); */
} else {
/* DEBUG("i = %d, le = %x\n", i, (unsigned int)le); */
/* fail_unless(1, NULL); */
le->xid = i+key;
genericLogWrite(le);
//printf("reportedLSN: %ld\n", le->LSN);
@ -330,9 +328,9 @@ static void* worker_thread(void * arg) {
/* fail_unless(1, NULL); */
pthread_mutex_lock(&random_mutex);
if(lsns[entry] > truncated_to && entry < i) {
pthread_mutex_unlock(&random_mutex);
/*printf("X %d\n", (LogReadLSN(lsns[entry])->xid == entry+key)); fflush(stdout); */
const LogEntry * e = LogReadLSN(lsns[entry]);
pthread_mutex_unlock(&random_mutex);
assert(e->xid == entry+key);
FreeLogEntry(e);
/* fail_unless(LogReadLSN(lsns[entry])->xid == entry+key, NULL); */
@ -395,7 +393,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, logWriterTest);
tcase_add_test(tc, logHandleColdReverseIterator);
/*tcase_add_test(tc, logWriterTruncate);*/
tcase_add_test(tc, logWriterTruncate);
tcase_add_test(tc, logWriterCheckWorker);
tcase_add_test(tc, logWriterCheckThreaded);

View file

@ -161,7 +161,7 @@ START_TEST(operation_physical_do_undo) {
/** @todo need to re-think check_operations. The test is pretty broken. */
Tdeinit();
return;
setToTwo->LSN = 10;
@ -237,7 +237,7 @@ START_TEST(operation_prepare) {
Tset(prepared, b, &two);
Tcommit(winner);
truncationDeinit();
simulateBufferManagerCrash();
// closeLogWriter();
LogDeinit();
@ -295,7 +295,7 @@ START_TEST(operation_prepare) {
Tset(prepared, b, &two);
Tcommit(winner);
truncationDeinit();
simulateBufferManagerCrash();
// closeLogWriter();
LogDeinit();

View file

@ -90,10 +90,12 @@ START_TEST(pageOpCheckRecovery) {
TpageDealloc(xid, pageid1);
TpageDealloc(xid, pageid2);
truncationDeinit();
simulateBufferManagerCrash();
closeLogWriter();
numActiveXactions = 0;
Tinit();
xid = Tbegin();

View file

@ -66,6 +66,7 @@ START_TEST(emptyIterator) {
abort();
}
Tcommit(xid);
Tdeinit();
} END_TEST
START_TEST(pagedListCheck) {

View file

@ -382,7 +382,7 @@ START_TEST(recovery_crash) {
Tread(xid, rid, &j);
fail_unless(j == 6, "Decrement not working?");
truncationDeinit();
simulateBufferManagerCrash();
closeLogWriter();
numActiveXactions = 0;
@ -462,7 +462,7 @@ START_TEST (recovery_multiple_xacts) {
Tset(xid1, rid1,&k);
/*simulate crash */
truncationDeinit();
simulateBufferManagerCrash();
closeLogWriter();
numActiveXactions = 0;