diff --git a/config.h b/config.h deleted file mode 100644 index d25d6a8..0000000 --- a/config.h +++ /dev/null @@ -1,188 +0,0 @@ -/* config.h. Generated by configure. */ -/* config.h.in. Generated from configure.in by autoheader. */ - -/* Define to 1 if you have the header file. */ -#define HAVE_ARPA_INET_H 1 - -/* Define to 1 if you have the `bzero' function. */ -#define HAVE_BZERO 1 - -/* Define to 1 if you have the header file, and it defines `DIR'. - */ -#define HAVE_DIRENT_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_DLFCN_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_FCNTL_H 1 - -/* Define to 1 if you have the `fdatasync' function. */ -#define HAVE_FDATASYNC 1 - -/* Define to 1 if you have the `getcwd' function. */ -#define HAVE_GETCWD 1 - -/* Define to 1 if you have the `getpagesize' function. */ -#define HAVE_GETPAGESIZE 1 - -/* Define to 1 if you have the `gettimeofday' function. */ -#define HAVE_GETTIMEOFDAY 1 - -/* Define to 1 if you have the `inet_ntoa' function. */ -#define HAVE_INET_NTOA 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_INTTYPES_H 1 - -/* Define to 1 if you have the `pthread' library (-lpthread). */ -#define HAVE_LIBPTHREAD 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_LIMITS_H 1 - -/* Define to 1 if your system has a GNU libc compatible `malloc' function, and - to 0 otherwise. */ -#define HAVE_MALLOC 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_MALLOC_H 1 - -/* Define to 1 if you have the `memmove' function. */ -#define HAVE_MEMMOVE 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_MEMORY_H 1 - -/* Define to 1 if you have the `memset' function. */ -#define HAVE_MEMSET 1 - -/* Define to 1 if you have the `mkdir' function. */ -#define HAVE_MKDIR 1 - -/* Define to 1 if you have a working `mmap' system call. */ -#define HAVE_MMAP 1 - -/* Define to 1 if you have the `munmap' function. */ -#define HAVE_MUNMAP 1 - -/* Define to 1 if you have the header file, and it defines `DIR'. */ -/* #undef HAVE_NDIR_H */ - -/* Define to 1 if you have the header file. */ -#define HAVE_NETDB_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_NETINET_IN_H 1 - -/* Define to 1 if you have the `socket' function. */ -#define HAVE_SOCKET 1 - -/* Define to 1 if `stat' has the bug that it succeeds when given the - zero-length file name argument. */ -/* #undef HAVE_STAT_EMPTY_STRING_BUG */ - -/* Define to 1 if you have the header file. */ -#define HAVE_STDDEF_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_STDINT_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_STDLIB_H 1 - -/* Define to 1 if you have the `strchr' function. */ -#define HAVE_STRCHR 1 - -/* Define to 1 if you have the `strdup' function. */ -#define HAVE_STRDUP 1 - -/* Define to 1 if you have the `strerror' function. */ -#define HAVE_STRERROR 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_STRINGS_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_STRING_H 1 - -/* Define to 1 if you have the `strrchr' function. */ -#define HAVE_STRRCHR 1 - -/* Define to 1 if you have the `strstr' function. */ -#define HAVE_STRSTR 1 - -/* Define to 1 if you have the `strtoul' function. */ -#define HAVE_STRTOUL 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_SYSLOG_H 1 - -/* Define to 1 if you have the header file, and it defines `DIR'. - */ -/* #undef HAVE_SYS_DIR_H */ - -/* Define to 1 if you have the header file, and it defines `DIR'. - */ -/* #undef HAVE_SYS_NDIR_H */ - -/* Define to 1 if you have the header file. */ -#define HAVE_SYS_SOCKET_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_SYS_STAT_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_SYS_TIME_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_SYS_TYPES_H 1 - -/* Define to 1 if you have that is POSIX.1 compatible. */ -#define HAVE_SYS_WAIT_H 1 - -/* Define to 1 if you have the header file. */ -#define HAVE_UNISTD_H 1 - -/* Define to 1 if `lstat' dereferences a symlink specified with a trailing - slash. */ -#define LSTAT_FOLLOWS_SLASHED_SYMLINK 1 - -/* Name of package */ -#define PACKAGE "hello" - -/* Define to the address where bug reports for this package should be sent. */ -#define PACKAGE_BUGREPORT "sears@cs.berkeley.edu" - -/* Define to the full name of this package. */ -#define PACKAGE_NAME "PACKAGE" - -/* Define to the full name and version of this package. */ -#define PACKAGE_STRING "PACKAGE VERSION" - -/* Define to the one symbol short name of this package. */ -#define PACKAGE_TARNAME "package" - -/* Define to the version of this package. */ -#define PACKAGE_VERSION "VERSION" - -/* Define to 1 if you have the ANSI C header files. */ -#define STDC_HEADERS 1 - -/* Define to 1 if you can safely include both and . */ -#define TIME_WITH_SYS_TIME 1 - -/* Version number of package */ -#define VERSION "0.1" - -/* Define to empty if `const' does not conform to ANSI C. */ -/* #undef const */ - -/* Define to rpl_malloc if the replacement function should be used. */ -/* #undef malloc */ - -/* Define to `int' if does not define. */ -/* #undef pid_t */ - -/* Define to `unsigned' if does not define. */ -/* #undef size_t */ diff --git a/configure.in b/configure.in index f25ac89..91d659b 100644 --- a/configure.in +++ b/configure.in @@ -27,6 +27,21 @@ AM_PATH_CHECK(,[have_check="yes"], [have_check="no"]) AM_CONDITIONAL(HAVE_CHECK, test x"$have_check", "xyes") +## alas, it won't link if this is put in here.. instead, it's linked in manually in the test directory... + +#AC_ARG_ENABLE(efence, +#[ --enable-efence Use electric fence (www.perens.com)], +#[case "$enableval" in \ +# yes) efence=yes ;; \ +# no) efence=no;; \ +# *) efence=no;; \ +#esac], [efence=no]) +# +#dnl Check for efence +#if test $efence = yes; then +#AC_CHECK_LIB(efence,memalign) +#fi + # Checks for header files. AC_HEADER_DIRENT AC_HEADER_STDC diff --git a/lladd/bufferManager.h b/lladd/bufferManager.h index 697cedb..2cdcdbf 100644 --- a/lladd/bufferManager.h +++ b/lladd/bufferManager.h @@ -168,7 +168,7 @@ Page loadPage(int pageid); * @param size The size of the new record * @return allocated record */ -recordid ralloc(int xid, size_t size); +recordid ralloc(int xid, lsn_t lsn, size_t size); /** * Find a page with some free space. @@ -177,7 +177,9 @@ recordid ralloc(int xid, size_t size); /** - * This function updates the LSN of a page. This is needed by the + * This function updates the LSN of a page. + * + * This is needed by the * recovery process to make sure that each action is undone or redone * exactly once. * @@ -195,7 +197,7 @@ recordid ralloc(int xid, size_t size); * atomically. (write does not have to write all of the data that was * passed to it...) */ -void writeLSN(long LSN, int pageid); +/*void writeLSN(long LSN, int pageid); */ /** * @param pageid ID of page you want to read @@ -208,7 +210,7 @@ long readLSN(int pageid); * @param rid recordid where you want to write * @param dat data you wish to write */ -void writeRecord(int xid, recordid rid, const void *dat); +void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat); /** * @param xid transaction ID @@ -245,7 +247,7 @@ int dropPage(Page page); * @return 0 on success * @return error code on failure */ -int bufTransCommit(int xid); +int bufTransCommit(int xid, lsn_t lsn); /** * @@ -255,7 +257,7 @@ int bufTransCommit(int xid); * @return 0 on success * @return error code on failure */ -int bufTransAbort(int xid); +int bufTransAbort(int xid, lsn_t lsn); /** * will write out any dirty pages, assumes that there are no running diff --git a/lladd/logger/logEntry.h b/lladd/logger/logEntry.h index ef93fcb..4b66954 100644 --- a/lladd/logger/logEntry.h +++ b/lladd/logger/logEntry.h @@ -68,7 +68,7 @@ typedef struct { typedef struct { unsigned int funcID : 8; recordid rid; - unsigned int argSize : 16; /*2^16 = 64M*/ + unsigned int argSize; /* int invertible; */ /* no longer needed */ /* Implicit members: args; @ ((byte*)ule) + sizeof(UpdateLogEntry) diff --git a/lladd/logger/logger2.h b/lladd/logger/logger2.h index 7a37e09..be21559 100644 --- a/lladd/logger/logger2.h +++ b/lladd/logger/logger2.h @@ -75,16 +75,16 @@ TransactionLog LogTransBegin(int xid); /** Write a transaction COMMIT to the log tail, then flush the log tail immediately to disk - @return 0 if the transaction succeeds, an error code otherwise. + @return The lsn of the commit log entry. */ -void LogTransCommit(TransactionLog * l); +lsn_t LogTransCommit(TransactionLog * l); /** Write a transaction ABORT to the log tail - @return 0 if the transaction was successfully aborted. + @return The lsn of the abort log entry. */ -void LogTransAbort(TransactionLog * l); +lsn_t LogTransAbort(TransactionLog * l); /** LogUpdate writes an UPDATE log record to the log tail diff --git a/lladd/operations.h b/lladd/operations.h index 8bc46f4..90a6078 100644 --- a/lladd/operations.h +++ b/lladd/operations.h @@ -68,7 +68,7 @@ BEGIN_C_DECLS /* @type Function * function pointer that the operation will run */ -typedef int (*Function)(int xid, recordid r, const void *d); +typedef int (*Function)(int xid, lsn_t lsn, recordid r, const void *d); /* @type Operation @@ -153,14 +153,19 @@ extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */ void doUpdate(const LogEntry * e); /** Undo the update under normal operation, and during recovery. - Assumes that the operation's results are reflected in the contents of the buffer manager. + Checks to see if the operation's results are reflected in the + contents of the buffer manager. If they are, then it performs the + undo. Does not write to the log. - @todo Currently, undos do not result in CLR entries, but they should. (Should this be done here?) + This function does not generate CLR because this would result in + extra CLRs being generated during recovery. + @param e The log entry containing the operation to be undone. + @param clr_lsn The lsn of the clr that corresponds to this undo operation. */ -void undoUpdate(const LogEntry * e); +void undoUpdate(const LogEntry * e, lsn_t clr_lsn); /** Redoes an operation during recovery. This is different than doUpdate because it checks to see if the operation needs to be redone diff --git a/src/lladd/blobManager.c b/src/lladd/blobManager.c index 083394b..087d2f8 100644 --- a/src/lladd/blobManager.c +++ b/src/lladd/blobManager.c @@ -1,23 +1,47 @@ -#include "blobManager.h" -#include -#include -#include +#include +#include +#include +#include +/* stdio */ + #include #include -#include -#include -#include -#include #include +#include +#include +#include +#include -static FILE * blobf0, * blobf1; +#include "blobManager.h" + + + +static FILE * blobf0 = NULL, * blobf1 = NULL; /** This is a hash of hash tables. The outer hash maps from xid to inner hash. The inner hash maps from rid to lsn. */ static pblHashTable_t * dirtyBlobs; +/** Plays a nasty trick on bufferManager to force it to read and write + blob_record_t items for us. Relies upon bufferManager (and + page.c's) trust in the rid.size field... */ +static void readRawRecord(int xid, recordid rid, void * buf, int size) { + recordid blob_rec_rid = rid; + blob_rec_rid.size = size; + readRecord(xid, blob_rec_rid, buf); +} + +static void writeRawRecord(int xid, lsn_t lsn, recordid rid, const void * buf, int size) { + recordid blob_rec_rid = rid; + blob_rec_rid.size = size; + /* writeRecord(xid, lsn, blob_rec_rid, buf); */ + Tset(xid, blob_rec_rid, buf); +} + + + /* moved verbatim from bufferManger.c */ void openBlobStore() { int blobfd0, blobfd1; @@ -32,7 +56,10 @@ void openBlobStore() { } if(!(blobf0 = fopen(BLOB0_FILE, "w+"))) { perror("Couldn't open or create blob 0 file"); abort(); } } - if( ! (blobf1 = fopen(BLOB1_FILE, "w+"))) { /* file may not exist */ + + DEBUG("blobf0 opened.\n"); + + if( ! (blobf1 = fopen(BLOB1_FILE, "r+"))) { /* file may not exist */ if( (blobfd1 = creat(BLOB1_FILE, 0666)) == -1 ) { /* cannot even create it */ printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); perror("Creating blob 1 file"); abort(); @@ -41,8 +68,11 @@ void openBlobStore() { printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__); perror(NULL); abort(); } - if(!(blobf1 = fopen(BLOB1_FILE, "w+"))) { perror("Couldn't open or create blob 1 file"); abort(); } + if(!(blobf1 = fopen(BLOB1_FILE, "r+"))) { perror("Couldn't open or create blob 1 file"); abort(); } } + + DEBUG("blobf1 opened.\n"); + dirtyBlobs = pblHtCreate(); } @@ -50,11 +80,16 @@ void openBlobStore() { @todo memory leak: Will leak memory if there are any outstanding xacts that have written to blobs. Should explicitly abort them - instead of just invalidating the dirtyBlobs hash. + instead of just invalidating the dirtyBlobs hash. + + (If the you fix the above @todo, don't forget to fix + bufferManager's simulateBufferManagerCrash.) */ void closeBlobStore() { - assert(!fclose(blobf0)); - assert(!fclose(blobf1)); + int ret = fclose(blobf0); + assert(!ret); + ret = fclose(blobf1); + assert(!ret); blobf0 = NULL; blobf1 = NULL; @@ -63,8 +98,8 @@ void closeBlobStore() { static long myFseek(FILE * f, long offset, int whence) { long ret; - if(!fseek(blobf1, offset, whence)) { perror (NULL); abort(); } - if(-1 == (ret = ftell(f))) { perror(NULL); abort(); } + if(0 != fseek(f, offset, whence)) { perror ("fseek"); fflush(NULL); abort(); } + if(-1 == (ret = ftell(f))) { perror("ftell"); fflush(NULL); abort(); } return ret; } @@ -76,8 +111,12 @@ recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) { char zero = 0; /* Allocate space for the blob entry. */ + assert(blobSize > 0); /* Don't support zero length blobs right now... */ + /* First in buffer manager. */ - recordid rid = ralloc(xid, sizeof(blob_record_t)); + /* recordid rid = ralloc(xid, lsn, sizeof(blob_record_t)); */ + + recordid rid = Talloc(xid, sizeof(blob_record_t)); readRecord(xid, rid, &blob_rec); @@ -91,8 +130,8 @@ recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) { myFseek(blobf0, fileSize + blobSize - 1, SEEK_SET); myFseek(blobf1, fileSize + blobSize - 1, SEEK_SET); - if(1 != fwrite(&zero, 0, sizeof(char), blobf0)) { perror(NULL); abort(); } - if(1 != fwrite(&zero, 0, sizeof(char), blobf1)) { perror(NULL); abort(); } + if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); } + if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); } /** Finally, fix up the fields in the record that points to the blob. */ @@ -107,12 +146,14 @@ recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) { /* writeRecord needs to know to 'do the right thing' here, since we've changed the size it has recorded for this record. */ /* @todo What should writeRawRecord do with the lsn? */ - writeRawRecord (rid, lsn, &blob_rec, sizeof(blob_record_t)); + writeRawRecord (xid, lsn, rid, &blob_rec, sizeof(blob_record_t)); + + rid.size = blob_rec.size; return rid; } -static lsn_t * tripleHashLookup(int xid, recordid rid) { + static lsn_t * tripleHashLookup(int xid, recordid rid) { pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(xid)); if(xidHash == NULL) { return NULL; @@ -149,11 +190,11 @@ static void tripleHashInsert(int xid, recordid rid, lsn_t newLSN) { pblHtInsert(pageXidHash, &rid, sizeof(recordid), copy); } - +/* static void tripleHashRemove(int xid, recordid rid) { pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); - if(xidHash) { /* Else, there was no xid, rid pair. */ + if(xidHash) { / * Else, there was no xid, rid pair. * / pblHashTable_t * pageXidHash = pblHtLookup(xidHash, &(rid.page), sizeof(int)); if(pageXidHash) { @@ -162,11 +203,11 @@ static void tripleHashRemove(int xid, recordid rid) { pblHtRemove(pageXidHash, &rid, sizeof(recordid)); free(delme); - /* We freed a member of pageXidHash. Is it empty? */ + / * We freed a member of pageXidHash. Is it empty? * / if(!pblHtFirst(pageXidHash)) { pblHtRemove(xidHash, &(rid.page), sizeof(int)); - /* Is xidHash now empty? */ + / * Is xidHash now empty? * / if(!pblHtFirst(xidHash)) { pblHtRemove(dirtyBlobs, &xid, sizeof(int)); free(xidHash); @@ -176,92 +217,127 @@ static void tripleHashRemove(int xid, recordid rid) { } } } -} +}*/ + void readBlob(int xid, recordid rid, void * buf) { /* First, determine if the blob is dirty. */ - lsn_t * dirty = tripleHashLookup(xid, rid); - - blob_rec_t rec; + /* lsn_t * dirty = tripleHashLookup(xid, rid); */ + blob_record_t rec; + /* int readcount; */ FILE * fd; + long offset; - readRawRecord(rid, &rec, sizeof(blob_rec_t)); + assert(buf); - if(dirty) { - fd = rec.fd ? blobf0 : blobf1; /* Read the updated version */ + readRawRecord(xid, rid, &rec, sizeof(blob_record_t)); + + /* if(dirty) { + DEBUG("Reading dirty blob.\n"); + fd = rec.fd ? blobf0 : blobf1; / * Read the updated version * / } else { - fd = rec.fd ? blobf1 : blobf0; /* Read the clean version */ - } + DEBUG("Reading clean blob.\n"); + fd = rec.fd ? blobf1 : blobf0; / * Read the clean version * / + } */ - assert(rec.offset == myFseek(fd, rec.offset, SEEK_SET)); - assert(1 == fread(buf, rec.size, 1, fd)); + fd = rec.fd ? blobf1 : blobf0; + + offset = myFseek(fd, (long int) rec.offset, SEEK_SET); + + DEBUG("reading blob at offset %d (%ld), size %ld, buffer %x\n", rec.offset, offset, rec.size, (unsigned int) buf); + + assert(rec.offset == offset); + if(1 != fread(buf, rec.size, 1, fd)) { + + if(feof(fd)) { printf("Unexpected eof!\n"); fflush(NULL); abort(); } + if(ferror(fd)) { printf("Error reading stream! %d", ferror(fd)); fflush(NULL); abort(); } + + } } -static int one = 1; /** @todo dirtyBlobs should contain the highest LSN that wrote to the current version of the dirty blob, and the lsn field should be checked to be sure that it increases monotonically. */ -void writeBlob(int xid, recordid rid, lsn_t lsn, void * buf) { +void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) { /* First, determine if the blob is dirty. */ lsn_t * dirty = tripleHashLookup(xid, rid); - blob_rec_t rec; - + blob_record_t rec; + long offset; FILE * fd; + int readcount; - assert(rid.size == BLOB_SLOT); + /* Tread() raw record */ + readRawRecord(xid, rid, &rec, sizeof(blob_record_t)); if(dirty) { assert(lsn > *dirty); *dirty = lsn; /* Updates value in dirty blobs (works because of pointer aliasing.) */ - } else { - tripleHashInsert(xid, rid, lsn); - } - readRawRecord(rid, &rec, sizeof(blob_record_t)); + DEBUG("Blob already dirty.\n"); + - fd = rec.fd ? blobf0 : blobf1; /* Read the slot for the dirty (updated) version. */ - - assert(rec.offset == myFseek(fd, rec.offset, SEEK_SET)); - assert(1 == fwrite(buf, rec.size, 1, fd)); + + } else { + DEBUG("Marking blob dirty.\n"); + tripleHashInsert(xid, rid, lsn); + /* Flip the fd bit on the record. */ + rec.fd = rec.fd ? 0 : 1; + + /* Tset() raw record */ + writeRawRecord(xid, lsn, rid, &rec, sizeof(blob_record_t)); + } + /* + readRawRecord(xid, rid, &rec, sizeof(blob_record_t)); + + fd = rec.fd ? blobf0 : blobf1; / * Read the slot for the dirty (updated) version. * / + + */ + + fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */ + + offset = myFseek(fd, rec.offset, SEEK_SET); + + printf("Writing at offset = %d, size = %ld\n", rec.offset, rec.size); + assert(offset == rec.offset); + readcount = fwrite(buf, rec.size, 1, fd); + assert(1 == readcount); /* No need to update the raw blob record. */ } /** @todo check return values */ +/* void commitBlobs(int xid, lsn_t lsn) { - lsn_t * dirty = tripleHashLookup(xid, rid); - - /* Because this is a commit, we must update each page atomically. + / * Because this is a commit, we must update each page atomically. Therefore, we need to re-group the dirtied blobs by page id, and then issue one write per page. Since we write flip the bits of each dirty blob record on the page, we can't get away with incrementally - updating things. */ + updating things. * / pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); - lsn_t * lsn; - int last_page = -1; - Page p; pblHashTable_t * this_bucket; - + + if(!rid_buckets) { return; } / * No blobs for this xid. * / + for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) { blob_record_t buf; recordid * rid_ptr; lsn_t * rid_lsn; int first = 1; int page_number; - /* All right, this_bucket contains all of the rids for this page. */ + / * All right, this_bucket contains all of the rids for this page. * / for(rid_lsn = pblHtFirst(this_bucket); rid_lsn; rid_lsn = pblHtNext(this_bucket)) { - /** @todo INTERFACE VIOLATION Can only use bufferManager's + / ** @todo INTERFACE VIOLATION Can only use bufferManager's read/write record since we're single threaded, and this calling sequence cannot possibly call kick page. Really, we sould use pageReadRecord / pageWriteRecord, and bufferManager should let - us write out the whole page atomically... */ + us write out the whole page atomically... * / - rid_ptr = pblCurrentKey(this_bucket); + rid_ptr = pblHtCurrentKey(this_bucket); if(first) { page_number = rid_ptr->page; @@ -270,46 +346,58 @@ void commitBlobs(int xid, lsn_t lsn) { assert(page_number == rid_ptr->page); } - readRawRecord(*rid_ptr, &buf, sizeof(blob_record_t)); - /* This rid is dirty, so swap the fd pointer. */ + / ** @todo For now, we assume that overlapping transactions (from + the Tbegin() to Tcommit() call) do not access the same + blob. * / + + readRawRecord(xid, *rid_ptr, &buf, sizeof(blob_record_t)); + / * This rid is dirty, so swap the fd pointer. * / buf.fd = (buf.fd ? 0 : 1); - writeRawRecord(*rid_ptr, lsn, &buf, sizeof(blob_record_t)); - pblHtRemove(rid_ptr); - free(rid_ptr); + writeRawRecord(xid, lsn, *rid_ptr, &buf, sizeof(blob_record_t)); + pblHtRemove(this_bucket, rid_ptr, sizeof(recordid)); + / * free(rid_ptr); * / + free(rid_lsn); } if(!first) { pblHtRemove(rid_buckets, &page_number, sizeof(int)); } else { - abort(); /* Bucket existed, but was empty?!? */ + abort(); / * Bucket existed, but was empty?!? * / } pblHtDelete(this_bucket); } } +*/ +void commitBlobs(int xid) { + abortBlobs(xid); +} +/** + Just clean up the dirty list for this xid. @todo Check return values. + + (Functionally equivalent to the old rmTouch() function. Just + deletes this xid's dirty list.) -/** Easier than commit blobs. Just clean up the dirty list for this xid. @todo Check return values. */ + @todo doesn't take lsn_t, since it doesnt write any blobs. Change the api? + +*/ void abortBlobs(int xid) { pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); - lsn_t * lsn; - int last_page = -1; - Page p; pblHashTable_t * this_bucket; + if(!rid_buckets) { return; } /* No dirty blobs for this xid.. */ + for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) { - blob_record_t buf; - recordid * rid_ptr; lsn_t * rid_lsn; - int first = 1; int page_number; /* All right, this_bucket contains all of the rids for this page. */ for(rid_lsn = pblHtFirst(this_bucket); rid_lsn; rid_lsn = pblHtNext(this_bucket)) { recordid * rid = pblHtCurrentKey(this_bucket); + page_number = rid->page; pblHtRemove(this_bucket, rid, sizeof(recordid)); free(rid_lsn); - page_number = rid->page; } pblHtRemove(rid_buckets, &page_number, sizeof(int)); diff --git a/src/lladd/blobManager.h b/src/lladd/blobManager.h index c2a78f8..2d2dae6 100644 --- a/src/lladd/blobManager.h +++ b/src/lladd/blobManager.h @@ -5,10 +5,35 @@ #include BEGIN_C_DECLS -/** blobManager - Provides blob handling @todo Set range?? +/** + @file + blobManager - Provides blob handling @todo Set range?? Plan for modularity: Exactly one blob manager per storeFile. Blob manager interacts with page manger via page manager's public api. + + Blob updates work as follows: + + (1) A transaction obtains an exclusive write lock on a blob + (not implemented yet.) + + (2) When it requests a write, the blob it writes to is added to + a data structure that lists all dirty blobs by transaction, + and the page containing the blob entry is updated. (The fd + bit in the record is flipped, and the LSN is updated.) The + write to the blob store is not flushed to disk. + + (3) All subsequent writes to the same blob just update the + backing file. + + (4) On commit and rollback, the data structure containing the xid's + dirty blobs is destroyed. + + (5) recovery2.c handles the restoration of the fd bits using + physical logging (this is automatic, since we use Tset() + calls to update the records.) + + @todo Should the tripleHash be its own little support library? */ @@ -17,12 +42,14 @@ BEGIN_C_DECLS it's dirty (it could have been stolen), an retrieve it from the appropriate blob file. */ -void readBlob(recordid rid, void * buf); +void readBlob(int xid, recordid rid, void * buf); + /** If you write to a blob, call this function to mark it dirty. */ -void writeBlob(recordid rid, lsn_t lsn, void * buf); +void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf); + /** Atomically (with respect to recovery) make the dirty version of the diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c index 60b1b0b..5ca2e05 100644 --- a/src/lladd/bufferManager.c +++ b/src/lladd/bufferManager.c @@ -68,8 +68,8 @@ static unsigned int bufferSize = 1; /* < MAX_BUFFER_SIZE */ static Page *repHead, *repMiddle, *repTail; /* replacement policy */ static int stable = -1; -int blobfd0 = -1; -int blobfd1 = -1; +/*int blobfd0 = -1; + int blobfd1 = -1;*/ static void pageMap(Page *ret) { @@ -95,8 +95,8 @@ int bufInit() { bufferSize = 1; stable = -1; - blobfd0 = -1; - blobfd1 = -1; + /* blobfd0 = -1; + blobfd1 = -1; */ /* Create STORE_FILE, BLOB0_FILE, BLOB1_FILE if necessary, @@ -332,52 +332,73 @@ Page loadPage (int pageid) { Page * lastRallocPage = 0; -recordid ralloc(int xid, size_t size) { +recordid ralloc(int xid, lsn_t lsn, size_t size) { static unsigned int lastFreepage = 0; + recordid ret; Page p; - int blobSize = 0; + /* int blobSize = 0; */ if (size >= BLOB_THRESHOLD_SIZE) { /* TODO combine this with if below */ - blobSize = size; - size = BLOB_REC_SIZE; - } - - while(freespace(p = loadPage(lastFreepage)) < size ) { lastFreepage++; } - - if (blobSize >= BLOB_THRESHOLD_SIZE) { - int fileSize = (int) lseek(blobfd1, 0 , SEEK_END); - /* fstat(blobfd1, &sb); - fileSize = (int) sb.st_size; */ - lseek(blobfd0, fileSize+blobSize-1, SEEK_SET); + ret = allocBlob(xid, lsn, size); + /* blobSize = size; + size = BLOB_REC_SIZE; */ + } else { + + while(freespace(p = loadPage(lastFreepage)) < size ) { lastFreepage++; } + + /* if (blobSize >= BLOB_THRESHOLD_SIZE) { + int fileSize = (int) lseek(blobfd1, 0 , SEEK_END); + / * fstat(blobfd1, &sb); + fileSize = (int) sb.st_size; * / + lseek(blobfd0, fileSize+blobSize-1, SEEK_SET); write(blobfd0, "", 1); lseek(blobfd1, fileSize+blobSize-1, SEEK_SET); write(blobfd1, "", 1); return pageBalloc(p, blobSize, fileSize); - } else { - return pageRalloc(p, size); - } + } else { */ + ret = pageRalloc(p, size); + /* } */ + } + DEBUG("alloced rid = {%d, %d, %d}\n", ret.page, ret.slot, ret.size); + return ret; } long readLSN(int pageid) { return pageReadLSN(loadPage(pageid)); } -void writeLSN(long LSN, int pageid) { +static void writeLSN(lsn_t LSN, int pageid) { Page *p = loadPagePtr(pageid); p->LSN = LSN; pageWriteLSN(*p); } -void writeRecord(int xid, recordid rid, const void *dat) { +void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) { - Page *p = loadPagePtr(rid.page); - assert( (p->id == rid.page) && (p->memAddr != NULL) ); + Page *p; - pageWriteRecord(xid, *p, rid, dat); /* Used to attempt to return this. */ + if(rid.size > BLOB_THRESHOLD_SIZE) { + DEBUG("Writing blob.\n"); + writeBlob(xid, lsn, rid, dat); + + } else { + DEBUG("Writing record.\n"); + p = loadPagePtr(rid.page); + assert( (p->id == rid.page) && (p->memAddr != NULL) ); + + pageWriteRecord(xid, *p, rid, dat); + writeLSN(lsn, rid.page); + } } void readRecord(int xid, recordid rid, void *buf) { - pageReadRecord(xid, loadPage(rid.page), rid, buf); /* Used to attempt to return this. */ + if(rid.size > BLOB_THRESHOLD_SIZE) { + DEBUG("Reading blob. xid = %d rid = { %d %d %d } buf = %x\n", xid, rid.page, rid.slot, rid.size, (unsigned int)buf); + readBlob(xid, rid, buf); + } else { + DEBUG("Reading record xid = %d rid = { %d %d %d } buf = %x\n", xid, rid.page, rid.slot, rid.size, (unsigned int)buf); + pageReadRecord(xid, loadPage(rid.page), rid, buf); + } } int flushPage(Page page) { @@ -388,21 +409,27 @@ int flushPage(Page page) { return 0; } -int bufTransCommit(int xid) { +int bufTransCommit(int xid, lsn_t lsn) { + commitBlobs(xid); + + /** @todo Figure out where the blob files are fsynced() and delete this and the next few lines... */ + + /* fdatasync(blobfd0); fdatasync(blobfd1); + */ + + pageCommit(xid); - pageCommit(xid); - - return 0; + return 0; } -int bufTransAbort(int xid) { +int bufTransAbort(int xid, lsn_t lsn) { + abortBlobs(xid); /* abortBlobs doesn't write any log entries, so it doesn't need the lsn. */ + pageAbort(xid); - pageAbort(xid); - - return 0; + return 0; } void bufDeinit() { diff --git a/src/lladd/logger/logger2.c b/src/lladd/logger/logger2.c index 6ba9fdc..8ba54de 100644 --- a/src/lladd/logger/logger2.c +++ b/src/lladd/logger/logger2.c @@ -44,6 +44,8 @@ terms specified in this license. #include #include +#include + TransactionLog LogTransBegin(int xid) { TransactionLog tl; tl.xid = xid; @@ -53,21 +55,29 @@ TransactionLog LogTransBegin(int xid) { return tl; } -static void LogTransCommon(TransactionLog * l, int type) { +static lsn_t LogTransCommon(TransactionLog * l, int type) { LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type); + lsn_t ret; + writeLogEntry(e); l->prevLSN = e->LSN; DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); + + ret = e->LSN; + free(e); + + return ret; + } -void LogTransCommit(TransactionLog * l) { - LogTransCommon(l, XCOMMIT); +lsn_t LogTransCommit(TransactionLog * l) { + return LogTransCommon(l, XCOMMIT); } -void LogTransAbort(TransactionLog * l) { - LogTransCommon(l, XABORT); +lsn_t LogTransAbort(TransactionLog * l) { + return LogTransCommon(l, XABORT); } LogEntry * LogUpdate(TransactionLog * l, recordid rid, int operation, const byte * args) { @@ -82,14 +92,17 @@ LogEntry * LogUpdate(TransactionLog * l, recordid rid, int operation, const byte } if(operationsTable[operation].undo == NO_INVERSE) { + DEBUG("Creating %d byte physical pre-image.\n", rid.size); preImage = malloc(rid.size); + if(!preImage) { perror("malloc"); abort(); } readRecord(l->xid, rid, preImage); + DEBUG("got preimage"); } e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage); writeLogEntry(e); - DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, - (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); + DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid, + (long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) argSize); if(preImage) { free(preImage); diff --git a/src/lladd/operations.c b/src/lladd/operations.c index 7c055e2..7f0d924 100644 --- a/src/lladd/operations.c +++ b/src/lladd/operations.c @@ -48,19 +48,23 @@ terms specified in this license. Operation operationsTable[MAX_OPERATIONS]; void doUpdate(const LogEntry * e) { - operationsTable[e->contents.update.funcID].run(e->xid, e->contents.update.rid, getUpdateArgs(e)); + + DEBUG("OPERATION update arg length %d, lsn = %ld\n", e->contents.update.argSize, e->LSN); + + operationsTable[e->contents.update.funcID].run(e->xid, e->LSN, e->contents.update.rid, getUpdateArgs(e)); } void redoUpdate(const LogEntry * e) { if(e->type == UPDATELOG) { + lsn_t pageLSN = readLSN(e->contents.update.rid.page); #ifdef DEBUGGING recordid rid = e->contents.update.rid; #endif - if(e->LSN > readLSN(e->contents.update.rid.page)) { - DEBUG("Redo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size); + if(e->LSN > pageLSN) { + DEBUG("OPERATION Redo, %ld > %ld {%d %d %d}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size); doUpdate(e); } else { - DEBUG("Skipping redo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size); + DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %d}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size); } } else if(e->type == CLRLOG) { LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN); @@ -71,10 +75,10 @@ void redoUpdate(const LogEntry * e) { doesn't, then undo the original operation. */ if(f->LSN > readLSN(e->contents.update.rid.page)) { - DEBUG("Undoing for clr, %ld {%d %d %d}\n", f->LSN, rid.page, rid.slot, rid.size); - undoUpdate(f); + DEBUG("OPERATION Undoing for clr, %ld {%d %d %d}\n", f->LSN, rid.page, rid.slot, rid.size); + undoUpdate(f, e->LSN); } else { - DEBUG("Skiping undo for clr, %ld {%d %d %d}\n", f->LSN, rid.page, rid.slot, rid.size); + DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %d}\n", f->LSN, rid.page, rid.slot, rid.size); } } else { assert(0); @@ -83,30 +87,39 @@ void redoUpdate(const LogEntry * e) { } -void undoUpdate(const LogEntry * e) { +void undoUpdate(const LogEntry * e, lsn_t clr_lsn) { int undo = operationsTable[e->contents.update.funcID].undo; - /* printf("FuncID: %d Undo op: %d\n",e->contents.update.funcID, undo); fflush(NULL); */ - if(e->LSN <= readLSN(e->contents.update.rid.page)) { + DEBUG("OPERATION FuncID %d Undo op %d LSN %ld\n",e->contents.update.funcID, undo, clr_lsn); + #ifdef DEBUGGING - recordid rid = e->contents.update.rid; + recordid rid = e->contents.update.rid; #endif - - + lsn_t page_lsn = readLSN(e->contents.update.rid.page); + if(e->LSN <= page_lsn) { /* Actually execute the undo */ if(undo == NO_INVERSE) { /* Physical undo */ - DEBUG("Physical undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size); + DEBUG("OPERATION Physical undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size); /** @todo Why were we passing in RECOVERY_XID? */ - writeRecord(e->xid, e->contents.update.rid, getUpdatePreImage(e)); + writeRecord(e->xid, clr_lsn, e->contents.update.rid, getUpdatePreImage(e)); } else { /* @see doUpdate() */ /* printf("Logical undo"); fflush(NULL); */ - DEBUG("Logical undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size); - operationsTable[undo].run(e->xid, e->contents.update.rid, getUpdateArgs(e)); + DEBUG("OPERATION Logical undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size); + operationsTable[undo].run(e->xid, clr_lsn, e->contents.update.rid, getUpdateArgs(e)); } + } else { + DEBUG("OPERATION Skipping undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size); + /* if(page_lsn < clr_lsn) { + / * It is possible that we are in the process of lazily + propogating this change to the page. If this is the case, + simply update the page lsn to the new clr lsn. (This is no longer true)* / + writeLSN(clr_lsn, e->contents.update.rid.page); + } */ + } /* printf("Undo done."); fflush(NULL); */ diff --git a/src/lladd/operations/alloc.c b/src/lladd/operations/alloc.c index d1d3bab..2409101 100644 --- a/src/lladd/operations/alloc.c +++ b/src/lladd/operations/alloc.c @@ -18,7 +18,7 @@ */ -static int operate(int xid, recordid rid, const void * dat) { +static int operate(int xid, lsn_t lsn, recordid rid, const void * dat) { Page loadedPage = loadPage(rid.page); /** Has no effect during normal operation. */ pageSlotRalloc(loadedPage, rid); @@ -26,7 +26,7 @@ static int operate(int xid, recordid rid, const void * dat) { } /** @todo Currently, we just lead store space on dealloc. */ -static int deoperate(int xid, recordid rid, const void * dat) { +static int deoperate(int xid, lsn_t lsn, recordid rid, const void * dat) { Page loadedPage = loadPage(rid.page); /** Has no effect during normal operation. */ pageSlotRalloc(loadedPage, rid); @@ -43,10 +43,25 @@ Operation getAlloc() { return o; } + recordid Talloc(int xid, size_t size) { recordid rid; - rid = ralloc(xid, size); + /** + + @todo we pass lsn -1 into ralloc here. This is a kludge, since we + need to log ralloc's return value, but can't get that return value + until after its executed. When it comes time to perform recovery, + it is possible that this record will be leaked during the undo + phase. We could do a two phase allocation to prevent the leak, but + then we'd need to lock the page that we're allocating a record in, + and that's a pain. Plus, this is a small leak. (There is a similar + problem involving blob allocation, which is more serious, as it may + result in double allocation...) + + */ + + rid = ralloc(xid, -1, size); Tupdate(xid,rid, NULL, OPERATION_ALLOC); diff --git a/src/lladd/operations/decrement.c b/src/lladd/operations/decrement.c index fa4ea33..02ff244 100644 --- a/src/lladd/operations/decrement.c +++ b/src/lladd/operations/decrement.c @@ -48,12 +48,12 @@ terms specified in this license. #include #include -static int operate(int xid, recordid r, const void *d) { +static int operate(int xid, lsn_t lsn, recordid r, const void *d) { int i; readRecord(xid, r, &i); i--; - writeRecord(xid, r, &i); + writeRecord(xid, lsn, r, &i); return 0; } diff --git a/src/lladd/operations/increment.c b/src/lladd/operations/increment.c index cfb54da..a08210c 100644 --- a/src/lladd/operations/increment.c +++ b/src/lladd/operations/increment.c @@ -47,12 +47,12 @@ terms specified in this license. #include #include -static int operate(int xid, recordid r, const void *d) { +static int operate(int xid, lsn_t lsn, recordid r, const void *d) { int i; readRecord(xid, r, &i); i++; - writeRecord(xid, r, &i); + writeRecord(xid, lsn, r, &i); return 0; } diff --git a/src/lladd/operations/set.c b/src/lladd/operations/set.c index d79a69c..e388ac5 100644 --- a/src/lladd/operations/set.c +++ b/src/lladd/operations/set.c @@ -47,8 +47,8 @@ terms specified in this license. #include #include -static int operate(int xid, recordid rid, const void *dat) { - writeRecord(xid, rid, dat); +static int operate(int xid, lsn_t lsn, recordid rid, const void *dat) { + writeRecord(xid, lsn, rid, dat); return 0; } diff --git a/src/lladd/page.c b/src/lladd/page.c index 7f0e799..ae7faa1 100644 --- a/src/lladd/page.c +++ b/src/lladd/page.c @@ -247,22 +247,22 @@ static int touchBlob(int xid, recordid rid) { return 0; } -static void rmTouch(int xid) { +/*static void rmTouch(int xid) { touchedBlob_t *t = &touched[xid%touchedLen]; if( t ) { free( t->records ); t->records = NULL; - /* touched[xid%touchedLen].xid = -1; TODO: necessary? */ + / * touched[xid%touchedLen].xid = -1; TODO: necessary? * / } -} +}*/ void pageCommit(int xid) { - rmTouch(xid); + /* rmTouch(xid); */ } void pageAbort(int xid) { - rmTouch(xid); + /* rmTouch(xid); */ } /*#define getFirstHalfOfWord(memAddr) (((*(int*)memAddr) >> (2*BITS_PER_BYTE)) & MASK_0000FFFF) */ @@ -548,12 +548,14 @@ int isBlobSlot(byte *pageMemAddr, int slot) { TODO: BufferManager should pass in file descriptors so that this function doesn't have to open and close the file on each call. + @todo This needs to trust the rid, which is fine, but it could do more to check if the page agrees with the rid... + */ void pageReadRecord(int xid, Page page, recordid rid, byte *buff) { byte *recAddress = page.memAddr + getSlotOffset(page.memAddr, rid.slot); /*look at record, if slot offset == blob_slot, then its a blob, else its normal. */ - if(isBlobSlot(page.memAddr, rid.slot)) { + /* if(isBlobSlot(page.memAddr, rid.slot)) { int fd = -1; int version; int offset; @@ -565,33 +567,34 @@ void pageReadRecord(int xid, Page page, recordid rid, byte *buff) { fd = version == 0 ? blobfd0 : blobfd1; - /* if( (fd = open(version == 0 ? BLOB0_FILE : BLOB1_FILE, O_RDWR, 0)) == -1 ) { + / * if( (fd = open(version == 0 ? BLOB0_FILE : BLOB1_FILE, O_RDWR, 0)) == -1 ) { printf("page.c:pageReadRecord error and exiting\n"); exit(-1); - } */ + } * / lseek(fd, offset, SEEK_SET); read(fd, buff, length); - /* close(fd); */ - } else { - int size = getSecondHalfOfWord((int*)slotMemAddr(page.memAddr, rid.slot)); + / * close(fd); * / + } else { */ + /* For now, we need page.c to trust the rid. */ + /* int size = getSecondHalfOfWord((int*)slotMemAddr(page.memAddr, rid.slot)); */ - memcpy(buff, recAddress, size); - } + memcpy(buff, recAddress, rid.size); + /*}*/ } void pageWriteRecord(int xid, Page page, recordid rid, const byte *data) { byte *rec; - int version = -1; + /* int version = -1; int fd = -1; int blobRec[3]; if (isBlobSlot(page.memAddr, rid.slot)) { - /* TODO: Rusty's wild guess as to what's supposed to happen. Touch blob appears to lookup the blob, + / * TODO: Rusty's wild guess as to what's supposed to happen. Touch blob appears to lookup the blob, and allocate it if its not there. It returns 0 if it's a new blob, 1 otherwise, so I think we - just ignore its return value...*/ + just ignore its return value...* / if( !touchBlob(xid, rid) ) { - /* record hasn't been touched yet */ + / * record hasn't been touched yet * / rec = page.memAddr + getSlotOffset(page.memAddr, rid.slot); version = *(int *)rec; blobRec[0] = version == 0 ? 1 : 0; @@ -612,40 +615,42 @@ void pageWriteRecord(int xid, Page page, recordid rid, const byte *data) { perror("write"); } - /* Flush kernel buffers to hard drive. TODO: the + / * Flush kernel buffers to hard drive. TODO: the (standard) fdatasync() call only flushes the data instead of the data + metadata. Need to have makefile figure out if it's available, and do some macro magic in order to use it, if possible... This is no longer called here, since it is called at commit. - */ - /* fsync(fd); */ - } else { /* write a record that is not a blob */ + * / + / * fsync(fd); * / + } else { / * write a record that is not a blob */ + + assert(rid.size < PAGE_SIZE); rec = page.memAddr + getSlotOffset(page.memAddr, rid.slot); if(memcpy(rec, data, rid.size) == NULL ) { printf("ERROR: MEM_WRITE_ERROR on %s line %d", __FILE__, __LINE__); exit(MEM_WRITE_ERROR); } - } + /*}*/ } /* Currently not called any where, or tested. */ -byte * pageMMapRecord(int xid, Page page, recordid rid) { +/*byte * pageMMapRecord(int xid, Page page, recordid rid) { byte *rec; int version = -1; int fd = -1; int blobRec[3]; byte * ret; if (isBlobSlot(page.memAddr, rid.slot)) { - /* TODO: Rusty's wild guess as to what's supposed to happen. Touch blob appears to lookup the blob, + / * TODO: Rusty's wild guess as to what's supposed to happen. Touch blob appears to lookup the blob, and allocate it if its not there. It returns 0 if it's a new blob, 1 otherwise, so I think we - just ignore its return value...*/ + just ignore its return value...* / if( !touchBlob(xid, rid) ) { - /* record hasn't been touched yet */ + / * record hasn't been touched yet * / rec = page.memAddr + getSlotOffset(page.memAddr, rid.slot); version = *(int *)rec; blobRec[0] = version == 0 ? 1 : 0; @@ -663,32 +668,32 @@ byte * pageMMapRecord(int xid, Page page, recordid rid) { perror("pageMMapRecord"); } - /* if(-1 == lseek(fd, *(int *)(rec +4), SEEK_SET)) { + / * if(-1 == lseek(fd, *(int *)(rec +4), SEEK_SET)) { perror("lseek"); } if(-1 == write(fd, data, *(int *)(rec +8))) { perror("write"); - } */ + } * / - /* Flush kernel buffers to hard drive. TODO: the + / * Flush kernel buffers to hard drive. TODO: the (standard) fdatasync() call only flushes the data instead of the data + metadata. Need to have makefile figure out if it's available, and do some - macro magic in order to use it, if possible...*/ - /* fsync(fd); */ + macro magic in order to use it, if possible...* / + / * fsync(fd); * / - } else { /* write a record that is not a blob */ + } else { / * write a record that is not a blob * / rec = page.memAddr + getSlotOffset(page.memAddr, rid.slot); ret = rec; - /* if(memcpy(rec, data, rid.size) == NULL ) { + / * if(memcpy(rec, data, rid.size) == NULL ) { printf("ERROR: MEM_WRITE_ERROR on %s line %d", __FILE__, __LINE__); exit(MEM_WRITE_ERROR); - }*/ + }* / } return ret; } - +*/ void pageRealloc(Page *p, int id) { p->id = id; p->LSN = 0; diff --git a/src/lladd/recovery2.c b/src/lladd/recovery2.c index 4a962e5..28dfd57 100644 --- a/src/lladd/recovery2.c +++ b/src/lladd/recovery2.c @@ -204,18 +204,32 @@ static void Undo(int recovery) { /* printf("."); fflush(NULL); */ switch(e->type) { case UPDATELOG: + + /* Sanity check. If this fails, we've already undone this - update, or something is wrong with the redo phase. */ - this_lsn= readLSN(e->contents.update.rid.page); + update, or something is wrong with the redo phase or normal operation. */ + this_lsn= readLSN(e->contents.update.rid.page); + + /* printf("1"); fflush(NULL); */ - assert(e->LSN <= this_lsn); + + /* This check was incorrect. Since blobManager may lazily + update the pageManager, it is possible for this test to + fail. In that case, the undo is handled by blob manager + (it removes this page from it's dirty blob list), not + us. (Nope, it is now, again correct--fixed blobManager)*/ + + assert(e->LSN <= this_lsn); /* printf("1a"); fflush(NULL); */ /* Need to log a clr here. */ clr_lsn = LogCLR(e); - writeLSN(clr_lsn, e->contents.update.rid.page); - undoUpdate(e); + /* writeLSN(clr_lsn, e->contents.update.rid.page); */ + + /* Undo update is a no-op if the page does not reflect this + update, but it will write the new clr_lsn. */ + undoUpdate(e, clr_lsn); /* printf("1b"); fflush(NULL); */ break; case CLRLOG: @@ -227,6 +241,9 @@ static void Undo(int recovery) { /* Don't use xalloc anymore. */ /*assert(0);*/ break; + case XABORT: + /* Since XABORT is a no-op, we can safely ignore it. (XABORT records may be passed in by undoTrans.)*/ + break; default: printf ("Unknown log type to undo (TYPE=%d, XID= %d, LSN=%ld), skipping...\n", e->type, e->xid, e->LSN); break; diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index 4aa6250..0506d4a 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -68,7 +68,7 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) { /* printf("e->LSN: %ld\n", e->LSN); */ - writeLSN(e->LSN, rid.page); + /* writeLSN(e->LSN, rid.page); <-- Handled by doUpdate now */ doUpdate(e); } @@ -78,9 +78,10 @@ void Tread(int xid, recordid rid, void * dat) { } int Tcommit(int xid) { + lsn_t lsn; assert(numActiveXactions <= MAX_TRANSACTIONS); - LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]); - bufTransCommit(xid); /* unlocks pages */ + lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]); + bufTransCommit(xid, lsn); /* unlocks pages */ XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID; numActiveXactions--; assert( numActiveXactions >= 0 ); @@ -88,10 +89,13 @@ int Tcommit(int xid) { } int Tabort(int xid) { + lsn_t lsn; + lsn = LogTransAbort(&XactionTable[xid%MAX_TRANSACTIONS]); /* should call undoTrans after log trans abort. undoTrans will cause pages to contain CLR values corresponding to */ + + /* @todo is the order of the next two calls important? */ undoTrans(XactionTable[xid%MAX_TRANSACTIONS]); - LogTransAbort(&XactionTable[xid%MAX_TRANSACTIONS]); - bufTransAbort(xid); + bufTransAbort(xid, lsn); XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID; numActiveXactions--; assert( numActiveXactions >= 0 ); diff --git a/test/lladd/Makefile.am b/test/lladd/Makefile.am index fb085ff..e908532 100644 --- a/test/lladd/Makefile.am +++ b/test/lladd/Makefile.am @@ -1,11 +1,11 @@ INCLUDES = @CHECK_CFLAGS@ if HAVE_CHECK ## Had to disable check_lht because lht needs to be rewritten. -TESTS = check_logEntry check_logWriter check_operations check_transactional2 check_recovery +TESTS = check_logEntry check_logWriter check_operations check_transactional2 check_recovery check_blobRecovery else TESTS = endif noinst_PROGRAMS = $(TESTS) -LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a -CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt -AM_CFLAGS= -g -Wall -pedantic -std=gnu99 \ No newline at end of file +LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a -lefence +CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log +AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/test/lladd/check_blobRecovery.c b/test/lladd/check_blobRecovery.c new file mode 100644 index 0000000..72c54bf --- /dev/null +++ b/test/lladd/check_blobRecovery.c @@ -0,0 +1,575 @@ +/*--- +This software is copyrighted by the Regents of the University of +California, and other parties. The following terms apply to all files +associated with the software unless explicitly disclaimed in +individual files. + +The authors hereby grant permission to use, copy, modify, distribute, +and license this software and its documentation for any purpose, +provided that existing copyright notices are retained in all copies +and that this notice is included verbatim in any distributions. No +written agreement, license, or royalty fee is required for any of the +authorized uses. Modifications to this software may be copyrighted by +their authors and need not follow the licensing terms described here, +provided that the new terms are clearly indicated on the first page of +each file where they apply. + +IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY +FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES +ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY +DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, +INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND +NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND +THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE +MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. + +GOVERNMENT USE: If you are acquiring this software on behalf of the +U.S. government, the Government shall have only "Restricted Rights" in +the software and related documentation as defined in the Federal +Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are +acquiring the software on behalf of the Department of Defense, the +software shall be classified as "Commercial Computer Software" and the +Government shall have only "Restricted Rights" as defined in Clause +252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the +authors grant the U.S. Government and others acting in its behalf +permission to use and distribute the software in accordance with the +terms specified in this license. +---*/ +#include +#include +/*#include */ + +#include +#include +#include "../check_includes.h" +#include + +#define LOG_NAME "check_blobRecovery.log" + + +#define ARRAY_SIZE 20321 + + +static void arraySet(int * a, int mul) { + int i; + + for ( i = 0 ; i < ARRAY_SIZE; i++) { + a[i]= mul*i; + } +} + + +/** + @test + Simple test: Insert some stuff. Commit. Call Tdeinit(). Call + Tinit() (Which initiates recovery), and see if the stuff we + inserted is still there. + + Only performs idempotent operations (Tset). +*/ +START_TEST (recoverBlob__idempotent) { + int xid; + + + int j[ARRAY_SIZE]; + int k[ARRAY_SIZE]; + + recordid rid; + Tinit(); + xid = Tbegin(); + + rid = Talloc(xid, ARRAY_SIZE * sizeof(int)); + arraySet(j, 1); + + Tset(xid, rid, j); + + Tread(xid, rid, k); + fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "Get/Set broken?"); + + arraySet(k, 12312); + + Tcommit(xid); + + xid = Tbegin(); + + Tread(xid, rid, k); + + fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "commit broken"); + + Tcommit(xid); + + Tdeinit(); + + Tinit(); /* Runs recoverBlob_.. */ + + arraySet(k, 12312); + + xid = Tbegin(); + + Tread(xid, rid, k); + + fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "Recovery messed something up!"); + + Tcommit(xid); + + Tdeinit(); + +} +END_TEST + +/** + @test + Simple test: Alloc a record, commit. Call Tincrement on it, and + remember its value and commit. Then, call Tdeinit() and Tinit() + (Which initiates recovery), and see if the value changes. + + @todo: Until we have a non-idempotent operation on blobs, this test can't be written. +*/ +START_TEST (recoverBlob__exactlyOnce) { + + int xid; + int j; + int k; + recordid rid; + /* if(1) { + return; + } */ + fail_unless(0, "Need to write this test..."); + + Tinit(); + xid = Tbegin(); + + rid = Talloc(xid, sizeof(int)); + + Tincrement(xid, rid); + + Tread(xid, rid, &j); + + Tcommit(xid); + + xid = Tbegin(); + + Tread(xid, rid, &k); + + fail_unless(j == k, "Get/Set broken?"); + + Tcommit(xid); + + Tdeinit(); + + Tinit(); /* Runs recovery.. */ + + k = 12312; + + xid = Tbegin(); + + Tread(xid, rid, &k); + + fail_unless(j == k, "Recovery messed something up!"); + + Tcommit(xid); + + Tdeinit(); + + + +} +END_TEST + + +/** + @test + Makes sure that aborted idempotent operations are correctly undone. +*/ +START_TEST (recoverBlob__idempotentAbort) { + + int xid; + int j[ARRAY_SIZE]; + int k[ARRAY_SIZE]; + recordid rid; + + Tinit(); + xid = Tbegin(); + + rid = Talloc(xid, ARRAY_SIZE * sizeof(int)); + + arraySet(j, 1); + + Tset(xid, rid, j); + + arraySet(k, 2); + Tread(xid, rid, k); + + fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "Get/set broken?!"); + + Tcommit(xid); + + xid = Tbegin(); + arraySet(k, 3); + Tread(xid, rid, k); + + fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "commit broken?"); + + Tcommit(xid); + xid = Tbegin(); + arraySet(k, 2); + + Tset(xid, rid, k); + + arraySet(k, 4); + Tread(xid, rid, k); + + arraySet(j, 2); + + fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)),NULL); + + Tabort(xid); + + xid = Tbegin(); + arraySet(j, 1); + arraySet(k, 4); + + Tread(xid, rid, &k); + + Tabort(xid); + + fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)),"Didn't abort!"); + + Tdeinit(); + + Tinit(); /* Runs recovery.. */ + + arraySet(k, 12312); + + xid = Tbegin(); + + Tread(xid, rid, k); + + fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)),"recovery messed something up."); + + Tcommit(xid); + + Tdeinit(); + +} +END_TEST + + +/** + @test Makes sure that aborted non-idempotent operations are + correctly undone. Curently, we don't support such operations on + blobs, so this test is not implemented. + + @todo logical operations on blobs. +*/ +START_TEST (recoverBlob__exactlyOnceAbort) { + + int xid; + int j; + int k; + recordid rid; + /* if(1) + return ; + */ + fail_unless(0, "Need to write this test..."); + + Tinit(); + xid = Tbegin(); + + rid = Talloc(xid, sizeof(int)); + j = 1; + Tincrement(xid, rid); + + Tread(xid, rid, &j); + + Tcommit(xid); + + xid = Tbegin(); + + Tincrement(xid, rid); + Tread(xid, rid, &k); + fail_unless(j == k-1, NULL); + Tabort(xid); + xid = Tbegin(); + Tread(xid, rid, &k); + fail_unless(j == k, "didn't abort?"); + Tcommit(xid); + + Tdeinit(); + Tinit(); + + xid = Tbegin(); + + Tread(xid, rid, &k); + fail_unless(j == k, "Recovery didn't abort correctly"); + Tcommit(xid); + Tdeinit(); + +} +END_TEST + +/** + @test + Check the CLR mechanism with an aborted logical operation, and multipl Tinit()/Tdeinit() cycles. + + @todo Devise a way of implementing this for blobs. +*/ +START_TEST(recoverBlob__clr) { + recordid rid; + int xid; + int j; + int k; + + /* if(1) return; */ + + fail_unless(0, "Need to write this test..."); + + DEBUG("\n\nStart CLR test\n\n"); + + Tinit(); + + xid = Tbegin(); + + rid = Talloc(xid, sizeof(int)); + + Tread(xid, rid, &j); + + Tincrement(xid, rid); + + Tabort(xid); + + xid = Tbegin(); + + Tread(xid, rid, &k); + + Tcommit(xid); + + fail_unless(j == k, NULL); + + Tdeinit(); + + + Tinit(); + Tdeinit(); + + Tinit(); + + xid = Tbegin(); + + Tread(xid, rid, &k); + + Tcommit(xid); + + fail_unless(j == k, NULL); + + Tdeinit(); + Tinit(); + + xid = Tbegin(); + + Tread(xid, rid, &k); + + Tcommit(xid); + + fail_unless(j == k, NULL); + + Tdeinit(); + + +} END_TEST + +void simulateBufferManagerCrash(); +extern int numActiveXactions; +/** + @test + + Tests the undo phase of recovery by simulating a crash, and calling Tinit(). + + @todo Really should check logical operations, if they are ever supported for blobs. + +*/ +START_TEST(recoverBlob__crash) { + int xid; + recordid rid; + int j[ARRAY_SIZE]; + int k[ARRAY_SIZE]; + + Tinit(); + + xid = Tbegin(); + rid = Talloc(xid, sizeof(int)* ARRAY_SIZE); + + arraySet(j, 3); + + Tset(xid, rid, &j); + + arraySet(j, 9); + + Tset(xid, rid, &j); + + + /* RID = 9. */ + + Tread(xid, rid, &j); + + arraySet(k, 9); + fail_unless(!memcmp(j,k,ARRAY_SIZE), "set not working?"); + + + Tcommit(xid); + xid = Tbegin(); + + arraySet(k, 6); + + Tset(xid, rid, &k); + + /* RID = 6. */ + + Tread(xid, rid, &j); + fail_unless(!memcmp(j,k,ARRAY_SIZE), NULL); + + simulateBufferManagerCrash(); + closeLogWriter(); + numActiveXactions = 0; + + Tinit(); + + Tread(xid, rid, &j); + + arraySet(k, 9); + + fail_unless(!memcmp(j,k,ARRAY_SIZE), "Recovery didn't roll back in-progress xact!"); + + Tdeinit(); + Tinit(); + + Tread(xid, rid, &j); + + fail_unless(!memcmp(j,k,ARRAY_SIZE), "Recovery failed on second re-open."); + + Tdeinit(); + +} END_TEST +/** + @test Tests recovery when more than one transaction is in progress + at the time of the crash. This test is interesting because blob + operations from multiple transactions could hit the same page. + + @todo implement this sometime... +*/ +START_TEST (recoverBlob__multiple_xacts) { + int xid1, xid2, xid3, xid4; + recordid rid1, rid2, rid3, rid4; + int j1, j2, j3, j4, k; + + Tinit(); + j1 = 1; + j2 = 2; + j3 = 4; + j4 = 3; + xid1 = Tbegin(); + rid1 = Talloc(xid1, sizeof(int)); + + xid2 = Tbegin(); + + xid3 = Tbegin(); + + Tset(xid1, rid1, &j1); + + rid2 = Talloc(xid2, sizeof(int)); + rid3 = Talloc(xid3, sizeof(int)); + Tread(xid3, rid3, &k); + + Tset(xid3, rid3, &j3); + + Tcommit(xid3); + xid3 = Tbegin(); + + Tincrement(xid3, rid3); + Tset(xid2, rid2, &j2); + Tcommit(xid1); + + + xid4 = Tbegin(); + Tcommit(xid2); + + rid4 = Talloc(xid4, sizeof(int)); + Tset(xid4, rid4, &j4); + Tincrement(xid4, rid4); + Tcommit(xid4); + + xid1 = Tbegin(); + k = 100000; + Tset(xid1, rid1,&k); + xid2 = Tbegin(); + Tdecrement(xid2, rid2); + + Tdecrement(xid2, rid2); + Tdecrement(xid2, rid2); + Tdecrement(xid2, rid2); + Tdecrement(xid2, rid2); + Tincrement(xid1, rid1); + Tset(xid1, rid1,&k); + + /*simulate crash */ + + simulateBufferManagerCrash(); + closeLogWriter(); + numActiveXactions = 0; + Tinit(); + Tdeinit(); + Tinit(); + xid1 = Tbegin(); + xid2 = Tbegin(); + xid3 = Tbegin(); + xid4 = Tbegin(); + + Tread(xid1, rid1, &j1); + Tread(xid2, rid2, &j2); + Tread(xid3, rid3, &j3); + Tread(xid4, rid4, &j4); + + fail_unless(j1 == 1, NULL); + fail_unless(j2 == 2, NULL); + fail_unless(j3 == 4, NULL); + fail_unless(j4 == 4, NULL); + Tdeinit(); +} END_TEST + + +/** + Add suite declarations here +*/ +Suite * check_suite(void) { + Suite *s = suite_create("recovery_suite"); + /* Begin a new test */ + TCase *tc = tcase_create("recovery"); + void * foobar; /* used to supress warnings. */ + /* Sub tests are added, one per line, here */ + tcase_add_test(tc, recoverBlob__idempotent); + /* tcase_add_test(tc, recoverBlob__exactlyOnce); */ + foobar = (void*)&recoverBlob__exactlyOnce; + + tcase_add_test(tc, recoverBlob__idempotentAbort); + /* tcase_add_test(tc, recoverBlob__exactlyOnceAbort); */ + foobar = (void*)&recoverBlob__exactlyOnceAbort; + + /* tcase_add_test(tc, recoverBlob__clr); */ + foobar = (void*)&recoverBlob__clr; + + tcase_add_test(tc, recoverBlob__crash); + /* tcase_add_test(tc, recoverBlob__multiple_xacts); */ + foobar = (void*)&recoverBlob__multiple_xacts; + + /* --------------------------------------------- */ + tcase_add_checked_fixture(tc, setup, teardown); + suite_add_tcase(s, tc); + + return s; +} + +#include "../check_setup.h" diff --git a/test/lladd/check_operations.c b/test/lladd/check_operations.c index 7de71ea..023e435 100644 --- a/test/lladd/check_operations.c +++ b/test/lladd/check_operations.c @@ -52,49 +52,64 @@ terms specified in this license. Assuming that the Tset() operation is implemented correctly, checks that doUpdate, redoUpdate and undoUpdate are working correctly, for operations that use physical logging. + + @todo Now that writes + lsn updates are atomic, this test case probably breaks. */ START_TEST(operation_physical_do_undo) { int xid = 1; recordid rid; - lsn_t lsn = 0; + lsn_t lsn = 2; int buf; int arg; LogEntry * setToTwo; Tinit(); - rid = ralloc(xid, sizeof(int)); + rid = ralloc(xid, 1, sizeof(int)); buf = 1; arg = 2; + + DEBUG("A\n"); setToTwo = allocUpdateLogEntry(-1, xid, OPERATION_SET, rid, (void*)&arg, sizeof(int), (void*)&buf); /* Do, undo and redo operation without updating the LSN field of the page. */ - writeLSN(lsn, rid.page); - writeRecord(xid, rid, &buf); + /* writeLSN(lsn, rid.page); */ + DEBUG("B\n"); + writeRecord(xid, lsn, rid, &buf); - setToTwo->LSN = 1; + setToTwo->LSN = 10; - doUpdate(setToTwo); + DEBUG("C\n"); + doUpdate(setToTwo); /* PAGE LSN= 10, value = 2. */ readRecord(xid, rid, &buf); fail_unless(buf == 2, NULL); - undoUpdate(setToTwo); /* Should fail, LSN wasn't updated. */ + + + DEBUG("D\n"); + + fail_unless(10 == readLSN(rid.page), "page lsn not set correctly."); + + setToTwo->LSN = 5; + + undoUpdate(setToTwo, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */ readRecord(xid, rid, &buf); - fail_unless(buf == 2, NULL); + fail_unless(buf == 1, NULL); + DEBUG("E\n"); redoUpdate(setToTwo); readRecord(xid, rid, &buf); - fail_unless(buf == 2, NULL); + fail_unless(buf == 1, NULL); - writeLSN(3,rid.page); + /* writeLSN(3,rid.page); */ /* Now, simulate scenarios from normal operation: do the operation, and update the LSN, (update happens) @@ -109,32 +124,50 @@ START_TEST(operation_physical_do_undo) { lsn = 0; buf = 1; - writeLSN(lsn, rid.page); - writeRecord(xid, rid, &buf); + /* writeLSN(lsn, rid.page); */ + writeRecord(xid, lsn, rid, &buf); - setToTwo->LSN = 1; + /* Trace of test: + + PAGE LSN LOG LSN CLR LSN TYPE SUCCEED? + + 2 10 - do write YES (C) + 10 5 8 undo write YES (D) + 8 5 - redo write NO (E) + 8 10 - redo write YES (F) + ....... and so on. + */ + + + setToTwo->LSN = 10; - doUpdate(setToTwo); - writeLSN(setToTwo->LSN, rid.page); + DEBUG("F\n"); + redoUpdate(setToTwo); + /* writeLSN(setToTwo->LSN, rid.page); */ readRecord(xid, rid, &buf); fail_unless(buf == 2, NULL); - undoUpdate(setToTwo); /* Succeeds */ + DEBUG("G undo set to 2\n"); + undoUpdate(setToTwo, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/ readRecord(xid, rid, &buf); fail_unless(buf == 1, NULL); + DEBUG("H don't redo set to 2\n"); redoUpdate(setToTwo); /* Fails */ readRecord(xid, rid, &buf); fail_unless(buf == 1, NULL); - writeLSN(0,rid.page); + writeRecord(xid, 0, rid, &buf); /* reset the page's LSN. */ + /* writeLSN(0,rid.page); */ + + DEBUG("I redo set to 2\n"); redoUpdate(setToTwo); /* Succeeds */ readRecord(xid, rid, &buf); diff --git a/test/lladd/check_transactional2.c b/test/lladd/check_transactional2.c index c662975..3825dc8 100644 --- a/test/lladd/check_transactional2.c +++ b/test/lladd/check_transactional2.c @@ -43,7 +43,7 @@ terms specified in this license. #include #include - +#include "../check_includes.h" #define LOG_NAME "check_transactional2.log" /** Assuming that the Tset() operation is implemented correctly, checks @@ -86,6 +86,62 @@ START_TEST(transactional_smokeTest) { } END_TEST +/** + Just like transactional_smokeTest, but check blobs instead. +*/ +START_TEST(transactional_blobSmokeTest) { + +#define ARRAY_SIZE 10000 + int xid; + recordid rid; + int foo[ARRAY_SIZE]; + int bar[ARRAY_SIZE]; + int i; + for(i = 0; i < ARRAY_SIZE; i++) { + foo[i] = i; + bar[i] = 2 * i; + } + + Tinit(); + + xid = Tbegin(); + + rid = Talloc(xid, ARRAY_SIZE * sizeof(int)); + + fail_unless(rid.size == ARRAY_SIZE * sizeof(int), NULL); + + printf("TSet starting.\n"); fflush(NULL); + Tset(xid, rid, &foo); + printf("TSet returning.\n"); fflush(NULL); + + Tread(xid, rid, &bar); + + for(i = 0 ; i < ARRAY_SIZE; i++) { + assert(bar[i] == foo[i]); + fail_unless(bar[i] == foo[i], NULL); + } + + Tcommit(xid); + + xid = Tbegin(); + + for(i = 0; i < ARRAY_SIZE; i++) { + bar[i] = 2 * i; + } + + Tread(xid, rid, &bar); + + + for(i = 0 ; i < ARRAY_SIZE; i++) { + fail_unless(bar[i] == foo[i], NULL); + } + + Tabort(xid); + + Tdeinit(); +} +END_TEST + /** Add suite declarations here */ @@ -96,7 +152,9 @@ Suite * check_suite(void) { /* Sub tests are added, one per line, here */ tcase_add_test(tc, transactional_smokeTest); + tcase_add_test(tc, transactional_blobSmokeTest); /* --------------------------------------------- */ + tcase_add_checked_fixture(tc, setup, teardown); suite_add_tcase(s, tc);