diff --git a/Makefile.am b/Makefile.am index 6bd87a7..c699ace 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,5 +1,5 @@ EXTRA_DIST = reconf -SUBDIRS = src test +SUBDIRS = src test utilities AM_CFLAGS = -g -Wall -pedantic docs: diff --git a/configure.in b/configure.in index 91d659b..6b80e45 100644 --- a/configure.in +++ b/configure.in @@ -76,6 +76,7 @@ AC_CONFIG_FILES([Makefile src/lladd/Makefile src/pbl/Makefile src/timing/Makefile + utilities/Makefile test/2pc/Makefile test/Makefile test/cht/Makefile diff --git a/lladd/bufferManager.h b/lladd/bufferManager.h index 2cdcdbf..6d7a5aa 100644 --- a/lladd/bufferManager.h +++ b/lladd/bufferManager.h @@ -94,21 +94,6 @@ terms specified in this license. @todo need alloc + free... -// Calls for LLADD managed memory (returns pointers to LLADD's cache.) - -// Read only access to record (requires an un-pinning) - -// Cost with cache hit: pointer arithmetic. - -// - map_t readMapRecord(rid, &((const void *)); - -// Map a page read / write. Pins the page, sets its lsn, and provides a pointer to the record: -// Cost with cache hit: pointer arithmetic, eventual disk flush. - -// - map_t readWriteMapRecord(rid, &(void *)); - -// Unmap a mapped page so that it can be kicked. - // @param lsn can be 0 if this is a read-only mapping. Otherwise, // it should be the LSN of the operation that calls unmapRecord. // @todo What connection between the lock manager and this function @@ -220,12 +205,25 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat); void readRecord(int xid, recordid rid, void *dat); /** - * @param page write page to disk, including correct LSN + * Write page to disk, including correct LSN. Doing so may require a + * call to logSync(). There is not much that can be done to avoid + * this call right now. In the future, it might make sense to check + * to see if some other page can be kicked, in order to avoid the log + * flush. + * + * @param page The page to be flushed to disk. * @return 0 on success * @return error code on failure */ int flushPage(Page page); +/** + Read a page from disk. + + @param A page struct, with id set correctly. The rest of this + struct is filled out by pageMap. +*/ +void pageMap(Page * page); /* * this function does NOT write to disk, just drops the page from the active * pages diff --git a/lladd/logger/logWriter.h b/lladd/logger/logWriter.h index cb13c04..5f2dbfc 100644 --- a/lladd/logger/logWriter.h +++ b/lladd/logger/logWriter.h @@ -58,6 +58,7 @@ terms specified in this license. * never hurts to have more flushes to disk, as long as it doesn't hurt * performance. * + * @todo CORRECTNESS: We currently assume that there are no partial entries in the log! * @todo Everything in this file cores on failure (no error handling yet) * @todo All of the logWriter calls should be reentrant. * @@ -83,75 +84,52 @@ int openLogWriter(); /** - @param e Pointer to a log entry. After the call, e->LSN will be set appropriately. + @param e Pointer to a log entry. After the call, e->LSN will be set appropriately. If e's xid is set to -1, then this call has no effect (and e's LSN will be set to -1.) returns 0 on success, or an error code defined above */ int writeLogEntry(LogEntry * e); -/* +/** flush the entire log (tail) that is currently in memory to disk */ void syncLog(); -/* +/** + Return the highest LSN that is known to be on disk. (Currently, we + only know if an LSN is on disk if we've written that LSN before a + call to syncLog(). + + Note: This function might not return an LSN corresponding to a real + log entry, but it will definitely return one that is greater than + or equal to the LSN of a log entry that has been forced to disk, + and is less than the LSN of all log entries that might not have + been forced to disk. +*/ +lsn_t flushedLSN(); + +/** Close the log stream */ void closeLogWriter(); -/* - Get the current position of the stream (in terms of bytes) -*/ -/*long getFilePos();*/ - -/* +/** Actually deletes the log file that may have been written to disk! Danger!! Only use after calling closeLogStream AND you are sure there are no active (or future active) transactions! + + @todo This is in here now for completeness, but once we implement + log truncation, it should leave. */ void deleteLogWriter(); -/* - * Returns the current position of the stream no matter where it is - */ -/*long streamPos();*/ - -/* - * Returns the position of the stream if it were to read. - */ -/*long writeStreamPos();*/ - -/* - * readLog reads a line from the log puts it in a string - * - * This was made static because it exports state that the interface - * should be hiding. (To use this function, the user must make - * assumptions regarding the value of the FILE's current offset.) - * - * returns the number of bytes read and put into buffer - * */ -/*int readLog(byte **buffer);*/ -/* LogEntry * readLogEntry(); */ - -/* - * seek to a position in the log file and read it into the buffer - * - * returns the number of bytes read and put into buffer - * */ - -/*int seekAndReadLog(long pos, byte **buffer);*/ +/** + Read a log entry at a particular LSN. + @param the LSN of the entry that will be read. +*/ LogEntry * readLSNEntry(lsn_t LSN); -/* lsn_t nextLSN(); */ - -/* - * tell the current position in the log file - * */ -/*long readPos (); - -void seekInLog(long pos);*/ - END_C_DECLS #endif /* __LLADD_LOGGER_LOGWRITER_H */ diff --git a/lladd/page.h b/lladd/page.h index 04328a7..1eab362 100644 --- a/lladd/page.h +++ b/lladd/page.h @@ -129,7 +129,7 @@ void pageRealloc(Page *p, int id); Page* pageAlloc(int id); -recordid pageSlotRalloc(Page page, recordid rid); +recordid pageSlotRalloc(Page page, lsn_t lsn, recordid rid); int pageTest(); diff --git a/lladd/pageCache.h b/lladd/pageCache.h new file mode 100644 index 0000000..bec3491 --- /dev/null +++ b/lladd/pageCache.h @@ -0,0 +1,24 @@ +#ifndef __PAGECACHE_H +#define __PAGECACHE_H + +/** + Implements lladd's caching policy. Looks up pageid in the cache. + If pageid doesn't exist, then allocate a new slot for it. If + there are no new slots, then callback into bufferManager's + pageRead() function. Eventually, this could be extended to + support application specific caching schemes. + + Currently, LLADD uses LRU-2S from Markatos "On Caching Searching + Engine Results" + + If you would like to implement your own caching policy, implement + the three functions below. They are relatively straightforward. + Note that pageCache does not perform any file I/O of its own. + +*/ +#include +void pageCacheInit(); +void pageCacheDeinit(); +Page * loadPagePtr(int pageid); + +#endif diff --git a/src/lladd/Makefile.am b/src/lladd/Makefile.am index 7f7964e..86c1ea2 100644 --- a/src/lladd/Makefile.am +++ b/src/lladd/Makefile.am @@ -3,6 +3,6 @@ lib_LIBRARIES=liblladd.a #liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a # removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c -liblladd_a_SOURCES=bufferManager.c linkedlist.c operations.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/lladdhash.c operations/prepare.c operations/set.c operations/alloc.c +liblladd_a_SOURCES=bufferManager.c linkedlist.c operations.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/lladdhash.c operations/prepare.c operations/set.c operations/alloc.c AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/src/lladd/blobManager.c b/src/lladd/blobManager.c index 561d86d..0df1bc7 100644 --- a/src/lladd/blobManager.c +++ b/src/lladd/blobManager.c @@ -31,7 +31,7 @@ static void readRawRecord(int xid, recordid rid, void * buf, int size) { readRecord(xid, blob_rec_rid, buf); } -static void writeRawRecord(int xid, lsn_t lsn, recordid rid, const void * buf, int size) { +static void writeRawRecord(int xid, recordid rid, const void * buf, int size) { recordid blob_rec_rid = rid; blob_rec_rid.size = size; Tset(xid, blob_rec_rid, buf); @@ -42,7 +42,7 @@ static void writeRawRecord(int xid, lsn_t lsn, recordid rid, const void * buf, i /* moved verbatim from bufferManger.c, then hacked up to use FILE * instead of ints. */ void openBlobStore() { int blobfd0, blobfd1; - if( ! (blobf0 = fopen(BLOB0_FILE, "w+"))) { /* file may not exist */ + if( ! (blobf0 = fopen(BLOB0_FILE, "r+"))) { /* file may not exist */ if( (blobfd0 = creat(BLOB0_FILE, 0666)) == -1 ) { /* cannot even create it */ printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); perror("Creating blob 0 file"); abort(); @@ -51,7 +51,7 @@ void openBlobStore() { printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); perror(NULL); abort(); } - if(!(blobf0 = fopen(BLOB0_FILE, "w+"))) { perror("Couldn't open or create blob 0 file"); abort(); } + if(!(blobf0 = fopen(BLOB0_FILE, "r+"))) { perror("Couldn't open or create blob 0 file"); abort(); } } DEBUG("blobf0 opened.\n"); @@ -93,43 +93,34 @@ void closeBlobStore() { pblHtDelete(dirtyBlobs); } -static long myFseek(FILE * f, long offset, int whence) { +long myFseek(FILE * f, long offset, int whence) { long ret; if(0 != fseek(f, offset, whence)) { perror ("fseek"); fflush(NULL); abort(); } if(-1 == (ret = ftell(f))) { perror("ftell"); fflush(NULL); abort(); } return ret; } -recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) { - +recordid preAllocBlob(int xid, size_t blobSize) { long fileSize = myFseek(blobf1, 0, SEEK_END); blob_record_t blob_rec; Page p; - char zero = 0; + /* char zero = 0; */ /* Allocate space for the blob entry. */ - + + DEBUG("Allocing blob (size %d)\n", blobSize); + assert(blobSize > 0); /* Don't support zero length blobs right now... */ /* First in buffer manager. */ + /** Needs to be in alloc.c */ + recordid rid = Talloc(xid, sizeof(blob_record_t)); - readRecord(xid, rid, &blob_rec); - /** Then in the blob file. @todo: BUG How can we get around doing a - force here? If the user allocates space and we crash, could we - double allocate space, since the file won't have grown. Could - we write a log entry with the new size? Alternatively, is - forcing the files before writing a commit to log enough?*/ - - /** @todo Should this be -1, not -2? Aren't we writing one byte after the end of the blob? */ - myFseek(blobf0, fileSize + blobSize - 1, SEEK_SET); - myFseek(blobf1, fileSize + blobSize - 1, SEEK_SET); - - if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); } - if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); } - - /** Finally, fix up the fields in the record that points to the blob. */ + /** Finally, fix up the fields in the record that points to the blob. + The rest of this also should go into alloc.c + */ blob_rec.fd = 0; blob_rec.size = blobSize; @@ -143,11 +134,47 @@ recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) { /* Tset() needs to know to 'do the right thing' here, since we've changed the size it has recorded for this record, and writeRawRecord makes sure that that is the case. */ - writeRawRecord (xid, lsn, rid, &blob_rec, sizeof(blob_record_t)); + writeRawRecord (xid, rid, &blob_rec, sizeof(blob_record_t)); rid.size = blob_rec.size; return rid; + +} + +void allocBlob(int xid, lsn_t lsn, recordid rid) { + + long fileSize = myFseek(blobf1, 0, SEEK_END); + blob_record_t blob_rec; + Page p; + char zero = 0; + /* recordid rid = preAllocBlob(xid, blobSize); */ + /* Allocate space for the blob entry. */ + + DEBUG("post Allocing blob (size %d)\n", rid.size); + + assert(rid.size > 0); /* Don't support zero length blobs right now... */ + + /* First in buffer manager. */ + + p = loadPage(rid.page); + + /* Read in record to get the correct offset, size for the blob*/ + readRawRecord(xid, rid, &blob_rec, sizeof(blob_record_t)); + + /** Then in the blob file. @todo: BUG How can we get around doing a + force here? If the user allocates space and we crash, could we + double allocate space, since the file won't have grown. Could + we write a log entry with the new size? Alternatively, is + forcing the files before writing a commit to log enough?*/ + + /** @todo Should this be -1, not -2? Aren't we writing one byte after the end of the blob? */ + myFseek(blobf0, fileSize + rid.size - 1, SEEK_SET); + myFseek(blobf1, fileSize + rid.size - 1, SEEK_SET); + + if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); } + if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); } + } static lsn_t * tripleHashLookup(int xid, recordid rid) { @@ -258,9 +285,20 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) { FILE * fd; int readcount; + /** @todo Dang. Blob allocation needs to go in three phases: + 1) Alloc record (normal Talloc() before blob alloc) + 2) Grab offset / size from blobManager, Tset() record from 1 with it. + 3) Log it. + 4) Upon recieving log entry, update static file length variable in blobManager. (Will fix double allocation race bug as well, but introduced record leaking on crash... Oh well.) + */ + DEBUG("Writing blob (size %d)\n", rid.size); + + /* Tread() raw record */ readRawRecord(xid, rid, &rec, sizeof(blob_record_t)); + assert(rec.size == rid.size); + if(dirty) { assert(lsn > *dirty); *dirty = lsn; /* Updates value in triple hash (works because of pointer aliasing.) */ @@ -275,7 +313,7 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) { rec.fd = rec.fd ? 0 : 1; /* Tset() raw record */ - writeRawRecord(xid, lsn, rid, &rec, sizeof(blob_record_t)); + writeRawRecord(xid, rid, &rec, sizeof(blob_record_t)); } fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */ @@ -290,8 +328,13 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) { /* No need to update the raw blob record. */ } - +/** @todo check to see if commitBlobs actually needs to flush blob + files when it's called (are there any dirty blobs associated with + this transaction? */ void commitBlobs(int xid) { + + fdatasync(fileno(blobf0)); + fdatasync(fileno(blobf1)); abortBlobs(xid); } diff --git a/src/lladd/blobManager.h b/src/lladd/blobManager.h index 032c6f1..d27a72e 100644 --- a/src/lladd/blobManager.h +++ b/src/lladd/blobManager.h @@ -68,13 +68,17 @@ typedef struct { unsigned fd : 1; } blob_record_t; + +recordid preAllocBlob(int xid, size_t blobsize); + /** Allocate a blob of size blobSize. @todo This function does not atomically allocate space in the blob file. */ -recordid allocBlob(int xid, lsn_t lsn, size_t blobSize); + +void allocBlob(int xid, lsn_t lsn, recordid rid); void openBlobStore(); void closeBlobStore(); diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c index cb1515d..d58a795 100644 --- a/src/lladd/bufferManager.c +++ b/src/lladd/bufferManager.c @@ -63,17 +63,122 @@ terms specified in this license. #include "blobManager.h" -static pblHashTable_t *activePages; /* page lookup */ -static unsigned int bufferSize = 1; /* < MAX_BUFFER_SIZE */ -static Page *repHead, *repMiddle, *repTail; /* replacement policy */ +#include -static int stable = -1; +#include -static void pageMap(Page *ret) { +static FILE * stable = NULL; +static unsigned int lastFreepage = 0; + +/* ** File I/O functions ** */ +/* Defined in blobManager.c, but don't want to export this in any public .h files... */ +long myFseek(FILE * f, long offset, int whence); + +void pageRead(Page *ret) { + long fileSize = myFseek(stable, 0, SEEK_END); + long pageoffset = ret->id * PAGE_SIZE; + long offset; + + DEBUG("Reading page %d\n", ret->id); + + if(!ret->memAddr) { + ret->memAddr = malloc(PAGE_SIZE); + } + assert(ret->memAddr); + + if ((ret->id)*PAGE_SIZE >= fileSize) { + myFseek(stable, (1+ ret->id) * PAGE_SIZE -1, SEEK_SET); + if(1 != fwrite("", 1, 1, stable)) { + if(feof(stable)) { printf("Unexpected eof extending storefile!\n"); fflush(NULL); abort(); } + if(ferror(stable)) { printf("Error extending storefile! %d", ferror(stable)); fflush(NULL); abort(); } + } + + } + offset = myFseek(stable, pageoffset, SEEK_SET); + assert(offset == pageoffset); + + if(1 != fread(ret->memAddr, PAGE_SIZE, 1, stable)) { + + if(feof(stable)) { printf("Unexpected eof reading!\n"); fflush(NULL); abort(); } + if(ferror(stable)) { printf("Error reading stream! %d", ferror(stable)); fflush(NULL); abort(); } + + } + +} + +void pageWrite(const Page * ret) { + + long pageoffset = ret->id * PAGE_SIZE; + long offset = myFseek(stable, pageoffset, SEEK_SET); + assert(offset == pageoffset); + assert(ret->memAddr); + + DEBUG("Writing page %d\n", ret->id); + + if(flushedLSN() < pageReadLSN(*ret)) { + DEBUG("pageWrite is calling syncLog()!\n"); + syncLog(); + } + + if(1 != fwrite(ret->memAddr, PAGE_SIZE, 1, stable)) { + + if(feof(stable)) { printf("Unexpected eof writing!\n"); fflush(NULL); abort(); } + if(ferror(stable)) { printf("Error writing stream! %d", ferror(stable)); fflush(NULL); abort(); } + + } + + +} + +static void openPageFile() { + int stable_fd; + + DEBUG("Opening storefile.\n"); + if( ! (stable = fopen(STORE_FILE, "r+"))) { /* file may not exist */ + byte* zero = calloc(1, PAGE_SIZE); + + DEBUG("Creating new page storefile.\n"); + + if( (stable_fd = creat(STORE_FILE, 0666)) == -1 ) { /* cannot even create it */ + printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); + perror("Creating store file"); abort(); + } + if( close(stable_fd)) { + printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); + perror(NULL); abort(); + } + if(!(stable = fopen(STORE_FILE, "r+"))) { perror("Couldn't open or create store file"); abort(); } + + /* Write out one page worth of zeros to get started. */ + + if(1 != fwrite(zero, PAGE_SIZE, 1, stable)) { assert (0); } + } + + lastFreepage = 0; + DEBUG("storefile opened.\n"); + +} +static void closePageFile() { + + int ret = fclose(stable); + assert(!ret); + stable = NULL; +} + +void pageMap(Page *ret) { + pageRead(ret); +} +int flushPage(Page ret) { + pageWrite(&ret); + return 0; +} + +/* +void pageMap(Page *ret) { int fileSize; - /* this was lseek(stable, SEEK_SET, pageid*PAGE_SIZE), but changed to - lseek(stable, pageid*PAGE_SIZE, SEEK_SET) by jkit (Wed Mar 24 12:59:18 PST 2004)*/ + / * this was lseek(stable, SEEK_SET, pageid*PAGE_SIZE), but changed to + lseek(stable, pageid*PAGE_SIZE, SEEK_SET) by jkit (Wed Mar 24 12:59:18 PST 2004)* / fileSize = lseek(stable, 0, SEEK_END); if ((ret->id)*PAGE_SIZE >= fileSize) { @@ -87,12 +192,19 @@ static void pageMap(Page *ret) { } } +int flushPage(Page page) { + + if( munmap(page.memAddr, PAGE_SIZE) ) + return MEM_WRITE_ERROR; + + return 0; +} + +*/ + int bufInit() { - Page *first; - - bufferSize = 1; - stable = -1; + stable = NULL; /* Create STORE_FILE, if necessary, then open it read/write @@ -104,204 +216,55 @@ int bufInit() { Now, zero means uninitialized, so this could probably be replaced with a call to open(... O_CREAT|O_RW) or something like that... */ - if( (stable = open(STORE_FILE, O_RDWR, 0)) == -1 ) { /* file may not exist */ - void *zero = mmap(0, PAGE_SIZE, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, -1, 0); /* zero = /dev/zero */ - if( (stable = creat(STORE_FILE, 0666)) == -1 ) { /* cannot even create it */ + /* if( (stable = open(STORE_FILE, O_RDWR, 0)) == -1 ) { / * file may not exist * / + void *zero = mmap(0, PAGE_SIZE, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, -1, 0); / * zero = /dev/zero * / + if( (stable = creat(STORE_FILE, 0666)) == -1 ) { / * cannot even create it * / printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); exit(errno); } - /* kick off a fresh page */ - if( write(stable, zero, PAGE_SIZE) != PAGE_SIZE ) { /* write zeros out */ + / * kick off a fresh page * / + if( write(stable, zero, PAGE_SIZE) != PAGE_SIZE ) { / * write zeros out * / printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); exit(errno); } - if( close(stable) || ((stable = open(STORE_FILE, O_RDWR, 0)) == -1) ) { /* need to reopen with read perms */ + if( close(stable) || ((stable = open(STORE_FILE, O_RDWR, 0)) == -1) ) { / * need to reopen with read perms * / printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); exit(errno); } - } - - activePages = pblHtCreate(); - assert(activePages); - - first = pageAlloc(0); - pblHtInsert(activePages, &first->id, sizeof(int), first); - - first->prev = first->next = NULL; - pageMap(first); - - repHead = repTail = first; - repMiddle = NULL; + } */ + openPageFile(); + pageCacheInit(); openBlobStore(); - + return 0; } -static void headInsert(Page *ret) { +void bufDeinit() { - assert(ret != repMiddle); - assert(ret != repTail); - assert(ret != repHead); + closeBlobStore(); + pageCacheDeinit(); + closePageFile(); + /* if( close(stable) ) { + printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); + exit(errno); + }*/ - repHead->prev = ret; - ret->next = repHead; - ret->prev = NULL; - repHead = ret; + return; } - -static void middleInsert(Page *ret) { - - assert( bufferSize == MAX_BUFFER_SIZE ); - - assert(ret != repMiddle); - assert(ret != repTail); - assert(ret != repHead); - - ret->prev = repMiddle->prev; - ret->next = repMiddle; - repMiddle->prev = ret; - ret->prev->next = ret; - ret->queue = 2; - - repMiddle = ret; - assert(ret->next != ret && ret->prev != ret); -} - -static void qRemove(Page *ret) { - - assert( bufferSize == MAX_BUFFER_SIZE ); - assert(ret->next != ret && ret->prev != ret); - - if( ret->prev ) - ret->prev->next = ret->next; - else /* is head */ - repHead = ret->next; /* won't have head == tail because of test in loadPage */ - if( ret->next ) { - ret->next->prev = ret->prev; - /* TODO: these if can be better organizeed for speed */ - if( ret == repMiddle ) - /* select new middle */ - repMiddle = ret->next; - } - else /* is tail */ - repTail = ret->prev; - - assert(ret != repMiddle); - assert(ret != repTail); - assert(ret != repHead); -} - -/** - LRU-2S from Markatos "On Caching Searching Engine Results" - - @todo Should this be its own file? - +/** + Just close file descriptors, don't do any other clean up. (For + testing.) */ -static Page *kickPage(int pageid) { +void simulateBufferManagerCrash() { + closeBlobStore(); + closePageFile(); + /* close(stable); + stable = -1;*/ - Page *ret = repTail; - - assert( bufferSize == MAX_BUFFER_SIZE ); - - qRemove(ret); - pblHtRemove(activePages, &ret->id, sizeof(int)); - if( munmap(ret->memAddr, PAGE_SIZE) ) - assert( 0 ); - - pageRealloc(ret, pageid); - - middleInsert(ret); - pblHtInsert(activePages, &pageid, sizeof(int), ret); - - return ret; } - -int lastPageId = -1; -Page * lastPage = 0; - -static Page *loadPagePtr(int pageid) { - /* lock activePages, bufferSize */ - Page *ret; - - if(lastPage && lastPageId == pageid) { - return lastPage; - } else { - ret = pblHtLookup(activePages, &pageid, sizeof(int)); - } - - if( ret ) { - if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to worry about page sorting */ - /* move to head */ - if( ret != repHead ) { - qRemove(ret); - headInsert(ret); - assert(ret->next != ret && ret->prev != ret); - - if( ret->queue == 2 ) { - /* keep first queue same size */ - repMiddle = repMiddle->prev; - repMiddle->queue = 2; - - ret->queue = 1; - } - } - } - - lastPage = ret; - lastPageId = pageid; - - return ret; - } else if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to kick */ - ret = kickPage(pageid); - } else if( bufferSize == MAX_BUFFER_SIZE-1 ) { /* we need to setup kickPage mechanism */ - int i; - Page *iter; - - ret = pageAlloc(pageid); - headInsert(ret); - assert(ret->next != ret && ret->prev != ret); - - pblHtInsert( activePages, &pageid, sizeof(int), ret ); - - bufferSize++; - - /* split up queue: - * "in all cases studied ... fixing the primary region to 30% ... - * resulted in the best performance" - */ - repMiddle = repHead; - for( i = 0; i < MAX_BUFFER_SIZE / 3; i++ ) { - repMiddle->queue = 1; - repMiddle = repMiddle->next; - } - - for( iter = repMiddle; iter; iter = iter->next ) { - iter->queue = 2; - } - - } else { /* we are adding to an nonfull queue */ - - bufferSize++; - - ret = pageAlloc(pageid); - headInsert(ret); - assert(ret->next != ret && ret->prev != ret); - assert(ret->next != ret && ret->prev != ret); - pblHtInsert( activePages, &pageid, sizeof(int), ret ); - } - - /* we now have a page we can dump info into */ - assert( ret->id == pageid ); - - pageMap(ret); - - lastPage = ret; - lastPageId = pageid; - - return ret; -} +/* ** No file I/O below this line. ** */ Page loadPage (int pageid) { return *loadPagePtr(pageid); @@ -309,21 +272,26 @@ Page loadPage (int pageid) { Page * lastRallocPage = 0; + recordid ralloc(int xid, lsn_t lsn, size_t size) { - static unsigned int lastFreepage = 0; + recordid ret; Page p; - if (size >= BLOB_THRESHOLD_SIZE) { /* TODO combine this with if below */ + DEBUG("Rallocing blob of size %ld\n", (long int)size); + + assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT); + + /* if (size >= BLOB_THRESHOLD_SIZE) { ret = allocBlob(xid, lsn, size); - } else { + } else { */ while(freespace(p = loadPage(lastFreepage)) < size ) { lastFreepage++; } ret = pageRalloc(p, size); - } + /* } */ DEBUG("alloced rid = {%d, %d, %d}\n", ret.page, ret.slot, ret.size); return ret; } @@ -331,12 +299,12 @@ long readLSN(int pageid) { return pageReadLSN(loadPage(pageid)); } - +/* static void writeLSN(lsn_t LSN, int pageid) { Page *p = loadPagePtr(pageid); p->LSN = LSN; pageWriteLSN(*p); -} + }*/ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) { Page *p; @@ -351,7 +319,9 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) { assert( (p->id == rid.page) && (p->memAddr != NULL) ); pageWriteRecord(xid, *p, rid, dat); - writeLSN(lsn, rid.page); + /* writeLSN(lsn, rid.page); */ + p->LSN = lsn; + pageWriteLSN(*p); } } void readRecord(int xid, recordid rid, void *buf) { @@ -364,66 +334,19 @@ void readRecord(int xid, recordid rid, void *buf) { } } -int flushPage(Page page) { - - if( munmap(page.memAddr, PAGE_SIZE) ) - return MEM_WRITE_ERROR; - - return 0; -} - int bufTransCommit(int xid, lsn_t lsn) { commitBlobs(xid); - - /** @todo Figure out where the blob files are fsynced() and delete this and the next few lines... */ - - /* - fdatasync(blobfd0); - fdatasync(blobfd1); - */ - pageCommit(xid); return 0; } int bufTransAbort(int xid, lsn_t lsn) { + abortBlobs(xid); /* abortBlobs doesn't write any log entries, so it doesn't need the lsn. */ pageAbort(xid); return 0; } -void bufDeinit() { - int ret; - Page *p; - - for( p = (Page*)pblHtFirst( activePages ); p; p = (Page*)pblHtRemove( activePages, 0, 0 )) { - if( p->dirty && (ret = flushPage(*p))) { - printf("ERROR: flushPage on %s line %d", __FILE__, __LINE__); - exit(ret); - } - } - pblHtDelete(activePages); - - if( close(stable) ) { - printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); - exit(errno); - } - - closeBlobStore(); - - return; -} -/** - Just close file descriptors, don't do any other clean up. (For - testing.) -*/ -void simulateBufferManagerCrash() { - closeBlobStore(); - - close(stable); - stable = -1; - -} diff --git a/src/lladd/logger/logWriter.c b/src/lladd/logger/logWriter.c index 223b63e..103aea6 100644 --- a/src/lladd/logger/logWriter.c +++ b/src/lladd/logger/logWriter.c @@ -40,7 +40,7 @@ permission to use and distribute the software in accordance with the terms specified in this license. ---*/ #include - +#include #include #include @@ -54,6 +54,26 @@ terms specified in this license. */ static FILE * log; +/** + @see flushedLSN() +*/ +static lsn_t flushedLSN_val; + +/** + + Before writeLogEntry is called, this value is the value of the + highest LSN encountered so far. Once writeLogEntry is called, it + is the next available LSN. + + @see writeLogEntry +*/ +static lsn_t nextAvailableLSN = 0; +static int writeLogEntryIsReady = 0; +static lsn_t maxLSNEncountered = sizeof(lsn_t); + + + + int openLogWriter() { log = fopen(LOG_FILE, "a+"); @@ -63,12 +83,16 @@ int openLogWriter() { return FILE_WRITE_OPEN_ERROR; } + nextAvailableLSN = 0; + maxLSNEncountered = sizeof(lsn_t); + writeLogEntryIsReady = 0; + /* Note that the position of the file between calls to this library does not matter, since none of the functions in logWriter.h assume anything about the position of the stream before they are called. - However, we need to do this seek to check the length of the file. + However, we need to do this seek to check if the file is empty. */ fseek(log, 0, SEEK_END); @@ -91,10 +115,46 @@ int openLogWriter() { return 0; } + +/** + @internal + + Unfortunately, this function can't just seek to the end of the + log. If it did, and a prior instance of LLADD crashed (and wrote + a partial entry), then the log would be corrupted. Therefore, we + need to be a little bit smarter, and track the next LSN value + manually. Calculating it the first time would require a scan over + the entire log, so we use the following optimization: + + Every time readLSN is called, we check to see if it is called with + the highest LSN that we've seen so far. (If writeLogEntry has not + been called yet.) + + The first time writeLogEntry is called, we seek from the highest + LSN encountered so far to the end of the log. + +*/ int writeLogEntry(LogEntry * e) { int nmemb; const size_t size = sizeofLogEntry(e); + if(e->xid == -1) { /* Don't write log entries for recovery xacts. */ + e->LSN = -1; + return 0; + } + + if(!writeLogEntryIsReady) { + LogHandle lh = getLSNHandle(maxLSNEncountered); + LogEntry * le; + + while((le = nextInLog(&lh))) { + nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t); + } + writeLogEntryIsReady = 1; + } + + + /* Set the log entry's LSN. */ fseek(log, 0, SEEK_END); e->LSN = ftell(log); @@ -121,6 +181,13 @@ int writeLogEntry(LogEntry * e) { } void syncLog() { + lsn_t newFlushedLSN; + + fseek(log, 0, SEEK_END); + /* Wait to set the static variable until after the flush returns. + (In anticipation of multithreading) */ + newFlushedLSN = ftell(log); + fflush(log); #ifdef HAVE_FDATASYNC /* Should be available in linux >= 2.4 */ @@ -129,7 +196,15 @@ void syncLog() { /* Slow - forces fs implementation to sync the file metadata to disk */ fsync(fileno(log)); #endif + + flushedLSN_val = newFlushedLSN; + } + +lsn_t flushedLSN() { + return flushedLSN_val; +} + void closeLogWriter() { /* Get the whole thing to the disk before closing it. */ syncLog(); @@ -142,7 +217,7 @@ void deleteLogWriter() { static LogEntry * readLogEntry() { LogEntry * ret = NULL; - size_t size; + size_t size, entrySize; int nmemb; if(feof(log)) return NULL; @@ -174,8 +249,16 @@ static LogEntry * readLogEntry() { return 0; } + entrySize = sizeofLogEntry(ret); + + /** Sanity check -- Did we get the whole entry? */ - assert(size == sizeofLogEntry(ret)); + if(size < entrySize) { + return 0; + } + + + assert(size == entrySize); return ret; } @@ -183,6 +266,12 @@ static LogEntry * readLogEntry() { LogEntry * readLSNEntry(lsn_t LSN) { LogEntry * ret; + if(!writeLogEntryIsReady) { + if(LSN > maxLSNEncountered) { + maxLSNEncountered = LSN; + } + } + fseek(log, LSN, SEEK_SET); ret = readLogEntry(); diff --git a/src/lladd/operations/alloc.c b/src/lladd/operations/alloc.c index 7f4ffcf..438ad6e 100644 --- a/src/lladd/operations/alloc.c +++ b/src/lladd/operations/alloc.c @@ -1,7 +1,7 @@ #include #include #include - +#include "../blobManager.h" /** Implementation of Talloc() as an operation @@ -20,17 +20,22 @@ */ static int operate(int xid, lsn_t lsn, recordid rid, const void * dat) { - Page loadedPage = loadPage(rid.page); - /** Has no effect during normal operation. */ - pageSlotRalloc(loadedPage, rid); + if(rid.size >= BLOB_THRESHOLD_SIZE) { + allocBlob(xid, lsn, rid); + } else { + Page loadedPage = loadPage(rid.page); + /** Has no effect during normal operation. */ + pageSlotRalloc(loadedPage, lsn, rid); + } + return 0; } -/** @todo Currently, we just lead store space on dealloc. */ +/** @todo Currently, we just leak store space on dealloc. */ static int deoperate(int xid, lsn_t lsn, recordid rid, const void * dat) { Page loadedPage = loadPage(rid.page); - /** Has no effect during normal operation. */ - pageSlotRalloc(loadedPage, rid); + /** Has no effect during normal operation, other than updating the LSN. */ + pageSlotRalloc(loadedPage, lsn, rid); return 0; } @@ -48,25 +53,31 @@ Operation getAlloc() { recordid Talloc(int xid, size_t size) { recordid rid; - /** + if(size >= BLOB_THRESHOLD_SIZE) { + rid = preAllocBlob(xid, size); + } else { - @todo we pass lsn -1 into ralloc here. This is a kludge, since we - need to log ralloc's return value, but can't get that return value - until after its executed. When it comes time to perform recovery, - it is possible that this record will be leaked during the undo - phase. We could do a two phase allocation to prevent the leak, but - then we'd need to lock the page that we're allocating a record in, - and that's a pain. Plus, this is a small leak. (There is a similar - problem involving blob allocation, which is more serious, as it may - result in double allocation...) + /** + + @todo we pass lsn -1 into ralloc here. This is a kludge, since we + need to log ralloc's return value, but can't get that return value + until after its executed. When it comes time to perform recovery, + it is possible that this record will be leaked during the undo + phase. We could do a two phase allocation to prevent the leak, but + then we'd need to lock the page that we're allocating a record in, + and that's a pain. Plus, this is a small leak. (There is a similar + problem involving blob allocation, which is more serious, as it may + result in double allocation...) + + @todo If this should be the only call to ralloc in lladd, we may consider + removing the lsn parameter someday. (Is there a good reason to + provide users with direct access to ralloc()?) - @todo If this should be the only call to ralloc in lladd, we may consider - removing the lsn parameter someday. (Is there a good reason to - provide users with direct access to ralloc()?) + */ + + rid = ralloc(xid, -1, size); - */ - - rid = ralloc(xid, -1, size); + } Tupdate(xid,rid, NULL, OPERATION_ALLOC); diff --git a/src/lladd/operations/prepare.c b/src/lladd/operations/prepare.c index 0649644..0a915b1 100644 --- a/src/lladd/operations/prepare.c +++ b/src/lladd/operations/prepare.c @@ -52,7 +52,7 @@ terms specified in this license. recordid prepare_bogus_rec = { 0, 0, 0}; -static int operate(int xid, recordid rid, const void *dat) { +static int operate(int xid, lsn_t lsn, recordid rid, const void *dat) { syncLog(); return 0; } diff --git a/src/lladd/page.c b/src/lladd/page.c index 1c2bf1b..f6c5c9c 100644 --- a/src/lladd/page.c +++ b/src/lladd/page.c @@ -333,10 +333,14 @@ recordid pageRalloc(Page page, size_t size) { /** Only used for recovery, to make sure that consistent RID's are created * on log playback. */ -recordid pageSlotRalloc(Page page, recordid rid) { +recordid pageSlotRalloc(Page page, lsn_t lsn, recordid rid) { int freeSpace = readFreeSpace(page.memAddr); int numSlots = readNumSlots(page.memAddr); +/* if(rid.size > BLOB_THRESHOLD_SIZE) { + return blobSlotAlloc(page, lsn_t lsn, recordid rid); + }*/ + /* assert(rid.slot >= numSlots); */ if(rid.slot >= numSlots) { diff --git a/src/lladd/pageCache.c b/src/lladd/pageCache.c new file mode 100644 index 0000000..81e6b34 --- /dev/null +++ b/src/lladd/pageCache.c @@ -0,0 +1,219 @@ +/** + @file + + pageCache handles the replacement policy for buffer manager. This + allows bufferManager's implementation to focus on providing atomic + writes, and locking. +*/ +#include +#include +#include +#include +#include +#include + +static pblHashTable_t *activePages; /* page lookup */ +static unsigned int bufferSize; /* < MAX_BUFFER_SIZE */ +static Page *repHead, *repMiddle, *repTail; /* replacement policy */ + +void pageCacheInit() { + + Page *first; + bufferSize = 1; + activePages = pblHtCreate(); + assert(activePages); + + first = pageAlloc(0); + pblHtInsert(activePages, &first->id, sizeof(int), first); + + first->prev = first->next = NULL; + /* pageMap(first); */ + pageRead(first); + + repHead = repTail = first; + repMiddle = NULL; + +} + +void pageCacheDeinit() { + Page *p; + DEBUG("pageCacheDeinit()"); + + for( p = (Page*)pblHtFirst( activePages ); p; p = (Page*)pblHtRemove( activePages, 0, 0 )) { + DEBUG("+"); + /** @todo No one seems to set the dirty flag... */ + /*if(p->dirty && (ret = pageWrite(p)/ *flushPage(*p)* /)) { + printf("ERROR: flushPage on %s line %d", __FILE__, __LINE__); + abort(); + / * exit(ret); * / + }*/ + pageWrite(p); + } + pblHtDelete(activePages); +} + +static void headInsert(Page *ret) { + + assert(ret != repMiddle); + assert(ret != repTail); + assert(ret != repHead); + + repHead->prev = ret; + ret->next = repHead; + ret->prev = NULL; + repHead = ret; +} + +static void middleInsert(Page *ret) { + + assert( bufferSize == MAX_BUFFER_SIZE ); + + assert(ret != repMiddle); + assert(ret != repTail); + assert(ret != repHead); + + ret->prev = repMiddle->prev; + ret->next = repMiddle; + repMiddle->prev = ret; + ret->prev->next = ret; + ret->queue = 2; + + repMiddle = ret; + assert(ret->next != ret && ret->prev != ret); +} + +static void qRemove(Page *ret) { + + assert( bufferSize == MAX_BUFFER_SIZE ); + assert(ret->next != ret && ret->prev != ret); + + if( ret->prev ) + ret->prev->next = ret->next; + else /* is head */ + repHead = ret->next; /* won't have head == tail because of test in loadPage */ + if( ret->next ) { + ret->next->prev = ret->prev; + /* TODO: these if can be better organizeed for speed */ + if( ret == repMiddle ) + /* select new middle */ + repMiddle = ret->next; + } + else /* is tail */ + repTail = ret->prev; + + assert(ret != repMiddle); + assert(ret != repTail); + assert(ret != repHead); +} + +static Page *kickPage(int pageid) { + + Page *ret = repTail; + + assert( bufferSize == MAX_BUFFER_SIZE ); + + qRemove(ret); + pblHtRemove(activePages, &ret->id, sizeof(int)); + + /* if( munmap(ret->memAddr, PAGE_SIZE) ) */ + if(pageWrite(ret)) /*flushPage(*ret)) */ + assert( 0 ); + + pageRealloc(ret, pageid); + + middleInsert(ret); + pblHtInsert(activePages, &pageid, sizeof(int), ret); + + return ret; +} + +int lastPageId = -1; +Page * lastPage = 0; + +Page *loadPagePtr(int pageid) { + /* lock activePages, bufferSize */ + Page *ret; + + if(lastPage && lastPageId == pageid) { + DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) lastPage->memAddr); + return lastPage; + } else { + ret = pblHtLookup(activePages, &pageid, sizeof(int)); + } + + if( ret ) { + if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to worry about page sorting */ + /* move to head */ + if( ret != repHead ) { + qRemove(ret); + headInsert(ret); + assert(ret->next != ret && ret->prev != ret); + + if( ret->queue == 2 ) { + /* keep first queue same size */ + repMiddle = repMiddle->prev; + repMiddle->queue = 2; + + ret->queue = 1; + } + } + } + + lastPage = ret; + lastPageId = pageid; + + DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); + + return ret; + } else if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to kick */ + ret = kickPage(pageid); + } else if( bufferSize == MAX_BUFFER_SIZE-1 ) { /* we need to setup kickPage mechanism */ + int i; + Page *iter; + + ret = pageAlloc(pageid); + headInsert(ret); + assert(ret->next != ret && ret->prev != ret); + + pblHtInsert( activePages, &pageid, sizeof(int), ret ); + + bufferSize++; + + /* split up queue: + * "in all cases studied ... fixing the primary region to 30% ... + * resulted in the best performance" + */ + repMiddle = repHead; + for( i = 0; i < MAX_BUFFER_SIZE / 3; i++ ) { + repMiddle->queue = 1; + repMiddle = repMiddle->next; + } + + for( iter = repMiddle; iter; iter = iter->next ) { + iter->queue = 2; + } + + } else { /* we are adding to an nonfull queue */ + + bufferSize++; + + ret = pageAlloc(pageid); + headInsert(ret); + assert(ret->next != ret && ret->prev != ret); + assert(ret->next != ret && ret->prev != ret); + pblHtInsert( activePages, &pageid, sizeof(int), ret ); + } + + /* we now have a page we can dump info into */ + assert( ret->id == pageid ); + + /*pageMap(ret); */ + pageRead(ret); + + lastPage = ret; + lastPageId = pageid; + + DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); + + return ret; +} diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index e7149fb..b44ae98 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -11,8 +11,9 @@ int numActiveXactions = 0; int xidCount = 0; #define INVALID_XTABLE_XID -1 -int Tinit() { +/** Needed for debugging -- sometimes we don't want to run all of Tinit() */ +void setupOperationsTable() { memset(XactionTable, INVALID_XTABLE_XID, sizeof(TransactionLog)*MAX_TRANSACTIONS); operationsTable[OPERATION_SET] = getSet(); operationsTable[OPERATION_INCREMENT] = getIncrement(); @@ -22,6 +23,12 @@ int Tinit() { operationsTable[OPERATION_LHREMOVE] = getLHRemove(); operationsTable[OPERATION_ALLOC] = getAlloc(); operationsTable[OPERATION_DEALLOC] = getDealloc(); + +} + +int Tinit() { + + setupOperationsTable(); pageInit(); bufInit(); diff --git a/test/lladd/check_blobRecovery.c b/test/lladd/check_blobRecovery.c index 72c54bf..9526436 100644 --- a/test/lladd/check_blobRecovery.c +++ b/test/lladd/check_blobRecovery.c @@ -449,6 +449,8 @@ START_TEST(recoverBlob__crash) { Tread(xid, rid, &j); + assert(!memcmp(j,k,ARRAY_SIZE * sizeof(int))); + fail_unless(!memcmp(j,k,ARRAY_SIZE), "Recovery failed on second re-open."); Tdeinit(); diff --git a/test/lladd/check_recovery.c b/test/lladd/check_recovery.c index 249ef9e..936809d 100644 --- a/test/lladd/check_recovery.c +++ b/test/lladd/check_recovery.c @@ -41,7 +41,7 @@ terms specified in this license. ---*/ #include #include -/*#include */ +#include #include #include @@ -141,6 +141,9 @@ START_TEST (recovery_exactlyOnce) { Tread(xid, rid, &k); + printf("j = %d, k = %d\n", j, k); + + assert(j == k); fail_unless(j == k, "Recovery messed something up!"); Tcommit(xid); diff --git a/utilities/Makefile.am b/utilities/Makefile.am new file mode 100644 index 0000000..70e90e9 --- /dev/null +++ b/utilities/Makefile.am @@ -0,0 +1,4 @@ +LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \ + $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a +bin_PROGRAMS=logfile_dump +AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/utilities/logfile_dump.c b/utilities/logfile_dump.c new file mode 100644 index 0000000..e2eafe7 --- /dev/null +++ b/utilities/logfile_dump.c @@ -0,0 +1,79 @@ +#define _GNU_SOURCE +#include +#include + +#include +#include + + +static char * logEntryToString(LogEntry * le) { + char * ret = NULL; + + switch(le->type) { + case UPDATELOG: + { + recordid rid = le->contents.clr.rid; + asprintf(&ret, "UPDATE\tlsn=%9ld\tprevlsn=%9ld\txid=%4d\trid={%5d %5d %5d}\tfuncId=%3d\targSize=%9d\n", le->LSN, le->prevLSN, le->xid, + rid.page, rid.slot, rid.size, le->contents.update.funcID, le->contents.update.argSize ); + + } + break; + case XBEGIN: + { + asprintf(&ret, "BEGIN\tlsn=%9ld\tprevlsn=%9ld\txid=%4d\n", le->LSN, le->prevLSN, le->xid); + } + break; + case XCOMMIT: + { + asprintf(&ret, "COMMIT\tlsn=%9ld\tprevlsn=%9ld\txid=%4d\n", le->LSN, le->prevLSN, le->xid); + + } + break; + case XABORT: + { + asprintf(&ret, "ABORT\tlsn=%9ld\tprevlsn=%9ld\txid=%4d\n", le->LSN, le->prevLSN, le->xid); + + } + break; + case XEND: + { + asprintf(&ret, "END \tlsn=%9ld\tprevlsn=%9ld\txid=%4d\n", le->LSN, le->prevLSN, le->xid); + } + break; + case CLRLOG: + { + recordid rid = le->contents.clr.rid; + asprintf(&ret, "CLR \tlsn=%9ld\tprevlsn=%9ld\txid=%4d\trid={%5d %5d %5d}\tthisUpdateLSN=%9ld\tundoNextLSN=%9ld\n", le->LSN, le->prevLSN, le->xid, + rid.page, rid.slot, rid.size, (long int)le->contents.clr.thisUpdateLSN, (long int)le->contents.clr.undoNextLSN ); + } + break; + } + return ret; +} + +void setupOperationsTable(); + +int main() { + LogHandle lh; + LogEntry * le; + + setupOperationsTable(); + + if(openLogWriter()) { + printf("Couldn't open log.\n"); + } + + lh = getLSNHandle(44); + + while((le = nextInLog(&lh))) { + + char * s = logEntryToString(le); + if(s) { + printf("%s", s); + + free(s); + } + } + + +}