Logwriter can now handler partial log entries correctly (it ignores them)

Buffer Mananger no longer steals pages pre-maturely
Alloc is an operation, and correct.
Caching broken out into a new module
Wrote a utility to translate a log file into human-readable ascii.
This commit is contained in:
Sears Russell 2004-06-30 01:09:57 +00:00
parent c4fac084b8
commit af152bd08e
20 changed files with 756 additions and 367 deletions

View file

@ -1,5 +1,5 @@
EXTRA_DIST = reconf
SUBDIRS = src test
SUBDIRS = src test utilities
AM_CFLAGS = -g -Wall -pedantic
docs:

View file

@ -76,6 +76,7 @@ AC_CONFIG_FILES([Makefile
src/lladd/Makefile
src/pbl/Makefile
src/timing/Makefile
utilities/Makefile
test/2pc/Makefile
test/Makefile
test/cht/Makefile

View file

@ -94,21 +94,6 @@ terms specified in this license.
@todo need alloc + free...
// Calls for LLADD managed memory (returns pointers to LLADD's cache.)
// Read only access to record (requires an un-pinning)
// Cost with cache hit: pointer arithmetic.
// - map_t readMapRecord(rid, &((const void *));
// Map a page read / write. Pins the page, sets its lsn, and provides a pointer to the record:
// Cost with cache hit: pointer arithmetic, eventual disk flush.
// - map_t readWriteMapRecord(rid, &(void *));
// Unmap a mapped page so that it can be kicked.
// @param lsn can be 0 if this is a read-only mapping. Otherwise,
// it should be the LSN of the operation that calls unmapRecord.
// @todo What connection between the lock manager and this function
@ -220,12 +205,25 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat);
void readRecord(int xid, recordid rid, void *dat);
/**
* @param page write page to disk, including correct LSN
* Write page to disk, including correct LSN. Doing so may require a
* call to logSync(). There is not much that can be done to avoid
* this call right now. In the future, it might make sense to check
* to see if some other page can be kicked, in order to avoid the log
* flush.
*
* @param page The page to be flushed to disk.
* @return 0 on success
* @return error code on failure
*/
int flushPage(Page page);
/**
Read a page from disk.
@param A page struct, with id set correctly. The rest of this
struct is filled out by pageMap.
*/
void pageMap(Page * page);
/*
* this function does NOT write to disk, just drops the page from the active
* pages

View file

@ -58,6 +58,7 @@ terms specified in this license.
* never hurts to have more flushes to disk, as long as it doesn't hurt
* performance.
*
* @todo CORRECTNESS: We currently assume that there are no partial entries in the log!
* @todo Everything in this file cores on failure (no error handling yet)
* @todo All of the logWriter calls should be reentrant.
*
@ -83,75 +84,52 @@ int openLogWriter();
/**
@param e Pointer to a log entry. After the call, e->LSN will be set appropriately.
@param e Pointer to a log entry. After the call, e->LSN will be set appropriately. If e's xid is set to -1, then this call has no effect (and e's LSN will be set to -1.)
returns 0 on success, or an error code defined above
*/
int writeLogEntry(LogEntry * e);
/*
/**
flush the entire log (tail) that is currently in memory to disk
*/
void syncLog();
/*
/**
Return the highest LSN that is known to be on disk. (Currently, we
only know if an LSN is on disk if we've written that LSN before a
call to syncLog().
Note: This function might not return an LSN corresponding to a real
log entry, but it will definitely return one that is greater than
or equal to the LSN of a log entry that has been forced to disk,
and is less than the LSN of all log entries that might not have
been forced to disk.
*/
lsn_t flushedLSN();
/**
Close the log stream
*/
void closeLogWriter();
/*
Get the current position of the stream (in terms of bytes)
*/
/*long getFilePos();*/
/*
/**
Actually deletes the log file that may have been written to disk! Danger!!
Only use after calling closeLogStream AND you are sure there are no active (or
future active) transactions!
@todo This is in here now for completeness, but once we implement
log truncation, it should leave.
*/
void deleteLogWriter();
/*
* Returns the current position of the stream no matter where it is
*/
/*long streamPos();*/
/*
* Returns the position of the stream if it were to read.
*/
/*long writeStreamPos();*/
/*
* readLog reads a line from the log puts it in a string
*
* This was made static because it exports state that the interface
* should be hiding. (To use this function, the user must make
* assumptions regarding the value of the FILE's current offset.)
*
* returns the number of bytes read and put into buffer
* */
/*int readLog(byte **buffer);*/
/* LogEntry * readLogEntry(); */
/*
* seek to a position in the log file and read it into the buffer
*
* returns the number of bytes read and put into buffer
* */
/*int seekAndReadLog(long pos, byte **buffer);*/
/**
Read a log entry at a particular LSN.
@param the LSN of the entry that will be read.
*/
LogEntry * readLSNEntry(lsn_t LSN);
/* lsn_t nextLSN(); */
/*
* tell the current position in the log file
* */
/*long readPos ();
void seekInLog(long pos);*/
END_C_DECLS
#endif /* __LLADD_LOGGER_LOGWRITER_H */

View file

@ -129,7 +129,7 @@ void pageRealloc(Page *p, int id);
Page* pageAlloc(int id);
recordid pageSlotRalloc(Page page, recordid rid);
recordid pageSlotRalloc(Page page, lsn_t lsn, recordid rid);
int pageTest();

24
lladd/pageCache.h Normal file
View file

@ -0,0 +1,24 @@
#ifndef __PAGECACHE_H
#define __PAGECACHE_H
/**
Implements lladd's caching policy. Looks up pageid in the cache.
If pageid doesn't exist, then allocate a new slot for it. If
there are no new slots, then callback into bufferManager's
pageRead() function. Eventually, this could be extended to
support application specific caching schemes.
Currently, LLADD uses LRU-2S from Markatos "On Caching Searching
Engine Results"
If you would like to implement your own caching policy, implement
the three functions below. They are relatively straightforward.
Note that pageCache does not perform any file I/O of its own.
*/
#include <lladd/page.h>
void pageCacheInit();
void pageCacheDeinit();
Page * loadPagePtr(int pageid);
#endif

View file

@ -3,6 +3,6 @@
lib_LIBRARIES=liblladd.a
#liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a
# removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c
liblladd_a_SOURCES=bufferManager.c linkedlist.c operations.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/lladdhash.c operations/prepare.c operations/set.c operations/alloc.c
liblladd_a_SOURCES=bufferManager.c linkedlist.c operations.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/lladdhash.c operations/prepare.c operations/set.c operations/alloc.c
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -31,7 +31,7 @@ static void readRawRecord(int xid, recordid rid, void * buf, int size) {
readRecord(xid, blob_rec_rid, buf);
}
static void writeRawRecord(int xid, lsn_t lsn, recordid rid, const void * buf, int size) {
static void writeRawRecord(int xid, recordid rid, const void * buf, int size) {
recordid blob_rec_rid = rid;
blob_rec_rid.size = size;
Tset(xid, blob_rec_rid, buf);
@ -42,7 +42,7 @@ static void writeRawRecord(int xid, lsn_t lsn, recordid rid, const void * buf, i
/* moved verbatim from bufferManger.c, then hacked up to use FILE * instead of ints. */
void openBlobStore() {
int blobfd0, blobfd1;
if( ! (blobf0 = fopen(BLOB0_FILE, "w+"))) { /* file may not exist */
if( ! (blobf0 = fopen(BLOB0_FILE, "r+"))) { /* file may not exist */
if( (blobfd0 = creat(BLOB0_FILE, 0666)) == -1 ) { /* cannot even create it */
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
perror("Creating blob 0 file"); abort();
@ -51,7 +51,7 @@ void openBlobStore() {
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
perror(NULL); abort();
}
if(!(blobf0 = fopen(BLOB0_FILE, "w+"))) { perror("Couldn't open or create blob 0 file"); abort(); }
if(!(blobf0 = fopen(BLOB0_FILE, "r+"))) { perror("Couldn't open or create blob 0 file"); abort(); }
}
DEBUG("blobf0 opened.\n");
@ -93,43 +93,34 @@ void closeBlobStore() {
pblHtDelete(dirtyBlobs);
}
static long myFseek(FILE * f, long offset, int whence) {
long myFseek(FILE * f, long offset, int whence) {
long ret;
if(0 != fseek(f, offset, whence)) { perror ("fseek"); fflush(NULL); abort(); }
if(-1 == (ret = ftell(f))) { perror("ftell"); fflush(NULL); abort(); }
return ret;
}
recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) {
recordid preAllocBlob(int xid, size_t blobSize) {
long fileSize = myFseek(blobf1, 0, SEEK_END);
blob_record_t blob_rec;
Page p;
char zero = 0;
/* char zero = 0; */
/* Allocate space for the blob entry. */
DEBUG("Allocing blob (size %d)\n", blobSize);
assert(blobSize > 0); /* Don't support zero length blobs right now... */
/* First in buffer manager. */
/** Needs to be in alloc.c */
recordid rid = Talloc(xid, sizeof(blob_record_t));
readRecord(xid, rid, &blob_rec);
/** Then in the blob file. @todo: BUG How can we get around doing a
force here? If the user allocates space and we crash, could we
double allocate space, since the file won't have grown. Could
we write a log entry with the new size? Alternatively, is
forcing the files before writing a commit to log enough?*/
/** @todo Should this be -1, not -2? Aren't we writing one byte after the end of the blob? */
myFseek(blobf0, fileSize + blobSize - 1, SEEK_SET);
myFseek(blobf1, fileSize + blobSize - 1, SEEK_SET);
if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); }
if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); }
/** Finally, fix up the fields in the record that points to the blob. */
/** Finally, fix up the fields in the record that points to the blob.
The rest of this also should go into alloc.c
*/
blob_rec.fd = 0;
blob_rec.size = blobSize;
@ -143,11 +134,47 @@ recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) {
/* Tset() needs to know to 'do the right thing' here, since we've
changed the size it has recorded for this record, and
writeRawRecord makes sure that that is the case. */
writeRawRecord (xid, lsn, rid, &blob_rec, sizeof(blob_record_t));
writeRawRecord (xid, rid, &blob_rec, sizeof(blob_record_t));
rid.size = blob_rec.size;
return rid;
}
void allocBlob(int xid, lsn_t lsn, recordid rid) {
long fileSize = myFseek(blobf1, 0, SEEK_END);
blob_record_t blob_rec;
Page p;
char zero = 0;
/* recordid rid = preAllocBlob(xid, blobSize); */
/* Allocate space for the blob entry. */
DEBUG("post Allocing blob (size %d)\n", rid.size);
assert(rid.size > 0); /* Don't support zero length blobs right now... */
/* First in buffer manager. */
p = loadPage(rid.page);
/* Read in record to get the correct offset, size for the blob*/
readRawRecord(xid, rid, &blob_rec, sizeof(blob_record_t));
/** Then in the blob file. @todo: BUG How can we get around doing a
force here? If the user allocates space and we crash, could we
double allocate space, since the file won't have grown. Could
we write a log entry with the new size? Alternatively, is
forcing the files before writing a commit to log enough?*/
/** @todo Should this be -1, not -2? Aren't we writing one byte after the end of the blob? */
myFseek(blobf0, fileSize + rid.size - 1, SEEK_SET);
myFseek(blobf1, fileSize + rid.size - 1, SEEK_SET);
if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); }
if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); }
}
static lsn_t * tripleHashLookup(int xid, recordid rid) {
@ -258,9 +285,20 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) {
FILE * fd;
int readcount;
/** @todo Dang. Blob allocation needs to go in three phases:
1) Alloc record (normal Talloc() before blob alloc)
2) Grab offset / size from blobManager, Tset() record from 1 with it.
3) Log it.
4) Upon recieving log entry, update static file length variable in blobManager. (Will fix double allocation race bug as well, but introduced record leaking on crash... Oh well.)
*/
DEBUG("Writing blob (size %d)\n", rid.size);
/* Tread() raw record */
readRawRecord(xid, rid, &rec, sizeof(blob_record_t));
assert(rec.size == rid.size);
if(dirty) {
assert(lsn > *dirty);
*dirty = lsn; /* Updates value in triple hash (works because of pointer aliasing.) */
@ -275,7 +313,7 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) {
rec.fd = rec.fd ? 0 : 1;
/* Tset() raw record */
writeRawRecord(xid, lsn, rid, &rec, sizeof(blob_record_t));
writeRawRecord(xid, rid, &rec, sizeof(blob_record_t));
}
fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */
@ -290,8 +328,13 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) {
/* No need to update the raw blob record. */
}
/** @todo check to see if commitBlobs actually needs to flush blob
files when it's called (are there any dirty blobs associated with
this transaction? */
void commitBlobs(int xid) {
fdatasync(fileno(blobf0));
fdatasync(fileno(blobf1));
abortBlobs(xid);
}

View file

@ -68,13 +68,17 @@ typedef struct {
unsigned fd : 1;
} blob_record_t;
recordid preAllocBlob(int xid, size_t blobsize);
/**
Allocate a blob of size blobSize.
@todo This function does not atomically allocate space in the blob
file.
*/
recordid allocBlob(int xid, lsn_t lsn, size_t blobSize);
void allocBlob(int xid, lsn_t lsn, recordid rid);
void openBlobStore();
void closeBlobStore();

View file

@ -63,17 +63,122 @@ terms specified in this license.
#include "blobManager.h"
static pblHashTable_t *activePages; /* page lookup */
static unsigned int bufferSize = 1; /* < MAX_BUFFER_SIZE */
static Page *repHead, *repMiddle, *repTail; /* replacement policy */
#include <lladd/pageCache.h>
static int stable = -1;
#include <lladd/logger/logWriter.h>
static void pageMap(Page *ret) {
static FILE * stable = NULL;
static unsigned int lastFreepage = 0;
/* ** File I/O functions ** */
/* Defined in blobManager.c, but don't want to export this in any public .h files... */
long myFseek(FILE * f, long offset, int whence);
void pageRead(Page *ret) {
long fileSize = myFseek(stable, 0, SEEK_END);
long pageoffset = ret->id * PAGE_SIZE;
long offset;
DEBUG("Reading page %d\n", ret->id);
if(!ret->memAddr) {
ret->memAddr = malloc(PAGE_SIZE);
}
assert(ret->memAddr);
if ((ret->id)*PAGE_SIZE >= fileSize) {
myFseek(stable, (1+ ret->id) * PAGE_SIZE -1, SEEK_SET);
if(1 != fwrite("", 1, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof extending storefile!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error extending storefile! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
offset = myFseek(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
if(1 != fread(ret->memAddr, PAGE_SIZE, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof reading!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error reading stream! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
void pageWrite(const Page * ret) {
long pageoffset = ret->id * PAGE_SIZE;
long offset = myFseek(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
assert(ret->memAddr);
DEBUG("Writing page %d\n", ret->id);
if(flushedLSN() < pageReadLSN(*ret)) {
DEBUG("pageWrite is calling syncLog()!\n");
syncLog();
}
if(1 != fwrite(ret->memAddr, PAGE_SIZE, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof writing!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error writing stream! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
static void openPageFile() {
int stable_fd;
DEBUG("Opening storefile.\n");
if( ! (stable = fopen(STORE_FILE, "r+"))) { /* file may not exist */
byte* zero = calloc(1, PAGE_SIZE);
DEBUG("Creating new page storefile.\n");
if( (stable_fd = creat(STORE_FILE, 0666)) == -1 ) { /* cannot even create it */
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
perror("Creating store file"); abort();
}
if( close(stable_fd)) {
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
perror(NULL); abort();
}
if(!(stable = fopen(STORE_FILE, "r+"))) { perror("Couldn't open or create store file"); abort(); }
/* Write out one page worth of zeros to get started. */
if(1 != fwrite(zero, PAGE_SIZE, 1, stable)) { assert (0); }
}
lastFreepage = 0;
DEBUG("storefile opened.\n");
}
static void closePageFile() {
int ret = fclose(stable);
assert(!ret);
stable = NULL;
}
void pageMap(Page *ret) {
pageRead(ret);
}
int flushPage(Page ret) {
pageWrite(&ret);
return 0;
}
/*
void pageMap(Page *ret) {
int fileSize;
/* this was lseek(stable, SEEK_SET, pageid*PAGE_SIZE), but changed to
lseek(stable, pageid*PAGE_SIZE, SEEK_SET) by jkit (Wed Mar 24 12:59:18 PST 2004)*/
/ * this was lseek(stable, SEEK_SET, pageid*PAGE_SIZE), but changed to
lseek(stable, pageid*PAGE_SIZE, SEEK_SET) by jkit (Wed Mar 24 12:59:18 PST 2004)* /
fileSize = lseek(stable, 0, SEEK_END);
if ((ret->id)*PAGE_SIZE >= fileSize) {
@ -87,12 +192,19 @@ static void pageMap(Page *ret) {
}
}
int flushPage(Page page) {
if( munmap(page.memAddr, PAGE_SIZE) )
return MEM_WRITE_ERROR;
return 0;
}
*/
int bufInit() {
Page *first;
bufferSize = 1;
stable = -1;
stable = NULL;
/* Create STORE_FILE, if necessary, then open it read/write
@ -104,204 +216,55 @@ int bufInit() {
Now, zero means uninitialized, so this could probably be replaced
with a call to open(... O_CREAT|O_RW) or something like that...
*/
if( (stable = open(STORE_FILE, O_RDWR, 0)) == -1 ) { /* file may not exist */
void *zero = mmap(0, PAGE_SIZE, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, -1, 0); /* zero = /dev/zero */
if( (stable = creat(STORE_FILE, 0666)) == -1 ) { /* cannot even create it */
/* if( (stable = open(STORE_FILE, O_RDWR, 0)) == -1 ) { / * file may not exist * /
void *zero = mmap(0, PAGE_SIZE, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, -1, 0); / * zero = /dev/zero * /
if( (stable = creat(STORE_FILE, 0666)) == -1 ) { / * cannot even create it * /
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
exit(errno);
}
/* kick off a fresh page */
if( write(stable, zero, PAGE_SIZE) != PAGE_SIZE ) { /* write zeros out */
/ * kick off a fresh page * /
if( write(stable, zero, PAGE_SIZE) != PAGE_SIZE ) { / * write zeros out * /
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
exit(errno);
}
if( close(stable) || ((stable = open(STORE_FILE, O_RDWR, 0)) == -1) ) { /* need to reopen with read perms */
if( close(stable) || ((stable = open(STORE_FILE, O_RDWR, 0)) == -1) ) { / * need to reopen with read perms * /
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
exit(errno);
}
}
activePages = pblHtCreate();
assert(activePages);
first = pageAlloc(0);
pblHtInsert(activePages, &first->id, sizeof(int), first);
first->prev = first->next = NULL;
pageMap(first);
repHead = repTail = first;
repMiddle = NULL;
} */
openPageFile();
pageCacheInit();
openBlobStore();
return 0;
}
static void headInsert(Page *ret) {
void bufDeinit() {
assert(ret != repMiddle);
assert(ret != repTail);
assert(ret != repHead);
closeBlobStore();
pageCacheDeinit();
closePageFile();
/* if( close(stable) ) {
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
exit(errno);
}*/
repHead->prev = ret;
ret->next = repHead;
ret->prev = NULL;
repHead = ret;
return;
}
static void middleInsert(Page *ret) {
assert( bufferSize == MAX_BUFFER_SIZE );
assert(ret != repMiddle);
assert(ret != repTail);
assert(ret != repHead);
ret->prev = repMiddle->prev;
ret->next = repMiddle;
repMiddle->prev = ret;
ret->prev->next = ret;
ret->queue = 2;
repMiddle = ret;
assert(ret->next != ret && ret->prev != ret);
}
static void qRemove(Page *ret) {
assert( bufferSize == MAX_BUFFER_SIZE );
assert(ret->next != ret && ret->prev != ret);
if( ret->prev )
ret->prev->next = ret->next;
else /* is head */
repHead = ret->next; /* won't have head == tail because of test in loadPage */
if( ret->next ) {
ret->next->prev = ret->prev;
/* TODO: these if can be better organizeed for speed */
if( ret == repMiddle )
/* select new middle */
repMiddle = ret->next;
}
else /* is tail */
repTail = ret->prev;
assert(ret != repMiddle);
assert(ret != repTail);
assert(ret != repHead);
}
/**
LRU-2S from Markatos "On Caching Searching Engine Results"
@todo Should this be its own file?
Just close file descriptors, don't do any other clean up. (For
testing.)
*/
static Page *kickPage(int pageid) {
void simulateBufferManagerCrash() {
closeBlobStore();
closePageFile();
/* close(stable);
stable = -1;*/
Page *ret = repTail;
assert( bufferSize == MAX_BUFFER_SIZE );
qRemove(ret);
pblHtRemove(activePages, &ret->id, sizeof(int));
if( munmap(ret->memAddr, PAGE_SIZE) )
assert( 0 );
pageRealloc(ret, pageid);
middleInsert(ret);
pblHtInsert(activePages, &pageid, sizeof(int), ret);
return ret;
}
int lastPageId = -1;
Page * lastPage = 0;
static Page *loadPagePtr(int pageid) {
/* lock activePages, bufferSize */
Page *ret;
if(lastPage && lastPageId == pageid) {
return lastPage;
} else {
ret = pblHtLookup(activePages, &pageid, sizeof(int));
}
if( ret ) {
if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to worry about page sorting */
/* move to head */
if( ret != repHead ) {
qRemove(ret);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
if( ret->queue == 2 ) {
/* keep first queue same size */
repMiddle = repMiddle->prev;
repMiddle->queue = 2;
ret->queue = 1;
}
}
}
lastPage = ret;
lastPageId = pageid;
return ret;
} else if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to kick */
ret = kickPage(pageid);
} else if( bufferSize == MAX_BUFFER_SIZE-1 ) { /* we need to setup kickPage mechanism */
int i;
Page *iter;
ret = pageAlloc(pageid);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
pblHtInsert( activePages, &pageid, sizeof(int), ret );
bufferSize++;
/* split up queue:
* "in all cases studied ... fixing the primary region to 30% ...
* resulted in the best performance"
*/
repMiddle = repHead;
for( i = 0; i < MAX_BUFFER_SIZE / 3; i++ ) {
repMiddle->queue = 1;
repMiddle = repMiddle->next;
}
for( iter = repMiddle; iter; iter = iter->next ) {
iter->queue = 2;
}
} else { /* we are adding to an nonfull queue */
bufferSize++;
ret = pageAlloc(pageid);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
assert(ret->next != ret && ret->prev != ret);
pblHtInsert( activePages, &pageid, sizeof(int), ret );
}
/* we now have a page we can dump info into */
assert( ret->id == pageid );
pageMap(ret);
lastPage = ret;
lastPageId = pageid;
return ret;
}
/* ** No file I/O below this line. ** */
Page loadPage (int pageid) {
return *loadPagePtr(pageid);
@ -309,21 +272,26 @@ Page loadPage (int pageid) {
Page * lastRallocPage = 0;
recordid ralloc(int xid, lsn_t lsn, size_t size) {
static unsigned int lastFreepage = 0;
recordid ret;
Page p;
if (size >= BLOB_THRESHOLD_SIZE) { /* TODO combine this with if below */
DEBUG("Rallocing blob of size %ld\n", (long int)size);
assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT);
/* if (size >= BLOB_THRESHOLD_SIZE) {
ret = allocBlob(xid, lsn, size);
} else {
} else { */
while(freespace(p = loadPage(lastFreepage)) < size ) { lastFreepage++; }
ret = pageRalloc(p, size);
}
/* } */
DEBUG("alloced rid = {%d, %d, %d}\n", ret.page, ret.slot, ret.size);
return ret;
}
@ -331,12 +299,12 @@ long readLSN(int pageid) {
return pageReadLSN(loadPage(pageid));
}
/*
static void writeLSN(lsn_t LSN, int pageid) {
Page *p = loadPagePtr(pageid);
p->LSN = LSN;
pageWriteLSN(*p);
}
}*/
void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) {
Page *p;
@ -351,7 +319,9 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) {
assert( (p->id == rid.page) && (p->memAddr != NULL) );
pageWriteRecord(xid, *p, rid, dat);
writeLSN(lsn, rid.page);
/* writeLSN(lsn, rid.page); */
p->LSN = lsn;
pageWriteLSN(*p);
}
}
void readRecord(int xid, recordid rid, void *buf) {
@ -364,66 +334,19 @@ void readRecord(int xid, recordid rid, void *buf) {
}
}
int flushPage(Page page) {
if( munmap(page.memAddr, PAGE_SIZE) )
return MEM_WRITE_ERROR;
return 0;
}
int bufTransCommit(int xid, lsn_t lsn) {
commitBlobs(xid);
/** @todo Figure out where the blob files are fsynced() and delete this and the next few lines... */
/*
fdatasync(blobfd0);
fdatasync(blobfd1);
*/
pageCommit(xid);
return 0;
}
int bufTransAbort(int xid, lsn_t lsn) {
abortBlobs(xid); /* abortBlobs doesn't write any log entries, so it doesn't need the lsn. */
pageAbort(xid);
return 0;
}
void bufDeinit() {
int ret;
Page *p;
for( p = (Page*)pblHtFirst( activePages ); p; p = (Page*)pblHtRemove( activePages, 0, 0 )) {
if( p->dirty && (ret = flushPage(*p))) {
printf("ERROR: flushPage on %s line %d", __FILE__, __LINE__);
exit(ret);
}
}
pblHtDelete(activePages);
if( close(stable) ) {
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
exit(errno);
}
closeBlobStore();
return;
}
/**
Just close file descriptors, don't do any other clean up. (For
testing.)
*/
void simulateBufferManagerCrash() {
closeBlobStore();
close(stable);
stable = -1;
}

View file

@ -40,7 +40,7 @@ permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#include <lladd/logger/logWriter.h>
#include <lladd/logger/logHandle.h>
#include <assert.h>
#include <config.h>
@ -54,6 +54,26 @@ terms specified in this license.
*/
static FILE * log;
/**
@see flushedLSN()
*/
static lsn_t flushedLSN_val;
/**
Before writeLogEntry is called, this value is the value of the
highest LSN encountered so far. Once writeLogEntry is called, it
is the next available LSN.
@see writeLogEntry
*/
static lsn_t nextAvailableLSN = 0;
static int writeLogEntryIsReady = 0;
static lsn_t maxLSNEncountered = sizeof(lsn_t);
int openLogWriter() {
log = fopen(LOG_FILE, "a+");
@ -63,12 +83,16 @@ int openLogWriter() {
return FILE_WRITE_OPEN_ERROR;
}
nextAvailableLSN = 0;
maxLSNEncountered = sizeof(lsn_t);
writeLogEntryIsReady = 0;
/* Note that the position of the file between calls to this library
does not matter, since none of the functions in logWriter.h
assume anything about the position of the stream before they are
called.
However, we need to do this seek to check the length of the file.
However, we need to do this seek to check if the file is empty.
*/
fseek(log, 0, SEEK_END);
@ -91,10 +115,46 @@ int openLogWriter() {
return 0;
}
/**
@internal
Unfortunately, this function can't just seek to the end of the
log. If it did, and a prior instance of LLADD crashed (and wrote
a partial entry), then the log would be corrupted. Therefore, we
need to be a little bit smarter, and track the next LSN value
manually. Calculating it the first time would require a scan over
the entire log, so we use the following optimization:
Every time readLSN is called, we check to see if it is called with
the highest LSN that we've seen so far. (If writeLogEntry has not
been called yet.)
The first time writeLogEntry is called, we seek from the highest
LSN encountered so far to the end of the log.
*/
int writeLogEntry(LogEntry * e) {
int nmemb;
const size_t size = sizeofLogEntry(e);
if(e->xid == -1) { /* Don't write log entries for recovery xacts. */
e->LSN = -1;
return 0;
}
if(!writeLogEntryIsReady) {
LogHandle lh = getLSNHandle(maxLSNEncountered);
LogEntry * le;
while((le = nextInLog(&lh))) {
nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);
}
writeLogEntryIsReady = 1;
}
/* Set the log entry's LSN. */
fseek(log, 0, SEEK_END);
e->LSN = ftell(log);
@ -121,6 +181,13 @@ int writeLogEntry(LogEntry * e) {
}
void syncLog() {
lsn_t newFlushedLSN;
fseek(log, 0, SEEK_END);
/* Wait to set the static variable until after the flush returns.
(In anticipation of multithreading) */
newFlushedLSN = ftell(log);
fflush(log);
#ifdef HAVE_FDATASYNC
/* Should be available in linux >= 2.4 */
@ -129,7 +196,15 @@ void syncLog() {
/* Slow - forces fs implementation to sync the file metadata to disk */
fsync(fileno(log));
#endif
flushedLSN_val = newFlushedLSN;
}
lsn_t flushedLSN() {
return flushedLSN_val;
}
void closeLogWriter() {
/* Get the whole thing to the disk before closing it. */
syncLog();
@ -142,7 +217,7 @@ void deleteLogWriter() {
static LogEntry * readLogEntry() {
LogEntry * ret = NULL;
size_t size;
size_t size, entrySize;
int nmemb;
if(feof(log)) return NULL;
@ -174,8 +249,16 @@ static LogEntry * readLogEntry() {
return 0;
}
entrySize = sizeofLogEntry(ret);
/** Sanity check -- Did we get the whole entry? */
assert(size == sizeofLogEntry(ret));
if(size < entrySize) {
return 0;
}
assert(size == entrySize);
return ret;
}
@ -183,6 +266,12 @@ static LogEntry * readLogEntry() {
LogEntry * readLSNEntry(lsn_t LSN) {
LogEntry * ret;
if(!writeLogEntryIsReady) {
if(LSN > maxLSNEncountered) {
maxLSNEncountered = LSN;
}
}
fseek(log, LSN, SEEK_SET);
ret = readLogEntry();

View file

@ -1,7 +1,7 @@
#include <lladd/operations/alloc.h>
#include <lladd/page.h>
#include <lladd/bufferManager.h>
#include "../blobManager.h"
/**
Implementation of Talloc() as an operation
@ -20,17 +20,22 @@
*/
static int operate(int xid, lsn_t lsn, recordid rid, const void * dat) {
if(rid.size >= BLOB_THRESHOLD_SIZE) {
allocBlob(xid, lsn, rid);
} else {
Page loadedPage = loadPage(rid.page);
/** Has no effect during normal operation. */
pageSlotRalloc(loadedPage, rid);
pageSlotRalloc(loadedPage, lsn, rid);
}
return 0;
}
/** @todo Currently, we just lead store space on dealloc. */
/** @todo Currently, we just leak store space on dealloc. */
static int deoperate(int xid, lsn_t lsn, recordid rid, const void * dat) {
Page loadedPage = loadPage(rid.page);
/** Has no effect during normal operation. */
pageSlotRalloc(loadedPage, rid);
/** Has no effect during normal operation, other than updating the LSN. */
pageSlotRalloc(loadedPage, lsn, rid);
return 0;
}
@ -48,6 +53,10 @@ Operation getAlloc() {
recordid Talloc(int xid, size_t size) {
recordid rid;
if(size >= BLOB_THRESHOLD_SIZE) {
rid = preAllocBlob(xid, size);
} else {
/**
@todo we pass lsn -1 into ralloc here. This is a kludge, since we
@ -68,6 +77,8 @@ recordid Talloc(int xid, size_t size) {
rid = ralloc(xid, -1, size);
}
Tupdate(xid,rid, NULL, OPERATION_ALLOC);
return rid;

View file

@ -52,7 +52,7 @@ terms specified in this license.
recordid prepare_bogus_rec = { 0, 0, 0};
static int operate(int xid, recordid rid, const void *dat) {
static int operate(int xid, lsn_t lsn, recordid rid, const void *dat) {
syncLog();
return 0;
}

View file

@ -333,10 +333,14 @@ recordid pageRalloc(Page page, size_t size) {
/** Only used for recovery, to make sure that consistent RID's are created
* on log playback. */
recordid pageSlotRalloc(Page page, recordid rid) {
recordid pageSlotRalloc(Page page, lsn_t lsn, recordid rid) {
int freeSpace = readFreeSpace(page.memAddr);
int numSlots = readNumSlots(page.memAddr);
/* if(rid.size > BLOB_THRESHOLD_SIZE) {
return blobSlotAlloc(page, lsn_t lsn, recordid rid);
}*/
/* assert(rid.slot >= numSlots); */
if(rid.slot >= numSlots) {

219
src/lladd/pageCache.c Normal file
View file

@ -0,0 +1,219 @@
/**
@file
pageCache handles the replacement policy for buffer manager. This
allows bufferManager's implementation to focus on providing atomic
writes, and locking.
*/
#include <lladd/pageCache.h>
#include <assert.h>
#include <pbl/pbl.h>
#include <lladd/constants.h>
#include <lladd/page.h>
#include <stdlib.h>
static pblHashTable_t *activePages; /* page lookup */
static unsigned int bufferSize; /* < MAX_BUFFER_SIZE */
static Page *repHead, *repMiddle, *repTail; /* replacement policy */
void pageCacheInit() {
Page *first;
bufferSize = 1;
activePages = pblHtCreate();
assert(activePages);
first = pageAlloc(0);
pblHtInsert(activePages, &first->id, sizeof(int), first);
first->prev = first->next = NULL;
/* pageMap(first); */
pageRead(first);
repHead = repTail = first;
repMiddle = NULL;
}
void pageCacheDeinit() {
Page *p;
DEBUG("pageCacheDeinit()");
for( p = (Page*)pblHtFirst( activePages ); p; p = (Page*)pblHtRemove( activePages, 0, 0 )) {
DEBUG("+");
/** @todo No one seems to set the dirty flag... */
/*if(p->dirty && (ret = pageWrite(p)/ *flushPage(*p)* /)) {
printf("ERROR: flushPage on %s line %d", __FILE__, __LINE__);
abort();
/ * exit(ret); * /
}*/
pageWrite(p);
}
pblHtDelete(activePages);
}
static void headInsert(Page *ret) {
assert(ret != repMiddle);
assert(ret != repTail);
assert(ret != repHead);
repHead->prev = ret;
ret->next = repHead;
ret->prev = NULL;
repHead = ret;
}
static void middleInsert(Page *ret) {
assert( bufferSize == MAX_BUFFER_SIZE );
assert(ret != repMiddle);
assert(ret != repTail);
assert(ret != repHead);
ret->prev = repMiddle->prev;
ret->next = repMiddle;
repMiddle->prev = ret;
ret->prev->next = ret;
ret->queue = 2;
repMiddle = ret;
assert(ret->next != ret && ret->prev != ret);
}
static void qRemove(Page *ret) {
assert( bufferSize == MAX_BUFFER_SIZE );
assert(ret->next != ret && ret->prev != ret);
if( ret->prev )
ret->prev->next = ret->next;
else /* is head */
repHead = ret->next; /* won't have head == tail because of test in loadPage */
if( ret->next ) {
ret->next->prev = ret->prev;
/* TODO: these if can be better organizeed for speed */
if( ret == repMiddle )
/* select new middle */
repMiddle = ret->next;
}
else /* is tail */
repTail = ret->prev;
assert(ret != repMiddle);
assert(ret != repTail);
assert(ret != repHead);
}
static Page *kickPage(int pageid) {
Page *ret = repTail;
assert( bufferSize == MAX_BUFFER_SIZE );
qRemove(ret);
pblHtRemove(activePages, &ret->id, sizeof(int));
/* if( munmap(ret->memAddr, PAGE_SIZE) ) */
if(pageWrite(ret)) /*flushPage(*ret)) */
assert( 0 );
pageRealloc(ret, pageid);
middleInsert(ret);
pblHtInsert(activePages, &pageid, sizeof(int), ret);
return ret;
}
int lastPageId = -1;
Page * lastPage = 0;
Page *loadPagePtr(int pageid) {
/* lock activePages, bufferSize */
Page *ret;
if(lastPage && lastPageId == pageid) {
DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) lastPage->memAddr);
return lastPage;
} else {
ret = pblHtLookup(activePages, &pageid, sizeof(int));
}
if( ret ) {
if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to worry about page sorting */
/* move to head */
if( ret != repHead ) {
qRemove(ret);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
if( ret->queue == 2 ) {
/* keep first queue same size */
repMiddle = repMiddle->prev;
repMiddle->queue = 2;
ret->queue = 1;
}
}
}
lastPage = ret;
lastPageId = pageid;
DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr);
return ret;
} else if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to kick */
ret = kickPage(pageid);
} else if( bufferSize == MAX_BUFFER_SIZE-1 ) { /* we need to setup kickPage mechanism */
int i;
Page *iter;
ret = pageAlloc(pageid);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
pblHtInsert( activePages, &pageid, sizeof(int), ret );
bufferSize++;
/* split up queue:
* "in all cases studied ... fixing the primary region to 30% ...
* resulted in the best performance"
*/
repMiddle = repHead;
for( i = 0; i < MAX_BUFFER_SIZE / 3; i++ ) {
repMiddle->queue = 1;
repMiddle = repMiddle->next;
}
for( iter = repMiddle; iter; iter = iter->next ) {
iter->queue = 2;
}
} else { /* we are adding to an nonfull queue */
bufferSize++;
ret = pageAlloc(pageid);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
assert(ret->next != ret && ret->prev != ret);
pblHtInsert( activePages, &pageid, sizeof(int), ret );
}
/* we now have a page we can dump info into */
assert( ret->id == pageid );
/*pageMap(ret); */
pageRead(ret);
lastPage = ret;
lastPageId = pageid;
DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr);
return ret;
}

View file

@ -11,8 +11,9 @@ int numActiveXactions = 0;
int xidCount = 0;
#define INVALID_XTABLE_XID -1
int Tinit() {
/** Needed for debugging -- sometimes we don't want to run all of Tinit() */
void setupOperationsTable() {
memset(XactionTable, INVALID_XTABLE_XID, sizeof(TransactionLog)*MAX_TRANSACTIONS);
operationsTable[OPERATION_SET] = getSet();
operationsTable[OPERATION_INCREMENT] = getIncrement();
@ -23,6 +24,12 @@ int Tinit() {
operationsTable[OPERATION_ALLOC] = getAlloc();
operationsTable[OPERATION_DEALLOC] = getDealloc();
}
int Tinit() {
setupOperationsTable();
pageInit();
bufInit();

View file

@ -449,6 +449,8 @@ START_TEST(recoverBlob__crash) {
Tread(xid, rid, &j);
assert(!memcmp(j,k,ARRAY_SIZE * sizeof(int)));
fail_unless(!memcmp(j,k,ARRAY_SIZE), "Recovery failed on second re-open.");
Tdeinit();

View file

@ -41,7 +41,7 @@ terms specified in this license.
---*/
#include <config.h>
#include <check.h>
/*#include <assert.h> */
#include <assert.h>
#include <lladd/transactional.h>
#include <lladd/logger/logWriter.h>
@ -141,6 +141,9 @@ START_TEST (recovery_exactlyOnce) {
Tread(xid, rid, &k);
printf("j = %d, k = %d\n", j, k);
assert(j == k);
fail_unless(j == k, "Recovery messed something up!");
Tcommit(xid);

4
utilities/Makefile.am Normal file
View file

@ -0,0 +1,4 @@
LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \
$(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a
bin_PROGRAMS=logfile_dump
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

79
utilities/logfile_dump.c Normal file
View file

@ -0,0 +1,79 @@
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <lladd/logger/logHandle.h>
#include <lladd/logger/logWriter.h>
static char * logEntryToString(LogEntry * le) {
char * ret = NULL;
switch(le->type) {
case UPDATELOG:
{
recordid rid = le->contents.clr.rid;
asprintf(&ret, "UPDATE\tlsn=%9ld\tprevlsn=%9ld\txid=%4d\trid={%5d %5d %5d}\tfuncId=%3d\targSize=%9d\n", le->LSN, le->prevLSN, le->xid,
rid.page, rid.slot, rid.size, le->contents.update.funcID, le->contents.update.argSize );
}
break;
case XBEGIN:
{
asprintf(&ret, "BEGIN\tlsn=%9ld\tprevlsn=%9ld\txid=%4d\n", le->LSN, le->prevLSN, le->xid);
}
break;
case XCOMMIT:
{
asprintf(&ret, "COMMIT\tlsn=%9ld\tprevlsn=%9ld\txid=%4d\n", le->LSN, le->prevLSN, le->xid);
}
break;
case XABORT:
{
asprintf(&ret, "ABORT\tlsn=%9ld\tprevlsn=%9ld\txid=%4d\n", le->LSN, le->prevLSN, le->xid);
}
break;
case XEND:
{
asprintf(&ret, "END \tlsn=%9ld\tprevlsn=%9ld\txid=%4d\n", le->LSN, le->prevLSN, le->xid);
}
break;
case CLRLOG:
{
recordid rid = le->contents.clr.rid;
asprintf(&ret, "CLR \tlsn=%9ld\tprevlsn=%9ld\txid=%4d\trid={%5d %5d %5d}\tthisUpdateLSN=%9ld\tundoNextLSN=%9ld\n", le->LSN, le->prevLSN, le->xid,
rid.page, rid.slot, rid.size, (long int)le->contents.clr.thisUpdateLSN, (long int)le->contents.clr.undoNextLSN );
}
break;
}
return ret;
}
void setupOperationsTable();
int main() {
LogHandle lh;
LogEntry * le;
setupOperationsTable();
if(openLogWriter()) {
printf("Couldn't open log.\n");
}
lh = getLSNHandle(44);
while((le = nextInLog(&lh))) {
char * s = logEntryToString(le);
if(s) {
printf("%s", s);
free(s);
}
}
}