Bugfixes ; blobs pass regression. Next stop: Delete old cruft.

This commit is contained in:
Sears Russell 2004-06-28 21:10:10 +00:00
parent 7e2f1fc5b2
commit fab0e6cbbd
22 changed files with 1124 additions and 415 deletions

188
config.h
View file

@ -1,188 +0,0 @@
/* config.h. Generated by configure. */
/* config.h.in. Generated from configure.in by autoheader. */
/* Define to 1 if you have the <arpa/inet.h> header file. */
#define HAVE_ARPA_INET_H 1
/* Define to 1 if you have the `bzero' function. */
#define HAVE_BZERO 1
/* Define to 1 if you have the <dirent.h> header file, and it defines `DIR'.
*/
#define HAVE_DIRENT_H 1
/* Define to 1 if you have the <dlfcn.h> header file. */
#define HAVE_DLFCN_H 1
/* Define to 1 if you have the <fcntl.h> header file. */
#define HAVE_FCNTL_H 1
/* Define to 1 if you have the `fdatasync' function. */
#define HAVE_FDATASYNC 1
/* Define to 1 if you have the `getcwd' function. */
#define HAVE_GETCWD 1
/* Define to 1 if you have the `getpagesize' function. */
#define HAVE_GETPAGESIZE 1
/* Define to 1 if you have the `gettimeofday' function. */
#define HAVE_GETTIMEOFDAY 1
/* Define to 1 if you have the `inet_ntoa' function. */
#define HAVE_INET_NTOA 1
/* Define to 1 if you have the <inttypes.h> header file. */
#define HAVE_INTTYPES_H 1
/* Define to 1 if you have the `pthread' library (-lpthread). */
#define HAVE_LIBPTHREAD 1
/* Define to 1 if you have the <limits.h> header file. */
#define HAVE_LIMITS_H 1
/* Define to 1 if your system has a GNU libc compatible `malloc' function, and
to 0 otherwise. */
#define HAVE_MALLOC 1
/* Define to 1 if you have the <malloc.h> header file. */
#define HAVE_MALLOC_H 1
/* Define to 1 if you have the `memmove' function. */
#define HAVE_MEMMOVE 1
/* Define to 1 if you have the <memory.h> header file. */
#define HAVE_MEMORY_H 1
/* Define to 1 if you have the `memset' function. */
#define HAVE_MEMSET 1
/* Define to 1 if you have the `mkdir' function. */
#define HAVE_MKDIR 1
/* Define to 1 if you have a working `mmap' system call. */
#define HAVE_MMAP 1
/* Define to 1 if you have the `munmap' function. */
#define HAVE_MUNMAP 1
/* Define to 1 if you have the <ndir.h> header file, and it defines `DIR'. */
/* #undef HAVE_NDIR_H */
/* Define to 1 if you have the <netdb.h> header file. */
#define HAVE_NETDB_H 1
/* Define to 1 if you have the <netinet/in.h> header file. */
#define HAVE_NETINET_IN_H 1
/* Define to 1 if you have the `socket' function. */
#define HAVE_SOCKET 1
/* Define to 1 if `stat' has the bug that it succeeds when given the
zero-length file name argument. */
/* #undef HAVE_STAT_EMPTY_STRING_BUG */
/* Define to 1 if you have the <stddef.h> header file. */
#define HAVE_STDDEF_H 1
/* Define to 1 if you have the <stdint.h> header file. */
#define HAVE_STDINT_H 1
/* Define to 1 if you have the <stdlib.h> header file. */
#define HAVE_STDLIB_H 1
/* Define to 1 if you have the `strchr' function. */
#define HAVE_STRCHR 1
/* Define to 1 if you have the `strdup' function. */
#define HAVE_STRDUP 1
/* Define to 1 if you have the `strerror' function. */
#define HAVE_STRERROR 1
/* Define to 1 if you have the <strings.h> header file. */
#define HAVE_STRINGS_H 1
/* Define to 1 if you have the <string.h> header file. */
#define HAVE_STRING_H 1
/* Define to 1 if you have the `strrchr' function. */
#define HAVE_STRRCHR 1
/* Define to 1 if you have the `strstr' function. */
#define HAVE_STRSTR 1
/* Define to 1 if you have the `strtoul' function. */
#define HAVE_STRTOUL 1
/* Define to 1 if you have the <syslog.h> header file. */
#define HAVE_SYSLOG_H 1
/* Define to 1 if you have the <sys/dir.h> header file, and it defines `DIR'.
*/
/* #undef HAVE_SYS_DIR_H */
/* Define to 1 if you have the <sys/ndir.h> header file, and it defines `DIR'.
*/
/* #undef HAVE_SYS_NDIR_H */
/* Define to 1 if you have the <sys/socket.h> header file. */
#define HAVE_SYS_SOCKET_H 1
/* Define to 1 if you have the <sys/stat.h> header file. */
#define HAVE_SYS_STAT_H 1
/* Define to 1 if you have the <sys/time.h> header file. */
#define HAVE_SYS_TIME_H 1
/* Define to 1 if you have the <sys/types.h> header file. */
#define HAVE_SYS_TYPES_H 1
/* Define to 1 if you have <sys/wait.h> that is POSIX.1 compatible. */
#define HAVE_SYS_WAIT_H 1
/* Define to 1 if you have the <unistd.h> header file. */
#define HAVE_UNISTD_H 1
/* Define to 1 if `lstat' dereferences a symlink specified with a trailing
slash. */
#define LSTAT_FOLLOWS_SLASHED_SYMLINK 1
/* Name of package */
#define PACKAGE "hello"
/* Define to the address where bug reports for this package should be sent. */
#define PACKAGE_BUGREPORT "sears@cs.berkeley.edu"
/* Define to the full name of this package. */
#define PACKAGE_NAME "PACKAGE"
/* Define to the full name and version of this package. */
#define PACKAGE_STRING "PACKAGE VERSION"
/* Define to the one symbol short name of this package. */
#define PACKAGE_TARNAME "package"
/* Define to the version of this package. */
#define PACKAGE_VERSION "VERSION"
/* Define to 1 if you have the ANSI C header files. */
#define STDC_HEADERS 1
/* Define to 1 if you can safely include both <sys/time.h> and <time.h>. */
#define TIME_WITH_SYS_TIME 1
/* Version number of package */
#define VERSION "0.1"
/* Define to empty if `const' does not conform to ANSI C. */
/* #undef const */
/* Define to rpl_malloc if the replacement function should be used. */
/* #undef malloc */
/* Define to `int' if <sys/types.h> does not define. */
/* #undef pid_t */
/* Define to `unsigned' if <sys/types.h> does not define. */
/* #undef size_t */

View file

@ -27,6 +27,21 @@ AM_PATH_CHECK(,[have_check="yes"],
[have_check="no"])
AM_CONDITIONAL(HAVE_CHECK, test x"$have_check", "xyes")
## alas, it won't link if this is put in here.. instead, it's linked in manually in the test directory...
#AC_ARG_ENABLE(efence,
#[ --enable-efence Use electric fence (www.perens.com)],
#[case "$enableval" in \
# yes) efence=yes ;; \
# no) efence=no;; \
# *) efence=no;; \
#esac], [efence=no])
#
#dnl Check for efence
#if test $efence = yes; then
#AC_CHECK_LIB(efence,memalign)
#fi
# Checks for header files.
AC_HEADER_DIRENT
AC_HEADER_STDC

View file

@ -168,7 +168,7 @@ Page loadPage(int pageid);
* @param size The size of the new record
* @return allocated record
*/
recordid ralloc(int xid, size_t size);
recordid ralloc(int xid, lsn_t lsn, size_t size);
/**
* Find a page with some free space.
@ -177,7 +177,9 @@ recordid ralloc(int xid, size_t size);
/**
* This function updates the LSN of a page. This is needed by the
* This function updates the LSN of a page.
*
* This is needed by the
* recovery process to make sure that each action is undone or redone
* exactly once.
*
@ -195,7 +197,7 @@ recordid ralloc(int xid, size_t size);
* atomically. (write does not have to write all of the data that was
* passed to it...)
*/
void writeLSN(long LSN, int pageid);
/*void writeLSN(long LSN, int pageid); */
/**
* @param pageid ID of page you want to read
@ -208,7 +210,7 @@ long readLSN(int pageid);
* @param rid recordid where you want to write
* @param dat data you wish to write
*/
void writeRecord(int xid, recordid rid, const void *dat);
void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat);
/**
* @param xid transaction ID
@ -245,7 +247,7 @@ int dropPage(Page page);
* @return 0 on success
* @return error code on failure
*/
int bufTransCommit(int xid);
int bufTransCommit(int xid, lsn_t lsn);
/**
*
@ -255,7 +257,7 @@ int bufTransCommit(int xid);
* @return 0 on success
* @return error code on failure
*/
int bufTransAbort(int xid);
int bufTransAbort(int xid, lsn_t lsn);
/**
* will write out any dirty pages, assumes that there are no running

View file

@ -68,7 +68,7 @@ typedef struct {
typedef struct {
unsigned int funcID : 8;
recordid rid;
unsigned int argSize : 16; /*2^16 = 64M*/
unsigned int argSize;
/* int invertible; */ /* no longer needed */
/* Implicit members:
args; @ ((byte*)ule) + sizeof(UpdateLogEntry)

View file

@ -75,16 +75,16 @@ TransactionLog LogTransBegin(int xid);
/**
Write a transaction COMMIT to the log tail, then flush the log tail immediately to disk
@return 0 if the transaction succeeds, an error code otherwise.
@return The lsn of the commit log entry.
*/
void LogTransCommit(TransactionLog * l);
lsn_t LogTransCommit(TransactionLog * l);
/**
Write a transaction ABORT to the log tail
@return 0 if the transaction was successfully aborted.
@return The lsn of the abort log entry.
*/
void LogTransAbort(TransactionLog * l);
lsn_t LogTransAbort(TransactionLog * l);
/**
LogUpdate writes an UPDATE log record to the log tail

View file

@ -68,7 +68,7 @@ BEGIN_C_DECLS
/* @type Function
* function pointer that the operation will run
*/
typedef int (*Function)(int xid, recordid r, const void *d);
typedef int (*Function)(int xid, lsn_t lsn, recordid r, const void *d);
/* @type Operation
@ -153,14 +153,19 @@ extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */
void doUpdate(const LogEntry * e);
/** Undo the update under normal operation, and during recovery.
Assumes that the operation's results are reflected in the contents of the buffer manager.
Checks to see if the operation's results are reflected in the
contents of the buffer manager. If they are, then it performs the
undo.
Does not write to the log.
@todo Currently, undos do not result in CLR entries, but they should. (Should this be done here?)
This function does not generate CLR because this would result in
extra CLRs being generated during recovery.
@param e The log entry containing the operation to be undone.
@param clr_lsn The lsn of the clr that corresponds to this undo operation.
*/
void undoUpdate(const LogEntry * e);
void undoUpdate(const LogEntry * e, lsn_t clr_lsn);
/**
Redoes an operation during recovery. This is different than
doUpdate because it checks to see if the operation needs to be redone

View file

@ -1,23 +1,47 @@
#include "blobManager.h"
#include <lladd/constants.h>
#include <lladd/bufferManager.h>
#include <lladd/page.h>
#include <unistd.h>
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
/* stdio */
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <assert.h>
#include <stdlib.h>
#include <pbl/pbl.h>
#include <lladd/transactional.h>
#include <lladd/bufferManager.h>
#include <lladd/page.h>
#include <lladd/constants.h>
static FILE * blobf0, * blobf1;
#include "blobManager.h"
static FILE * blobf0 = NULL, * blobf1 = NULL;
/**
This is a hash of hash tables. The outer hash maps from xid to
inner hash. The inner hash maps from rid to lsn.
*/
static pblHashTable_t * dirtyBlobs;
/** Plays a nasty trick on bufferManager to force it to read and write
blob_record_t items for us. Relies upon bufferManager (and
page.c's) trust in the rid.size field...

Reads the record addressed by rid, but with the size field replaced
by the caller-supplied size before the request is handed to
readRecord().

@param xid  transaction id, passed through to readRecord() unchanged
@param rid  record to read; its page and slot are used as-is
@param buf  destination buffer, passed through to readRecord()
            (presumably must hold at least size bytes -- TODO confirm)
@param size value substituted for rid.size for this read
*/
static void readRawRecord(int xid, recordid rid, void * buf, int size) {
/* Copy the rid and override only its size field. */
recordid blob_rec_rid = rid;
blob_rec_rid.size = size;
readRecord(xid, blob_rec_rid, buf);
}
/** Writes buf to the record addressed by rid, with the size field
replaced by the caller-supplied size before the request is issued.

@param xid  transaction id, passed through to Tset()
@param lsn  currently unused: the direct writeRecord() call that took
            it is commented out below, and Tset() is called instead
@param rid  record to write; its page and slot are used as-is
@param buf  source data, passed through to Tset()
@param size value substituted for rid.size for this write
*/
static void writeRawRecord(int xid, lsn_t lsn, recordid rid, const void * buf, int size) {
/* Copy the rid and override only its size field. */
recordid blob_rec_rid = rid;
blob_rec_rid.size = size;
/* writeRecord(xid, lsn, blob_rec_rid, buf); */
/* The update goes through Tset() rather than writeRecord(); lsn is not forwarded. */
Tset(xid, blob_rec_rid, buf);
}
/* moved verbatim from bufferManager.c */
void openBlobStore() {
int blobfd0, blobfd1;
@ -32,7 +56,10 @@ void openBlobStore() {
}
if(!(blobf0 = fopen(BLOB0_FILE, "w+"))) { perror("Couldn't open or create blob 0 file"); abort(); }
}
if( ! (blobf1 = fopen(BLOB1_FILE, "w+"))) { /* file may not exist */
DEBUG("blobf0 opened.\n");
if( ! (blobf1 = fopen(BLOB1_FILE, "r+"))) { /* file may not exist */
if( (blobfd1 = creat(BLOB1_FILE, 0666)) == -1 ) { /* cannot even create it */
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
perror("Creating blob 1 file"); abort();
@ -41,8 +68,11 @@ void openBlobStore() {
printf("ERROR: %i on %s line %d", errno, __FILE__, __LINE__);
perror(NULL); abort();
}
if(!(blobf1 = fopen(BLOB1_FILE, "w+"))) { perror("Couldn't open or create blob 1 file"); abort(); }
if(!(blobf1 = fopen(BLOB1_FILE, "r+"))) { perror("Couldn't open or create blob 1 file"); abort(); }
}
DEBUG("blobf1 opened.\n");
dirtyBlobs = pblHtCreate();
}
@ -51,10 +81,15 @@ void openBlobStore() {
@todo memory leak: Will leak memory if there are any outstanding
xacts that have written to blobs. Should explicitly abort them
instead of just invalidating the dirtyBlobs hash.
(If you fix the above @todo, don't forget to fix
bufferManager's simulateBufferManagerCrash.)
*/
void closeBlobStore() {
assert(!fclose(blobf0));
assert(!fclose(blobf1));
int ret = fclose(blobf0);
assert(!ret);
ret = fclose(blobf1);
assert(!ret);
blobf0 = NULL;
blobf1 = NULL;
@ -63,8 +98,8 @@ void closeBlobStore() {
static long myFseek(FILE * f, long offset, int whence) {
long ret;
if(!fseek(blobf1, offset, whence)) { perror (NULL); abort(); }
if(-1 == (ret = ftell(f))) { perror(NULL); abort(); }
if(0 != fseek(f, offset, whence)) { perror ("fseek"); fflush(NULL); abort(); }
if(-1 == (ret = ftell(f))) { perror("ftell"); fflush(NULL); abort(); }
return ret;
}
@ -76,8 +111,12 @@ recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) {
char zero = 0;
/* Allocate space for the blob entry. */
assert(blobSize > 0); /* Don't support zero length blobs right now... */
/* First in buffer manager. */
recordid rid = ralloc(xid, sizeof(blob_record_t));
/* recordid rid = ralloc(xid, lsn, sizeof(blob_record_t)); */
recordid rid = Talloc(xid, sizeof(blob_record_t));
readRecord(xid, rid, &blob_rec);
@ -91,8 +130,8 @@ recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) {
myFseek(blobf0, fileSize + blobSize - 1, SEEK_SET);
myFseek(blobf1, fileSize + blobSize - 1, SEEK_SET);
if(1 != fwrite(&zero, 0, sizeof(char), blobf0)) { perror(NULL); abort(); }
if(1 != fwrite(&zero, 0, sizeof(char), blobf1)) { perror(NULL); abort(); }
if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); }
if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); }
/** Finally, fix up the fields in the record that points to the blob. */
@ -107,12 +146,14 @@ recordid allocBlob(int xid, lsn_t lsn, size_t blobSize) {
/* writeRecord needs to know to 'do the right thing' here, since
we've changed the size it has recorded for this record. */
/* @todo What should writeRawRecord do with the lsn? */
writeRawRecord (rid, lsn, &blob_rec, sizeof(blob_record_t));
writeRawRecord (xid, lsn, rid, &blob_rec, sizeof(blob_record_t));
rid.size = blob_rec.size;
return rid;
}
static lsn_t * tripleHashLookup(int xid, recordid rid) {
static lsn_t * tripleHashLookup(int xid, recordid rid) {
pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(xid));
if(xidHash == NULL) {
return NULL;
@ -149,11 +190,11 @@ static void tripleHashInsert(int xid, recordid rid, lsn_t newLSN) {
pblHtInsert(pageXidHash, &rid, sizeof(recordid), copy);
}
/*
static void tripleHashRemove(int xid, recordid rid) {
pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(int));
if(xidHash) { /* Else, there was no xid, rid pair. */
if(xidHash) { / * Else, there was no xid, rid pair. * /
pblHashTable_t * pageXidHash = pblHtLookup(xidHash, &(rid.page), sizeof(int));
if(pageXidHash) {
@ -162,11 +203,11 @@ static void tripleHashRemove(int xid, recordid rid) {
pblHtRemove(pageXidHash, &rid, sizeof(recordid));
free(delme);
/* We freed a member of pageXidHash. Is it empty? */
/ * We freed a member of pageXidHash. Is it empty? * /
if(!pblHtFirst(pageXidHash)) {
pblHtRemove(xidHash, &(rid.page), sizeof(int));
/* Is xidHash now empty? */
/ * Is xidHash now empty? * /
if(!pblHtFirst(xidHash)) {
pblHtRemove(dirtyBlobs, &xid, sizeof(int));
free(xidHash);
@ -176,92 +217,127 @@ static void tripleHashRemove(int xid, recordid rid) {
}
}
}
}
}*/
void readBlob(int xid, recordid rid, void * buf) {
/* First, determine if the blob is dirty. */
lsn_t * dirty = tripleHashLookup(xid, rid);
blob_rec_t rec;
/* lsn_t * dirty = tripleHashLookup(xid, rid); */
blob_record_t rec;
/* int readcount; */
FILE * fd;
long offset;
readRawRecord(rid, &rec, sizeof(blob_rec_t));
assert(buf);
if(dirty) {
fd = rec.fd ? blobf0 : blobf1; /* Read the updated version */
readRawRecord(xid, rid, &rec, sizeof(blob_record_t));
/* if(dirty) {
DEBUG("Reading dirty blob.\n");
fd = rec.fd ? blobf0 : blobf1; / * Read the updated version * /
} else {
fd = rec.fd ? blobf1 : blobf0; /* Read the clean version */
}
DEBUG("Reading clean blob.\n");
fd = rec.fd ? blobf1 : blobf0; / * Read the clean version * /
} */
assert(rec.offset == myFseek(fd, rec.offset, SEEK_SET));
assert(1 == fread(buf, rec.size, 1, fd));
fd = rec.fd ? blobf1 : blobf0;
offset = myFseek(fd, (long int) rec.offset, SEEK_SET);
DEBUG("reading blob at offset %d (%ld), size %ld, buffer %x\n", rec.offset, offset, rec.size, (unsigned int) buf);
assert(rec.offset == offset);
if(1 != fread(buf, rec.size, 1, fd)) {
if(feof(fd)) { printf("Unexpected eof!\n"); fflush(NULL); abort(); }
if(ferror(fd)) { printf("Error reading stream! %d", ferror(fd)); fflush(NULL); abort(); }
}
}
static int one = 1;
/** @todo dirtyBlobs should contain the highest LSN that wrote to the
current version of the dirty blob, and the lsn field should be
checked to be sure that it increases monotonically. */
void writeBlob(int xid, recordid rid, lsn_t lsn, void * buf) {
void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) {
/* First, determine if the blob is dirty. */
lsn_t * dirty = tripleHashLookup(xid, rid);
blob_rec_t rec;
blob_record_t rec;
long offset;
FILE * fd;
int readcount;
assert(rid.size == BLOB_SLOT);
/* Tread() raw record */
readRawRecord(xid, rid, &rec, sizeof(blob_record_t));
if(dirty) {
assert(lsn > *dirty);
*dirty = lsn; /* Updates value in dirty blobs (works because of pointer aliasing.) */
DEBUG("Blob already dirty.\n");
} else {
DEBUG("Marking blob dirty.\n");
tripleHashInsert(xid, rid, lsn);
/* Flip the fd bit on the record. */
rec.fd = rec.fd ? 0 : 1;
/* Tset() raw record */
writeRawRecord(xid, lsn, rid, &rec, sizeof(blob_record_t));
}
readRawRecord(rid, &rec, sizeof(blob_record_t));
/*
readRawRecord(xid, rid, &rec, sizeof(blob_record_t));
fd = rec.fd ? blobf0 : blobf1; /* Read the slot for the dirty (updated) version. */
fd = rec.fd ? blobf0 : blobf1; / * Read the slot for the dirty (updated) version. * /
assert(rec.offset == myFseek(fd, rec.offset, SEEK_SET));
assert(1 == fwrite(buf, rec.size, 1, fd));
*/
fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */
offset = myFseek(fd, rec.offset, SEEK_SET);
printf("Writing at offset = %d, size = %ld\n", rec.offset, rec.size);
assert(offset == rec.offset);
readcount = fwrite(buf, rec.size, 1, fd);
assert(1 == readcount);
/* No need to update the raw blob record. */
}
/** @todo check return values */
/*
void commitBlobs(int xid, lsn_t lsn) {
lsn_t * dirty = tripleHashLookup(xid, rid);
/* Because this is a commit, we must update each page atomically.
/ * Because this is a commit, we must update each page atomically.
Therefore, we need to re-group the dirtied blobs by page id, and
then issue one write per page. Since we flip the bits of each
dirty blob record on the page, we can't get away with incrementally
updating things. */
updating things. * /
pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int));
lsn_t * lsn;
int last_page = -1;
Page p;
pblHashTable_t * this_bucket;
if(!rid_buckets) { return; } / * No blobs for this xid. * /
for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) {
blob_record_t buf;
recordid * rid_ptr;
lsn_t * rid_lsn;
int first = 1;
int page_number;
/* All right, this_bucket contains all of the rids for this page. */
/ * All right, this_bucket contains all of the rids for this page. * /
for(rid_lsn = pblHtFirst(this_bucket); rid_lsn; rid_lsn = pblHtNext(this_bucket)) {
/** @todo INTERFACE VIOLATION Can only use bufferManager's
/ ** @todo INTERFACE VIOLATION Can only use bufferManager's
read/write record since we're single threaded, and this calling
sequence cannot possibly call kick page. Really, we should use
pageReadRecord / pageWriteRecord, and bufferManager should let
us write out the whole page atomically... */
us write out the whole page atomically... * /
rid_ptr = pblCurrentKey(this_bucket);
rid_ptr = pblHtCurrentKey(this_bucket);
if(first) {
page_number = rid_ptr->page;
@ -270,46 +346,58 @@ void commitBlobs(int xid, lsn_t lsn) {
assert(page_number == rid_ptr->page);
}
readRawRecord(*rid_ptr, &buf, sizeof(blob_record_t));
/* This rid is dirty, so swap the fd pointer. */
/ ** @todo For now, we assume that overlapping transactions (from
the Tbegin() to Tcommit() call) do not access the same
blob. * /
readRawRecord(xid, *rid_ptr, &buf, sizeof(blob_record_t));
/ * This rid is dirty, so swap the fd pointer. * /
buf.fd = (buf.fd ? 0 : 1);
writeRawRecord(*rid_ptr, lsn, &buf, sizeof(blob_record_t));
pblHtRemove(rid_ptr);
free(rid_ptr);
writeRawRecord(xid, lsn, *rid_ptr, &buf, sizeof(blob_record_t));
pblHtRemove(this_bucket, rid_ptr, sizeof(recordid));
/ * free(rid_ptr); * /
free(rid_lsn);
}
if(!first) {
pblHtRemove(rid_buckets, &page_number, sizeof(int));
} else {
abort(); /* Bucket existed, but was empty?!? */
abort(); / * Bucket existed, but was empty?!? * /
}
pblHtDelete(this_bucket);
}
}
*/
void commitBlobs(int xid) {
abortBlobs(xid);
}
/**
Just clean up the dirty list for this xid. @todo Check return values.
/** Easier than commit blobs. Just clean up the dirty list for this xid. @todo Check return values. */
(Functionally equivalent to the old rmTouch() function. Just
deletes this xid's dirty list.)
@todo doesn't take lsn_t, since it doesn't write any blobs. Change the api?
*/
void abortBlobs(int xid) {
pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int));
lsn_t * lsn;
int last_page = -1;
Page p;
pblHashTable_t * this_bucket;
if(!rid_buckets) { return; } /* No dirty blobs for this xid.. */
for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) {
blob_record_t buf;
recordid * rid_ptr;
lsn_t * rid_lsn;
int first = 1;
int page_number;
/* All right, this_bucket contains all of the rids for this page. */
for(rid_lsn = pblHtFirst(this_bucket); rid_lsn; rid_lsn = pblHtNext(this_bucket)) {
recordid * rid = pblHtCurrentKey(this_bucket);
page_number = rid->page;
pblHtRemove(this_bucket, rid, sizeof(recordid));
free(rid_lsn);
page_number = rid->page;
}
pblHtRemove(rid_buckets, &page_number, sizeof(int));

View file

@ -5,10 +5,35 @@
#include <lladd/page.h>
BEGIN_C_DECLS
/** blobManager - Provides blob handling @todo Set range??
/**
@file
blobManager - Provides blob handling @todo Set range??
Plan for modularity: Exactly one blob manager per storeFile.
Blob manager interacts with page manger via page manager's
public api.
Blob updates work as follows:
(1) A transaction obtains an exclusive write lock on a blob
(not implemented yet.)
(2) When it requests a write, the blob it writes to is added to
a data structure that lists all dirty blobs by transaction,
and the page containing the blob entry is updated. (The fd
bit in the record is flipped, and the LSN is updated.) The
write to the blob store is not flushed to disk.
(3) All subsequent writes to the same blob just update the
backing file.
(4) On commit and rollback, the data structure containing the xid's
dirty blobs is destroyed.
(5) recovery2.c handles the restoration of the fd bits using
physical logging (this is automatic, since we use Tset()
calls to update the records.)
@todo Should the tripleHash be its own little support library?
*/
@ -17,12 +42,14 @@ BEGIN_C_DECLS
it's dirty (it could have been stolen), and retrieve it from the
appropriate blob file.
*/
void readBlob(recordid rid, void * buf);
void readBlob(int xid, recordid rid, void * buf);
/**
If you write to a blob, call this function to mark it dirty.
*/
void writeBlob(recordid rid, lsn_t lsn, void * buf);
void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf);
/**
Atomically (with respect to recovery) make the dirty version of the

View file

@ -68,8 +68,8 @@ static unsigned int bufferSize = 1; /* < MAX_BUFFER_SIZE */
static Page *repHead, *repMiddle, *repTail; /* replacement policy */
static int stable = -1;
int blobfd0 = -1;
int blobfd1 = -1;
/*int blobfd0 = -1;
int blobfd1 = -1;*/
static void pageMap(Page *ret) {
@ -95,8 +95,8 @@ int bufInit() {
bufferSize = 1;
stable = -1;
blobfd0 = -1;
blobfd1 = -1;
/* blobfd0 = -1;
blobfd1 = -1; */
/* Create STORE_FILE, BLOB0_FILE, BLOB1_FILE if necessary,
@ -332,52 +332,73 @@ Page loadPage (int pageid) {
Page * lastRallocPage = 0;
recordid ralloc(int xid, size_t size) {
recordid ralloc(int xid, lsn_t lsn, size_t size) {
static unsigned int lastFreepage = 0;
recordid ret;
Page p;
int blobSize = 0;
/* int blobSize = 0; */
if (size >= BLOB_THRESHOLD_SIZE) { /* TODO combine this with if below */
blobSize = size;
size = BLOB_REC_SIZE;
}
ret = allocBlob(xid, lsn, size);
/* blobSize = size;
size = BLOB_REC_SIZE; */
} else {
while(freespace(p = loadPage(lastFreepage)) < size ) { lastFreepage++; }
while(freespace(p = loadPage(lastFreepage)) < size ) { lastFreepage++; }
if (blobSize >= BLOB_THRESHOLD_SIZE) {
int fileSize = (int) lseek(blobfd1, 0 , SEEK_END);
/* fstat(blobfd1, &sb);
fileSize = (int) sb.st_size; */
lseek(blobfd0, fileSize+blobSize-1, SEEK_SET);
/* if (blobSize >= BLOB_THRESHOLD_SIZE) {
int fileSize = (int) lseek(blobfd1, 0 , SEEK_END);
/ * fstat(blobfd1, &sb);
fileSize = (int) sb.st_size; * /
lseek(blobfd0, fileSize+blobSize-1, SEEK_SET);
write(blobfd0, "", 1);
lseek(blobfd1, fileSize+blobSize-1, SEEK_SET);
write(blobfd1, "", 1);
return pageBalloc(p, blobSize, fileSize);
} else {
return pageRalloc(p, size);
}
} else { */
ret = pageRalloc(p, size);
/* } */
}
DEBUG("alloced rid = {%d, %d, %d}\n", ret.page, ret.slot, ret.size);
return ret;
}
long readLSN(int pageid) {
return pageReadLSN(loadPage(pageid));
}
void writeLSN(long LSN, int pageid) {
static void writeLSN(lsn_t LSN, int pageid) {
Page *p = loadPagePtr(pageid);
p->LSN = LSN;
pageWriteLSN(*p);
}
void writeRecord(int xid, recordid rid, const void *dat) {
void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) {
Page *p = loadPagePtr(rid.page);
assert( (p->id == rid.page) && (p->memAddr != NULL) );
Page *p;
pageWriteRecord(xid, *p, rid, dat); /* Used to attempt to return this. */
if(rid.size > BLOB_THRESHOLD_SIZE) {
DEBUG("Writing blob.\n");
writeBlob(xid, lsn, rid, dat);
} else {
DEBUG("Writing record.\n");
p = loadPagePtr(rid.page);
assert( (p->id == rid.page) && (p->memAddr != NULL) );
pageWriteRecord(xid, *p, rid, dat);
writeLSN(lsn, rid.page);
}
}
void readRecord(int xid, recordid rid, void *buf) {
pageReadRecord(xid, loadPage(rid.page), rid, buf); /* Used to attempt to return this. */
if(rid.size > BLOB_THRESHOLD_SIZE) {
DEBUG("Reading blob. xid = %d rid = { %d %d %d } buf = %x\n", xid, rid.page, rid.slot, rid.size, (unsigned int)buf);
readBlob(xid, rid, buf);
} else {
DEBUG("Reading record xid = %d rid = { %d %d %d } buf = %x\n", xid, rid.page, rid.slot, rid.size, (unsigned int)buf);
pageReadRecord(xid, loadPage(rid.page), rid, buf);
}
}
int flushPage(Page page) {
@ -388,21 +409,27 @@ int flushPage(Page page) {
return 0;
}
int bufTransCommit(int xid) {
int bufTransCommit(int xid, lsn_t lsn) {
commitBlobs(xid);
/** @todo Figure out where the blob files are fsynced() and delete this and the next few lines... */
/*
fdatasync(blobfd0);
fdatasync(blobfd1);
*/
pageCommit(xid);
pageCommit(xid);
return 0;
return 0;
}
int bufTransAbort(int xid) {
int bufTransAbort(int xid, lsn_t lsn) {
abortBlobs(xid); /* abortBlobs doesn't write any log entries, so it doesn't need the lsn. */
pageAbort(xid);
pageAbort(xid);
return 0;
return 0;
}
void bufDeinit() {

View file

@ -44,6 +44,8 @@ terms specified in this license.
#include <lladd/logger/logger2.h>
#include <malloc.h>
#include <stdlib.h>
TransactionLog LogTransBegin(int xid) {
TransactionLog tl;
tl.xid = xid;
@ -53,21 +55,29 @@ TransactionLog LogTransBegin(int xid) {
return tl;
}
static void LogTransCommon(TransactionLog * l, int type) {
static lsn_t LogTransCommon(TransactionLog * l, int type) {
LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type);
lsn_t ret;
writeLogEntry(e);
l->prevLSN = e->LSN;
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
ret = e->LSN;
free(e);
return ret;
}
void LogTransCommit(TransactionLog * l) {
LogTransCommon(l, XCOMMIT);
lsn_t LogTransCommit(TransactionLog * l) {
return LogTransCommon(l, XCOMMIT);
}
void LogTransAbort(TransactionLog * l) {
LogTransCommon(l, XABORT);
lsn_t LogTransAbort(TransactionLog * l) {
return LogTransCommon(l, XABORT);
}
LogEntry * LogUpdate(TransactionLog * l, recordid rid, int operation, const byte * args) {
@ -82,14 +92,17 @@ LogEntry * LogUpdate(TransactionLog * l, recordid rid, int operation, const byte
}
if(operationsTable[operation].undo == NO_INVERSE) {
DEBUG("Creating %d byte physical pre-image.\n", rid.size);
preImage = malloc(rid.size);
if(!preImage) { perror("malloc"); abort(); }
readRecord(l->xid, rid, preImage);
DEBUG("got preimage");
}
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage);
writeLogEntry(e);
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) argSize);
if(preImage) {
free(preImage);

View file

@ -48,19 +48,23 @@ terms specified in this license.
Operation operationsTable[MAX_OPERATIONS];
void doUpdate(const LogEntry * e) {
operationsTable[e->contents.update.funcID].run(e->xid, e->contents.update.rid, getUpdateArgs(e));
DEBUG("OPERATION update arg length %d, lsn = %ld\n", e->contents.update.argSize, e->LSN);
operationsTable[e->contents.update.funcID].run(e->xid, e->LSN, e->contents.update.rid, getUpdateArgs(e));
}
void redoUpdate(const LogEntry * e) {
if(e->type == UPDATELOG) {
lsn_t pageLSN = readLSN(e->contents.update.rid.page);
#ifdef DEBUGGING
recordid rid = e->contents.update.rid;
#endif
if(e->LSN > readLSN(e->contents.update.rid.page)) {
DEBUG("Redo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size);
if(e->LSN > pageLSN) {
DEBUG("OPERATION Redo, %ld > %ld {%d %d %d}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size);
doUpdate(e);
} else {
DEBUG("Skipping redo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size);
DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %d}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size);
}
} else if(e->type == CLRLOG) {
LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN);
@ -71,10 +75,10 @@ void redoUpdate(const LogEntry * e) {
doesn't, then undo the original operation. */
if(f->LSN > readLSN(e->contents.update.rid.page)) {
DEBUG("Undoing for clr, %ld {%d %d %d}\n", f->LSN, rid.page, rid.slot, rid.size);
undoUpdate(f);
DEBUG("OPERATION Undoing for clr, %ld {%d %d %d}\n", f->LSN, rid.page, rid.slot, rid.size);
undoUpdate(f, e->LSN);
} else {
DEBUG("Skiping undo for clr, %ld {%d %d %d}\n", f->LSN, rid.page, rid.slot, rid.size);
DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %d}\n", f->LSN, rid.page, rid.slot, rid.size);
}
} else {
assert(0);
@ -83,30 +87,39 @@ void redoUpdate(const LogEntry * e) {
}
void undoUpdate(const LogEntry * e) {
void undoUpdate(const LogEntry * e, lsn_t clr_lsn) {
int undo = operationsTable[e->contents.update.funcID].undo;
/* printf("FuncID: %d Undo op: %d\n",e->contents.update.funcID, undo); fflush(NULL); */
if(e->LSN <= readLSN(e->contents.update.rid.page)) {
DEBUG("OPERATION FuncID %d Undo op %d LSN %ld\n",e->contents.update.funcID, undo, clr_lsn);
#ifdef DEBUGGING
recordid rid = e->contents.update.rid;
recordid rid = e->contents.update.rid;
#endif
lsn_t page_lsn = readLSN(e->contents.update.rid.page);
if(e->LSN <= page_lsn) {
/* Actually execute the undo */
if(undo == NO_INVERSE) {
/* Physical undo */
DEBUG("Physical undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size);
DEBUG("OPERATION Physical undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size);
/** @todo Why were we passing in RECOVERY_XID? */
writeRecord(e->xid, e->contents.update.rid, getUpdatePreImage(e));
writeRecord(e->xid, clr_lsn, e->contents.update.rid, getUpdatePreImage(e));
} else {
/* @see doUpdate() */
/* printf("Logical undo"); fflush(NULL); */
DEBUG("Logical undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size);
operationsTable[undo].run(e->xid, e->contents.update.rid, getUpdateArgs(e));
DEBUG("OPERATION Logical undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size);
operationsTable[undo].run(e->xid, clr_lsn, e->contents.update.rid, getUpdateArgs(e));
}
} else {
DEBUG("OPERATION Skipping undo, %ld {%d %d %d}\n", e->LSN, rid.page, rid.slot, rid.size);
/* if(page_lsn < clr_lsn) {
/ * It is possible that we are in the process of lazily
propagating this change to the page. If this is the case,
simply update the page lsn to the new clr lsn. (This is no longer true)* /
writeLSN(clr_lsn, e->contents.update.rid.page);
} */
}
/* printf("Undo done."); fflush(NULL); */

View file

@ -18,7 +18,7 @@
*/
static int operate(int xid, recordid rid, const void * dat) {
static int operate(int xid, lsn_t lsn, recordid rid, const void * dat) {
Page loadedPage = loadPage(rid.page);
/** Has no effect during normal operation. */
pageSlotRalloc(loadedPage, rid);
@ -26,7 +26,7 @@ static int operate(int xid, recordid rid, const void * dat) {
}
/** @todo Currently, we just leak store space on dealloc. */
static int deoperate(int xid, recordid rid, const void * dat) {
static int deoperate(int xid, lsn_t lsn, recordid rid, const void * dat) {
Page loadedPage = loadPage(rid.page);
/** Has no effect during normal operation. */
pageSlotRalloc(loadedPage, rid);
@ -43,10 +43,25 @@ Operation getAlloc() {
return o;
}
recordid Talloc(int xid, size_t size) {
recordid rid;
rid = ralloc(xid, size);
/**
@todo we pass lsn -1 into ralloc here. This is a kludge, since we
need to log ralloc's return value, but can't get that return value
until after it has executed. When it comes time to perform recovery,
it is possible that this record will be leaked during the undo
phase. We could do a two phase allocation to prevent the leak, but
then we'd need to lock the page that we're allocating a record in,
and that's a pain. Plus, this is a small leak. (There is a similar
problem involving blob allocation, which is more serious, as it may
result in double allocation...)
*/
rid = ralloc(xid, -1, size);
Tupdate(xid,rid, NULL, OPERATION_ALLOC);

View file

@ -48,12 +48,12 @@ terms specified in this license.
#include <lladd/operations/decrement.h>
#include <lladd/bufferManager.h>
static int operate(int xid, recordid r, const void *d) {
static int operate(int xid, lsn_t lsn, recordid r, const void *d) {
int i;
readRecord(xid, r, &i);
i--;
writeRecord(xid, r, &i);
writeRecord(xid, lsn, r, &i);
return 0;
}

View file

@ -47,12 +47,12 @@ terms specified in this license.
#include <lladd/operations/increment.h>
#include <lladd/bufferManager.h>
static int operate(int xid, recordid r, const void *d) {
static int operate(int xid, lsn_t lsn, recordid r, const void *d) {
int i;
readRecord(xid, r, &i);
i++;
writeRecord(xid, r, &i);
writeRecord(xid, lsn, r, &i);
return 0;
}

View file

@ -47,8 +47,8 @@ terms specified in this license.
#include <lladd/operations/set.h>
#include <lladd/bufferManager.h>
static int operate(int xid, recordid rid, const void *dat) {
writeRecord(xid, rid, dat);
static int operate(int xid, lsn_t lsn, recordid rid, const void *dat) {
writeRecord(xid, lsn, rid, dat);
return 0;
}

View file

@ -247,22 +247,22 @@ static int touchBlob(int xid, recordid rid) {
return 0;
}
static void rmTouch(int xid) {
/*static void rmTouch(int xid) {
touchedBlob_t *t = &touched[xid%touchedLen];
if( t ) {
free( t->records );
t->records = NULL;
/* touched[xid%touchedLen].xid = -1; TODO: necessary? */
/ * touched[xid%touchedLen].xid = -1; TODO: necessary? * /
}
}
}*/
void pageCommit(int xid) {
rmTouch(xid);
/* rmTouch(xid); */
}
void pageAbort(int xid) {
rmTouch(xid);
/* rmTouch(xid); */
}
/*#define getFirstHalfOfWord(memAddr) (((*(int*)memAddr) >> (2*BITS_PER_BYTE)) & MASK_0000FFFF) */
@ -548,12 +548,14 @@ int isBlobSlot(byte *pageMemAddr, int slot) {
TODO: BufferManager should pass in file descriptors so that this function doesn't have to open and close the file on each call.
@todo This needs to trust the rid, which is fine, but it could do more to check if the page agrees with the rid...
*/
void pageReadRecord(int xid, Page page, recordid rid, byte *buff) {
byte *recAddress = page.memAddr + getSlotOffset(page.memAddr, rid.slot);
/*look at record, if slot offset == blob_slot, then its a blob, else its normal. */
if(isBlobSlot(page.memAddr, rid.slot)) {
/* if(isBlobSlot(page.memAddr, rid.slot)) {
int fd = -1;
int version;
int offset;
@ -565,33 +567,34 @@ void pageReadRecord(int xid, Page page, recordid rid, byte *buff) {
fd = version == 0 ? blobfd0 : blobfd1;
/* if( (fd = open(version == 0 ? BLOB0_FILE : BLOB1_FILE, O_RDWR, 0)) == -1 ) {
/ * if( (fd = open(version == 0 ? BLOB0_FILE : BLOB1_FILE, O_RDWR, 0)) == -1 ) {
printf("page.c:pageReadRecord error and exiting\n");
exit(-1);
} */
} * /
lseek(fd, offset, SEEK_SET);
read(fd, buff, length);
/* close(fd); */
} else {
int size = getSecondHalfOfWord((int*)slotMemAddr(page.memAddr, rid.slot));
/ * close(fd); * /
} else { */
/* For now, we need page.c to trust the rid. */
/* int size = getSecondHalfOfWord((int*)slotMemAddr(page.memAddr, rid.slot)); */
memcpy(buff, recAddress, size);
}
memcpy(buff, recAddress, rid.size);
/*}*/
}
void pageWriteRecord(int xid, Page page, recordid rid, const byte *data) {
byte *rec;
int version = -1;
/* int version = -1;
int fd = -1;
int blobRec[3];
if (isBlobSlot(page.memAddr, rid.slot)) {
/* TODO: Rusty's wild guess as to what's supposed to happen. Touch blob appears to lookup the blob,
/ * TODO: Rusty's wild guess as to what's supposed to happen. Touch blob appears to lookup the blob,
and allocate it if its not there. It returns 0 if it's a new blob, 1 otherwise, so I think we
just ignore its return value...*/
just ignore its return value...* /
if( !touchBlob(xid, rid) ) {
/* record hasn't been touched yet */
/ * record hasn't been touched yet * /
rec = page.memAddr + getSlotOffset(page.memAddr, rid.slot);
version = *(int *)rec;
blobRec[0] = version == 0 ? 1 : 0;
@ -612,40 +615,42 @@ void pageWriteRecord(int xid, Page page, recordid rid, const byte *data) {
perror("write");
}
/* Flush kernel buffers to hard drive. TODO: the
/ * Flush kernel buffers to hard drive. TODO: the
(standard) fdatasync() call only flushes the data
instead of the data + metadata. Need to have
makefile figure out if it's available, and do some
macro magic in order to use it, if possible...
This is no longer called here, since it is called at commit.
*/
/* fsync(fd); */
} else { /* write a record that is not a blob */
* /
/ * fsync(fd); * /
} else { / * write a record that is not a blob */
assert(rid.size < PAGE_SIZE);
rec = page.memAddr + getSlotOffset(page.memAddr, rid.slot);
if(memcpy(rec, data, rid.size) == NULL ) {
printf("ERROR: MEM_WRITE_ERROR on %s line %d", __FILE__, __LINE__);
exit(MEM_WRITE_ERROR);
}
}
/*}*/
}
/* Currently not called any where, or tested. */
byte * pageMMapRecord(int xid, Page page, recordid rid) {
/*byte * pageMMapRecord(int xid, Page page, recordid rid) {
byte *rec;
int version = -1;
int fd = -1;
int blobRec[3];
byte * ret;
if (isBlobSlot(page.memAddr, rid.slot)) {
/* TODO: Rusty's wild guess as to what's supposed to happen. Touch blob appears to lookup the blob,
/ * TODO: Rusty's wild guess as to what's supposed to happen. Touch blob appears to lookup the blob,
and allocate it if its not there. It returns 0 if it's a new blob, 1 otherwise, so I think we
just ignore its return value...*/
just ignore its return value...* /
if( !touchBlob(xid, rid) ) {
/* record hasn't been touched yet */
/ * record hasn't been touched yet * /
rec = page.memAddr + getSlotOffset(page.memAddr, rid.slot);
version = *(int *)rec;
blobRec[0] = version == 0 ? 1 : 0;
@ -663,32 +668,32 @@ byte * pageMMapRecord(int xid, Page page, recordid rid) {
perror("pageMMapRecord");
}
/* if(-1 == lseek(fd, *(int *)(rec +4), SEEK_SET)) {
/ * if(-1 == lseek(fd, *(int *)(rec +4), SEEK_SET)) {
perror("lseek");
}
if(-1 == write(fd, data, *(int *)(rec +8))) {
perror("write");
} */
} * /
/* Flush kernel buffers to hard drive. TODO: the
/ * Flush kernel buffers to hard drive. TODO: the
(standard) fdatasync() call only flushes the data
instead of the data + metadata. Need to have
makefile figure out if it's available, and do some
macro magic in order to use it, if possible...*/
/* fsync(fd); */
macro magic in order to use it, if possible...* /
/ * fsync(fd); * /
} else { /* write a record that is not a blob */
} else { / * write a record that is not a blob * /
rec = page.memAddr + getSlotOffset(page.memAddr, rid.slot);
ret = rec;
/* if(memcpy(rec, data, rid.size) == NULL ) {
/ * if(memcpy(rec, data, rid.size) == NULL ) {
printf("ERROR: MEM_WRITE_ERROR on %s line %d", __FILE__, __LINE__);
exit(MEM_WRITE_ERROR);
}*/
}* /
}
return ret;
}
*/
void pageRealloc(Page *p, int id) {
p->id = id;
p->LSN = 0;

View file

@ -204,18 +204,32 @@ static void Undo(int recovery) {
/* printf("."); fflush(NULL); */
switch(e->type) {
case UPDATELOG:
/* Sanity check. If this fails, we've already undone this
update, or something is wrong with the redo phase. */
update, or something is wrong with the redo phase or normal operation. */
this_lsn= readLSN(e->contents.update.rid.page);
/* printf("1"); fflush(NULL); */
/* This check was, for a while, incorrect: blobManager used to lazily
update the pageManager, so it was possible for this test to
fail. In that case, the undo was handled by blobManager
(it removed this page from its dirty blob list), not
us. blobManager has since been fixed, so the check is correct again. */
assert(e->LSN <= this_lsn);
/* printf("1a"); fflush(NULL); */
/* Need to log a clr here. */
clr_lsn = LogCLR(e);
writeLSN(clr_lsn, e->contents.update.rid.page);
undoUpdate(e);
/* writeLSN(clr_lsn, e->contents.update.rid.page); */
/* Undo update is a no-op if the page does not reflect this
update, but it will write the new clr_lsn. */
undoUpdate(e, clr_lsn);
/* printf("1b"); fflush(NULL); */
break;
case CLRLOG:
@ -227,6 +241,9 @@ static void Undo(int recovery) {
/* Don't use xalloc anymore. */
/*assert(0);*/
break;
case XABORT:
/* Since XABORT is a no-op, we can safely ignore it. (XABORT records may be passed in by undoTrans.)*/
break;
default:
printf ("Unknown log type to undo (TYPE=%d, XID= %d, LSN=%ld), skipping...\n", e->type, e->xid, e->LSN);
break;

View file

@ -68,7 +68,7 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
/* printf("e->LSN: %ld\n", e->LSN); */
writeLSN(e->LSN, rid.page);
/* writeLSN(e->LSN, rid.page); <-- Handled by doUpdate now */
doUpdate(e);
}
@ -78,9 +78,10 @@ void Tread(int xid, recordid rid, void * dat) {
}
int Tcommit(int xid) {
lsn_t lsn;
assert(numActiveXactions <= MAX_TRANSACTIONS);
LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]);
bufTransCommit(xid); /* unlocks pages */
lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]);
bufTransCommit(xid, lsn); /* unlocks pages */
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
numActiveXactions--;
assert( numActiveXactions >= 0 );
@ -88,10 +89,13 @@ int Tcommit(int xid) {
}
int Tabort(int xid) {
lsn_t lsn;
lsn = LogTransAbort(&XactionTable[xid%MAX_TRANSACTIONS]);
/* should call undoTrans after log trans abort. undoTrans will cause pages to contain CLR values corresponding to */
/* @todo is the order of the next two calls important? */
undoTrans(XactionTable[xid%MAX_TRANSACTIONS]);
LogTransAbort(&XactionTable[xid%MAX_TRANSACTIONS]);
bufTransAbort(xid);
bufTransAbort(xid, lsn);
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
numActiveXactions--;
assert( numActiveXactions >= 0 );

View file

@ -1,11 +1,11 @@
INCLUDES = @CHECK_CFLAGS@
if HAVE_CHECK
## Had to disable check_lht because lht needs to be rewritten.
TESTS = check_logEntry check_logWriter check_operations check_transactional2 check_recovery
TESTS = check_logEntry check_logWriter check_operations check_transactional2 check_recovery check_blobRecovery
else
TESTS =
endif
noinst_PROGRAMS = $(TESTS)
LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt
LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a -lefence
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -0,0 +1,575 @@
/*---
This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
and that this notice is included verbatim in any distributions. No
written agreement, license, or royalty fee is required for any of the
authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are
acquiring the software on behalf of the Department of Defense, the
software shall be classified as "Commercial Computer Software" and the
Government shall have only "Restricted Rights" as defined in Clause
252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the
authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#include <config.h>
#include <check.h>
/*#include <assert.h> */
#include <lladd/transactional.h>
#include <lladd/logger/logWriter.h>
#include "../check_includes.h"
#include <assert.h>
#define LOG_NAME "check_blobRecovery.log"
#define ARRAY_SIZE 20321

/**
   Fill an ARRAY_SIZE-element int array with multiples of mul.

   @param a   destination; must have room for at least ARRAY_SIZE ints.
   @param mul multiplier; on return a[n] == mul * n for every
              0 <= n < ARRAY_SIZE.
*/
static void arraySet(int * a, int mul) {
  int *slot = a;
  int n;

  for (n = 0; n != ARRAY_SIZE; ++n, ++slot) {
    *slot = n * mul;
  }
}
/**
@test
Simple test: Insert some stuff. Commit. Call Tdeinit(). Call
Tinit() (Which initiates recovery), and see if the stuff we
inserted is still there.
Only performs idempotent operations (Tset).

The record holds ARRAY_SIZE ints; presumably that is large enough for
the record to be stored as a blob rather than in a page slot (TODO
confirm against the allocator's size threshold).

j holds the expected contents throughout; k is the read buffer and is
overwritten with a different pattern before each read so that a Tread()
that silently does nothing cannot pass the comparison.
*/
START_TEST (recoverBlob__idempotent) {
int xid;
int j[ARRAY_SIZE];
int k[ARRAY_SIZE];
recordid rid;
Tinit();
xid = Tbegin();
rid = Talloc(xid, ARRAY_SIZE * sizeof(int));
arraySet(j, 1);
/* Write j and read it back inside the same transaction. */
Tset(xid, rid, j);
Tread(xid, rid, k);
fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "Get/Set broken?");
/* Scribble over k so the next read has to repopulate it. */
arraySet(k, 12312);
Tcommit(xid);
/* The committed value must be visible to a new transaction. */
xid = Tbegin();
Tread(xid, rid, k);
fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "commit broken");
Tcommit(xid);
/* Clean shutdown followed by restart. */
Tdeinit();
Tinit(); /* Runs recovery.. */
arraySet(k, 12312);
/* The value must be unchanged after the restart. */
xid = Tbegin();
Tread(xid, rid, k);
fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "Recovery messed something up!");
Tcommit(xid);
Tdeinit();
}
END_TEST
/**
@test
Simple test: Alloc a record, commit. Call Tincrement on it, and
remember its value and commit. Then, call Tdeinit() and Tinit()
(Which initiates recovery), and see if the value changes.
@todo: Until we have a non-idempotent operation on blobs, this test can't be written.

Placeholder: the first statement is an unconditional fail_unless(0, ...),
and the registration of this test in check_suite() is commented out.
The body below works on a sizeof(int) record, not an ARRAY_SIZE-sized
one.
*/
START_TEST (recoverBlob__exactlyOnce) {
int xid;
int j;
int k;
recordid rid;
/* if(1) {
return;
} */
/* Fails unconditionally: this test has not been written for blobs yet. */
fail_unless(0, "Need to write this test...");
Tinit();
xid = Tbegin();
rid = Talloc(xid, sizeof(int));
/* Apply the increment once and remember the resulting value in j. */
Tincrement(xid, rid);
Tread(xid, rid, &j);
Tcommit(xid);
xid = Tbegin();
Tread(xid, rid, &k);
fail_unless(j == k, "Get/Set broken?");
Tcommit(xid);
Tdeinit();
Tinit(); /* Runs recovery.. */
/* Poison k so the read below has to overwrite it. */
k = 12312;
xid = Tbegin();
Tread(xid, rid, &k);
/* The value must still equal the one remembered before the restart. */
fail_unless(j == k, "Recovery messed something up!");
Tcommit(xid);
Tdeinit();
}
END_TEST
/**
@test
Makes sure that aborted idempotent operations are correctly undone.

Sequence: commit the pattern arraySet(.., 1); overwrite it with
arraySet(.., 2) in a second transaction and abort; then check that the
record reads back as pattern 1 both immediately and after a
Tdeinit()/Tinit() cycle.

j is the expected pattern, k the read buffer. k is refilled with an
unrelated pattern before each Tread() so that a read that does nothing
is detected.
*/
START_TEST (recoverBlob__idempotentAbort) {
int xid;
int j[ARRAY_SIZE];
int k[ARRAY_SIZE];
recordid rid;
Tinit();
xid = Tbegin();
rid = Talloc(xid, ARRAY_SIZE * sizeof(int));
arraySet(j, 1);
Tset(xid, rid, j);
arraySet(k, 2);
Tread(xid, rid, k);
fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "Get/set broken?!");
Tcommit(xid);
/* Pattern 1 is now committed; confirm from a fresh transaction. */
xid = Tbegin();
arraySet(k, 3);
Tread(xid, rid, k);
fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "commit broken?");
Tcommit(xid);
/* Overwrite with pattern 2, verify it is visible inside the transaction, then abort. */
xid = Tbegin();
arraySet(k, 2);
Tset(xid, rid, k);
arraySet(k, 4);
Tread(xid, rid, k);
arraySet(j, 2);
fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)),NULL);
Tabort(xid);
/* After the abort the record must read as pattern 1 again. */
xid = Tbegin();
arraySet(j, 1);
arraySet(k, 4);
/* &k and k name the same address; the other reads in this test pass k. */
Tread(xid, rid, &k);
Tabort(xid);
fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)),"Didn't abort!");
Tdeinit();
Tinit(); /* Runs recovery.. */
/* The rollback must still hold after a restart. */
arraySet(k, 12312);
xid = Tbegin();
Tread(xid, rid, k);
fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)),"recovery messed something up.");
Tcommit(xid);
Tdeinit();
}
END_TEST
/**
@test Makes sure that aborted non-idempotent operations are
correctly undone. Currently, we don't support such operations on
blobs, so this test is not implemented.
@todo logical operations on blobs.

Placeholder: the first statement is an unconditional fail_unless(0, ...),
and the registration of this test in check_suite() is commented out.
The body below works on a sizeof(int) record, not an ARRAY_SIZE-sized
one.
*/
START_TEST (recoverBlob__exactlyOnceAbort) {
int xid;
int j;
int k;
recordid rid;
/* if(1)
return ;
*/
/* Fails unconditionally: this test has not been written for blobs yet. */
fail_unless(0, "Need to write this test...");
Tinit();
xid = Tbegin();
rid = Talloc(xid, sizeof(int));
/* This assignment is overwritten by the Tread() two lines below. */
j = 1;
Tincrement(xid, rid);
Tread(xid, rid, &j);
Tcommit(xid);
/* Increment a second time, confirm k is one past the committed j, then abort. */
xid = Tbegin();
Tincrement(xid, rid);
Tread(xid, rid, &k);
fail_unless(j == k-1, NULL);
Tabort(xid);
/* The aborted increment must not be visible. */
xid = Tbegin();
Tread(xid, rid, &k);
fail_unless(j == k, "didn't abort?");
Tcommit(xid);
Tdeinit();
Tinit();
/* ... and must still not be visible after a restart. */
xid = Tbegin();
Tread(xid, rid, &k);
fail_unless(j == k, "Recovery didn't abort correctly");
Tcommit(xid);
Tdeinit();
}
END_TEST
/**
@test
Check the CLR mechanism with an aborted logical operation, and multiple Tinit()/Tdeinit() cycles.
@todo Devise a way of implementing this for blobs.

Placeholder: the first statement is an unconditional fail_unless(0, ...),
and the registration of this test in check_suite() is commented out.
The body below works on a sizeof(int) record, not an ARRAY_SIZE-sized
one.
*/
START_TEST(recoverBlob__clr) {
recordid rid;
int xid;
int j;
int k;
/* if(1) return; */
/* Fails unconditionally: this test has not been written for blobs yet. */
fail_unless(0, "Need to write this test...");
DEBUG("\n\nStart CLR test\n\n");
Tinit();
xid = Tbegin();
rid = Talloc(xid, sizeof(int));
/* Remember the record's value before the increment; j is the expected value from here on. */
Tread(xid, rid, &j);
Tincrement(xid, rid);
Tabort(xid);
/* After the abort the value must equal the remembered one. */
xid = Tbegin();
Tread(xid, rid, &k);
Tcommit(xid);
fail_unless(j == k, NULL);
/* Two restarts back to back, with no transaction in between. */
Tdeinit();
Tinit();
Tdeinit();
Tinit();
xid = Tbegin();
Tread(xid, rid, &k);
Tcommit(xid);
fail_unless(j == k, NULL);
/* One more restart; the value must still be unchanged. */
Tdeinit();
Tinit();
xid = Tbegin();
Tread(xid, rid, &k);
Tcommit(xid);
fail_unless(j == k, NULL);
Tdeinit();
} END_TEST
/* Not defined in this file; declared by hand for the crash tests below.
   NOTE(review): presumably no public header exports it -- confirm. */
void simulateBufferManagerCrash();
/* The crash tests below reset this to 0 after simulating a crash and
   before calling Tinit() again. */
extern int numActiveXactions;
/**
   @test
   Tests the undo phase of recovery by simulating a crash, and calling Tinit().

   Sequence: commit the pattern arraySet(.., 9) into an ARRAY_SIZE-int
   record, overwrite it with arraySet(.., 6) in a second transaction
   that is never committed, simulate a crash, and check that the
   record reads back as pattern 9 after Tinit() -- and again after a
   further Tdeinit()/Tinit() cycle.

   Every comparison covers the whole record (ARRAY_SIZE * sizeof(int)
   bytes); memcmp() takes a byte count, not an element count.

   @todo Really should check logical operations, if they are ever supported for blobs.
*/
START_TEST(recoverBlob__crash) {
  int xid;
  recordid rid;
  int j[ARRAY_SIZE];
  int k[ARRAY_SIZE];

  Tinit();

  xid = Tbegin();
  rid = Talloc(xid, sizeof(int)* ARRAY_SIZE);

  arraySet(j, 3);
  Tset(xid, rid, &j);

  arraySet(j, 9);
  Tset(xid, rid, &j);

  /* RID = 9. */
  Tread(xid, rid, &j);
  arraySet(k, 9);
  fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "set not working?");

  Tcommit(xid);

  /* Second transaction: overwrite with pattern 6 and never commit. */
  xid = Tbegin();

  arraySet(k, 6);
  Tset(xid, rid, &k);

  /* RID = 6. */
  Tread(xid, rid, &j);
  fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), NULL);

  /* Crash with the transaction still open, then reset the state that
     the next Tinit() would otherwise inherit. */
  simulateBufferManagerCrash();
  closeLogWriter();
  numActiveXactions = 0;

  Tinit();

  /* NOTE(review): these reads reuse the xid of the transaction that was
     open at crash time instead of calling Tbegin() -- confirm that
     Tread() tolerates this. */
  Tread(xid, rid, &j);
  arraySet(k, 9);
  fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "Recovery didn't roll back in-progress xact!");

  Tdeinit();
  Tinit();

  Tread(xid, rid, &j);
  fail_unless(!memcmp(j, k, ARRAY_SIZE * sizeof(int)), "Recovery failed on second re-open.");

  Tdeinit();
} END_TEST
/**
@test Tests recovery when more than one transaction is in progress
at the time of the crash. This test is interesting because blob
operations from multiple transactions could hit the same page.
@todo implement this sometime...

The registration of this test in check_suite() is commented out. Every
record allocated below is sizeof(int) bytes, not ARRAY_SIZE-sized, so
the body has not been adapted to blob-sized records.

Expected values after recovery, as asserted at the end: rid1 == 1,
rid2 == 2, rid3 == 4 and rid4 == 4. The work done by the transactions
still open at the simulated crash (the second xid3 and the
re-begun xid1 and xid2) must not be visible.
*/
START_TEST (recoverBlob__multiple_xacts) {
int xid1, xid2, xid3, xid4;
recordid rid1, rid2, rid3, rid4;
int j1, j2, j3, j4, k;
Tinit();
j1 = 1;
j2 = 2;
j3 = 4;
j4 = 3;
xid1 = Tbegin();
rid1 = Talloc(xid1, sizeof(int));
xid2 = Tbegin();
xid3 = Tbegin();
Tset(xid1, rid1, &j1);
rid2 = Talloc(xid2, sizeof(int));
rid3 = Talloc(xid3, sizeof(int));
Tread(xid3, rid3, &k);
Tset(xid3, rid3, &j3);
Tcommit(xid3);
/* rid3 is committed as 4; this new xid3 increments it and is never committed. */
xid3 = Tbegin();
Tincrement(xid3, rid3);
Tset(xid2, rid2, &j2);
Tcommit(xid1);
xid4 = Tbegin();
Tcommit(xid2);
/* rid4: set to 3, increment, commit. */
rid4 = Talloc(xid4, sizeof(int));
Tset(xid4, rid4, &j4);
Tincrement(xid4, rid4);
Tcommit(xid4);
/* New xid1 and xid2 modify rid1 and rid2 and are left open at the crash. */
xid1 = Tbegin();
k = 100000;
Tset(xid1, rid1,&k);
xid2 = Tbegin();
Tdecrement(xid2, rid2);
Tdecrement(xid2, rid2);
Tdecrement(xid2, rid2);
Tdecrement(xid2, rid2);
Tdecrement(xid2, rid2);
Tincrement(xid1, rid1);
Tset(xid1, rid1,&k);
/*simulate crash */
simulateBufferManagerCrash();
closeLogWriter();
numActiveXactions = 0;
/* Restart twice before checking anything. */
Tinit();
Tdeinit();
Tinit();
xid1 = Tbegin();
xid2 = Tbegin();
xid3 = Tbegin();
xid4 = Tbegin();
Tread(xid1, rid1, &j1);
Tread(xid2, rid2, &j2);
Tread(xid3, rid3, &j3);
Tread(xid4, rid4, &j4);
fail_unless(j1 == 1, NULL);
fail_unless(j2 == 2, NULL);
fail_unless(j3 == 4, NULL);
fail_unless(j4 == 4, NULL);
/* NOTE(review): the four transactions begun above are neither committed nor aborted before Tdeinit(). */
Tdeinit();
} END_TEST
/**
   Add suite declarations here

   Builds the Check suite for this file: a single test case named
   "recovery" that uses the setup/teardown checked fixture.

   @return the newly created suite.
*/
Suite * check_suite(void) {
  Suite *s = suite_create("recovery_suite");
  /* Begin a new test */
  TCase *tc = tcase_create("recovery");

  /* Sub tests are added, one per line, here */
  tcase_add_test(tc, recoverBlob__idempotent);
  /* tcase_add_test(tc, recoverBlob__exactlyOnce); */
  tcase_add_test(tc, recoverBlob__idempotentAbort);
  /* tcase_add_test(tc, recoverBlob__exactlyOnceAbort); */
  /* tcase_add_test(tc, recoverBlob__clr); */
  tcase_add_test(tc, recoverBlob__crash);
  /* tcase_add_test(tc, recoverBlob__multiple_xacts); */

  /* The disabled tests are still defined as static functions. Naming
     each one in a void expression counts as a use, which suppresses the
     unused-function warning without converting a function pointer to
     void * (a conversion ISO C does not define). */
  (void)recoverBlob__exactlyOnce;
  (void)recoverBlob__exactlyOnceAbort;
  (void)recoverBlob__clr;
  (void)recoverBlob__multiple_xacts;

  /* --------------------------------------------- */
  tcase_add_checked_fixture(tc, setup, teardown);
  suite_add_tcase(s, tc);
  return s;
}
#include "../check_setup.h"

View file

@ -52,49 +52,64 @@ terms specified in this license.
Assuming that the Tset() operation is implemented correctly, checks
that doUpdate, redoUpdate and undoUpdate are working correctly, for
operations that use physical logging.
@todo Now that writes + lsn updates are atomic, this test case probably breaks.
*/
START_TEST(operation_physical_do_undo) {
int xid = 1;
recordid rid;
lsn_t lsn = 0;
lsn_t lsn = 2;
int buf;
int arg;
LogEntry * setToTwo;
Tinit();
rid = ralloc(xid, sizeof(int));
rid = ralloc(xid, 1, sizeof(int));
buf = 1;
arg = 2;
DEBUG("A\n");
setToTwo = allocUpdateLogEntry(-1, xid, OPERATION_SET, rid, (void*)&arg, sizeof(int), (void*)&buf);
/* Do, undo and redo operation without updating the LSN field of the page. */
writeLSN(lsn, rid.page);
writeRecord(xid, rid, &buf);
/* writeLSN(lsn, rid.page); */
DEBUG("B\n");
writeRecord(xid, lsn, rid, &buf);
setToTwo->LSN = 1;
setToTwo->LSN = 10;
doUpdate(setToTwo);
DEBUG("C\n");
doUpdate(setToTwo); /* PAGE LSN= 10, value = 2. */
readRecord(xid, rid, &buf);
fail_unless(buf == 2, NULL);
undoUpdate(setToTwo); /* Should fail, LSN wasn't updated. */
DEBUG("D\n");
fail_unless(10 == readLSN(rid.page), "page lsn not set correctly.");
setToTwo->LSN = 5;
undoUpdate(setToTwo, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */
readRecord(xid, rid, &buf);
fail_unless(buf == 2, NULL);
fail_unless(buf == 1, NULL);
DEBUG("E\n");
redoUpdate(setToTwo);
readRecord(xid, rid, &buf);
fail_unless(buf == 2, NULL);
fail_unless(buf == 1, NULL);
writeLSN(3,rid.page);
/* writeLSN(3,rid.page); */
/* Now, simulate scenarios from normal operation:
do the operation, and update the LSN, (update happens)
@ -109,32 +124,50 @@ START_TEST(operation_physical_do_undo) {
lsn = 0;
buf = 1;
writeLSN(lsn, rid.page);
writeRecord(xid, rid, &buf);
/* writeLSN(lsn, rid.page); */
writeRecord(xid, lsn, rid, &buf);
setToTwo->LSN = 1;
/* Trace of test:
doUpdate(setToTwo);
writeLSN(setToTwo->LSN, rid.page);
PAGE LSN LOG LSN CLR LSN TYPE SUCCEED?
2 10 - do write YES (C)
10 5 8 undo write YES (D)
8 5 - redo write NO (E)
8 10 - redo write YES (F)
....... and so on.
*/
setToTwo->LSN = 10;
DEBUG("F\n");
redoUpdate(setToTwo);
/* writeLSN(setToTwo->LSN, rid.page); */
readRecord(xid, rid, &buf);
fail_unless(buf == 2, NULL);
undoUpdate(setToTwo); /* Succeeds */
DEBUG("G undo set to 2\n");
undoUpdate(setToTwo, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
readRecord(xid, rid, &buf);
fail_unless(buf == 1, NULL);
DEBUG("H don't redo set to 2\n");
redoUpdate(setToTwo); /* Fails */
readRecord(xid, rid, &buf);
fail_unless(buf == 1, NULL);
writeLSN(0,rid.page);
writeRecord(xid, 0, rid, &buf); /* reset the page's LSN. */
/* writeLSN(0,rid.page); */
DEBUG("I redo set to 2\n");
redoUpdate(setToTwo); /* Succeeds */
readRecord(xid, rid, &buf);

View file

@ -43,7 +43,7 @@ terms specified in this license.
#include <assert.h>
#include <lladd/transactional.h>
#include "../check_includes.h"
#define LOG_NAME "check_transactional2.log"
/**
Assuming that the Tset() operation is implemented correctly, checks
@ -86,6 +86,62 @@ START_TEST(transactional_smokeTest) {
}
END_TEST
/**
   Just like transactional_smokeTest, but check blobs instead.

   Writes an ARRAY_SIZE-int record with Tset(), reads it back inside
   the same transaction, commits, then reads it again from a second
   transaction. bar is refilled with a different pattern (2 * i) before
   each read, so a Tread() that does nothing cannot pass.

   Mismatches are reported through fail_unless() only, so that Check
   records them as test failures.
*/
START_TEST(transactional_blobSmokeTest) {
#define ARRAY_SIZE 10000
  int xid;
  recordid rid;
  int foo[ARRAY_SIZE];
  int bar[ARRAY_SIZE];
  int i;

  /* foo is the expected contents; bar starts out different from foo. */
  for(i = 0; i < ARRAY_SIZE; i++) {
    foo[i] = i;
    bar[i] = 2 * i;
  }

  Tinit();

  xid = Tbegin();
  rid = Talloc(xid, ARRAY_SIZE * sizeof(int));
  fail_unless(rid.size == ARRAY_SIZE * sizeof(int), NULL);

  printf("TSet starting.\n"); fflush(NULL);
  Tset(xid, rid, &foo);
  printf("TSet returning.\n"); fflush(NULL);

  Tread(xid, rid, &bar);
  for(i = 0 ; i < ARRAY_SIZE; i++) {
    fail_unless(bar[i] == foo[i], NULL);
  }

  Tcommit(xid);

  /* Re-read from a new transaction; the committed data must be visible. */
  xid = Tbegin();
  for(i = 0; i < ARRAY_SIZE; i++) {
    bar[i] = 2 * i;
  }
  Tread(xid, rid, &bar);
  for(i = 0 ; i < ARRAY_SIZE; i++) {
    fail_unless(bar[i] == foo[i], NULL);
  }

  /* Nothing was modified in this transaction, so it is aborted instead of committed. */
  Tabort(xid);
  Tdeinit();
}
END_TEST
/**
Add suite declarations here
*/
@ -96,7 +152,9 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */
tcase_add_test(tc, transactional_smokeTest);
tcase_add_test(tc, transactional_blobSmokeTest);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc);