Completely new blob implementation. Slower than the old one was supposed to be, but is correct, and extremely
simple. It simply writes the pre-image and post-image of each blob to the log using TpageSet.
This commit is contained in:
Sears Russell 2006-06-21 00:45:18 +00:00
parent eabb8010f4
commit d26263262b
4 changed files with 65 additions and 503 deletions

View file

@ -267,7 +267,7 @@ typedef struct {
typedef struct {
size_t offset; /* New scheme: index of the blob's first backing page (allocBlob stores TpageAllocMany's start page here).  Old scheme: byte offset into the blob file. */
size_t size; /* Length of the blob, in bytes. */
unsigned fd : 1; /* Which of the two blob store files holds the current image (0 -> blobf0, 1 -> blobf1). */
/* NOTE(review): the commented-out duplicate below is the post-image of this
   hunk (the new implementation drops the fd bit along with the two-file
   store); both the pre- and post-image lines are present here. */
// unsigned fd : 1;
} blob_record_t;

View file

@ -1,509 +1,79 @@
#include <unistd.h>
#include <config.h>
#include <lladd/common.h>
#include <string.h>
#include <lladd/transactional.h>
#include "page/raw.h"
#include <assert.h>
#include <lladd/transactional.h>
#include <lladd/bufferManager.h>
#include <lladd/constants.h>
#include "latches.h"
#include "blobManager.h"
#include "io.h"
#include <pbl/pbl.h>
#include "page.h"
#include <stdio.h>
pthread_mutex_t blob_hash_mutex; /* Guards dirtyBlobs and all of its nested tables. */
static FILE * blobf0 = NULL, * blobf1 = NULL; /* The two blob store files; a record's fd bit selects the current one. */
/**
This is a hash of hash tables.  The outer hash maps from xid to a
per-page hash; that middle hash maps from rid.page to an inner hash,
which maps from rid to lsn (see tripleHashLookup / tripleHashInsert).
*/
static pblHashTable_t * dirtyBlobs;
/**
 * Look up the LSN recorded in dirtyBlobs for (xid, rid).
 *
 * @return a pointer to the stored lsn_t, or NULL if the blob is not
 *         marked dirty.  The pointer aliases the value held inside the
 *         inner hash table, so callers may update the LSN in place
 *         (getDirtyFD relies on this).
 *
 * NOTE(review): the original text was missing this function's closing
 * brace (diff-extraction residue); it is restored here.
 */
static lsn_t * tripleHashLookup(int xid, recordid rid) {
  lsn_t * ret;
  pthread_mutex_lock(&blob_hash_mutex);
  pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(xid));
  if(xidHash == NULL) {
    ret = NULL;
  } else {
    pblHashTable_t * pageXidHash = pblHtLookup(xidHash, &(rid.page), sizeof(int));
    if(pageXidHash == NULL) {
      ret = NULL;
    } else {
      ret = pblHtLookup(pageXidHash, &rid, sizeof(recordid));
    }
  }
  pthread_mutex_unlock(&blob_hash_mutex);
  return ret;
}
/**
 * Reserve backing pages for a new blob and point its record at them.
 *
 * Rounds rid.size up to a whole number of usable pages, allocates that
 * many contiguous pages, then Tset()s the BLOB_SLOT record so it holds
 * the {start page, byte length} pair describing the blob.
 */
void allocBlob(int xid, recordid rid) {
  assert(rid.size > 0);

  /* Round the byte count up to whole backing pages. */
  int nPages = rid.size / USABLE_SIZE_OF_PAGE;
  if(rid.size % USABLE_SIZE_OF_PAGE) { nPages++; }

  long firstPage = TpageAllocMany(xid, nPages);

  /* Describe where the blob lives... */
  blob_record_t blob_rec;
  blob_rec.offset = firstPage;
  blob_rec.size = rid.size;

  /* ...and store that descriptor in the blob's slot. */
  recordid slot_rid = rid;
  slot_rid.size = BLOB_SLOT;
  Tset(xid, slot_rid, &blob_rec);
}
/**
 * Record newLSN as the dirtying LSN for (xid, rid) in the triple hash,
 * creating the per-transaction and per-page tables on first use.
 */
static void tripleHashInsert(int xid, recordid rid, lsn_t newLSN) {
  pthread_mutex_lock(&blob_hash_mutex);

  /* Find (or lazily create) this transaction's table. */
  pblHashTable_t * perXid = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); /* Freed in doubleHashRemove */
  if(!perXid) {
    perXid = pblHtCreate();
    DEBUG("New blob xact: xid = %d\n", xid);
    pblHtInsert(dirtyBlobs, &xid, sizeof(int), perXid);
  }

  /* Find (or lazily create) the per-page table within it. */
  pblHashTable_t * perPage = pblHtLookup(perXid, &(rid.page), sizeof(int));
  if(!perPage) {
    perPage = pblHtCreate();
    pblHtInsert(perXid, &(rid.page), sizeof(int), perPage);
  }

  /* Heap-allocate the LSN so lookups can update it in place. */
  lsn_t * stored = malloc(sizeof(lsn_t)); /* Freed in doubleHashRemove */
  *stored = newLSN;
  pblHtInsert(perPage, &rid, sizeof(recordid), stored);

  pthread_mutex_unlock(&blob_hash_mutex);
}
/*
static void tripleHashRemove(int xid, recordid rid) {
pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(int));
if(xidHash) { / * Else, there was no xid, rid pair. * /
pblHashTable_t * pageXidHash = pblHtLookup(xidHash, &(rid.page), sizeof(int));
if(pageXidHash) {
lsn_t * delme = pblHtLookup(pageXidHash, &rid, sizeof(recordid));
pblHtRemove(pageXidHash, &rid, sizeof(recordid));
free(delme);
/ * We freed a member of pageXidHash. Is it empty? * /
if(!pblHtFirst(pageXidHash)) {
pblHtRemove(xidHash, &(rid.page), sizeof(int));
/ * Is xidHash now empty? * /
if(!pblHtFirst(xidHash)) {
pblHtRemove(dirtyBlobs, &xid, sizeof(int));
free(xidHash);
}
free(pageXidHash);
}
}
}
}*/
/* moved verbatim from bufferManger.c, then hacked up to use FILE * instead of ints. */
/**
 * Open (creating if necessary) both blob store files, and initialize the
 * dirty-blob bookkeeping (hash table + mutex).  Aborts on failure.
 */
void openBlobStore() {
  /* Prefer opening an existing file read/write; fall back to
     truncate-and-create when it does not exist yet. */
  blobf0 = fopen(BLOB0_FILE, "r+");
  if(!blobf0) {
    blobf0 = fopen(BLOB0_FILE, "w+");
    if(!blobf0) { perror("Couldn't open or create blob 0 file"); abort(); }
  }
  DEBUG("blobf0 opened.\n");

  blobf1 = fopen(BLOB1_FILE, "r+");
  if(!blobf1) {
    blobf1 = fopen(BLOB1_FILE, "w+");
    if(!blobf1) { perror("Couldn't open or create blob 1 file"); abort(); }
  }
  DEBUG("blobf1 opened.\n");

  dirtyBlobs = pblHtCreate();
  pthread_mutex_init(&blob_hash_mutex, NULL);
}
/** Discards all changes to dirty blobs, and closes the blob store.
@todo memory leak: Will leak memory if there are any outstanding
xacts that have written to blobs. Should explicitly abort them
instead of just invalidating the dirtyBlobs hash.
(If the you fix the above todo, don't forget to fix
bufferManager's simulateBufferManagerCrash.)
*/
/* NOTE(review): from here to the final brace, two functions are
   interleaved — closeBlobStore() and the new TpageGet-based readBlob() —
   and closeBlobStore()'s own closing brace is missing.  This looks like
   diff-extraction residue (pre- and post-image lines merged); reconcile
   against the original commit before compiling. */
void closeBlobStore() {
/* Close both store files; fclose flushes stdio buffers. */
int ret = fclose(blobf0);
assert(!ret);
ret = fclose(blobf1);
assert(!ret);
blobf0 = NULL;
blobf1 = NULL;
/* Warn about any transaction that still has dirty blobs at shutdown
   (see the @todo above about aborting them instead). */
pblHashTable_t * xidhash;
for(xidhash = pblHtFirst(dirtyBlobs); xidhash; xidhash = pblHtNext(dirtyBlobs)) {
fflush(NULL);
sync();
printf("WARNING!: Found list of dirty blobs for transaction: %ld\nIt is possible that these blobs were not synced to disk properly.\n\nThe data has now been flushed to disk, but this warning could indicate a bug that could cause data corruption.", *(lsn_t*)pblHtCurrentKey(dirtyBlobs));
fflush(NULL);
sync();
pblHtRemove(dirtyBlobs, 0, 0);
}
/* NOTE(review): the new readBlob() begins here, nested inside
   closeBlobStore() — its tail (the final partial chunk) appears further
   down, spliced into preAllocBlob(). */
void readBlob(int xid, Page * p2, recordid rid, byte * buf) {
int chunk;
recordid rawRid = rid;
rawRid.size = BLOB_SLOT;
byte * pbuf = alloca(PAGE_SIZE);
blob_record_t rec;
readRecord(xid, p2, rawRid, &rec);
/* Copy the blob out one full usable-page-sized chunk at a time. */
for(chunk = 0; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
TpageGet(xid, rec.offset+chunk, pbuf);
memcpy(buf + (chunk * USABLE_SIZE_OF_PAGE), pbuf, USABLE_SIZE_OF_PAGE);
/* NOTE(review): writing the page back during a read looks suspicious —
   confirm whether TpageSet is intentional here. */
TpageSet(xid, rec.offset+chunk, pbuf);
}
pblHtDelete(dirtyBlobs);
pthread_mutex_destroy(&blob_hash_mutex);
}
/**
blob allocation:
generate rid
log rid (+ alloc)
alloc space in store
log rid write
perform rid write
write space allocation to store
(It used to be:)
allocate space in store
generate rid
write space in store
log rid
write rid alloc
log rid
The trick here is to make sure that the write to the record
happens after the record's allocation has been logged.
*/
/**
 * Allocate (via Talloc) the BLOB_SLOT record that will point at a blob of
 * blobSize bytes, and return its rid with rid.size set to the blob's
 * byte length.  Returns NULLRID on failure (via try_ret/end_ret).
 */
compensated_function recordid preAllocBlob(int xid, long blobSize) {
/* Allocate space for the blob entry. */
recordid rid;
try_ret(NULLRID) {
DEBUG("Allocing blob (size %ld)\n", blobSize);
assert(blobSize > 0); /* Don't support zero length blobs right now... */
/* First in buffer manager. */
rid = Talloc(xid, BLOB_SLOT); //sizeof(blob_record_t));
rid.size = blobSize;
} end_ret(NULLRID);
return rid;
/* NOTE(review): the lines below are unreachable after the return and
   reference names (rec, chunk, pbuf, buf) not declared in this scope;
   they appear to be the final-partial-chunk tail of the new readBlob()
   spliced in by diff extraction. */
TpageGet(xid, rec.offset+chunk, pbuf);
memcpy(buf + (chunk * USABLE_SIZE_OF_PAGE), pbuf, rid.size % USABLE_SIZE_OF_PAGE);
TpageSet(xid, rec.offset+chunk, pbuf);
// printf("Chunk = %d\n", chunk);
}
/**
 * Like preAllocBlob(), but places the blob's pointer record on a caller-
 * chosen page.  Returns the record's rid with rid.size set to the blob's
 * byte length, or NULLRID on failure (via try_ret/end_ret).
 */
compensated_function recordid preAllocBlobFromPage(int xid, long page, long blobSize) {
  recordid blobRid;
  try_ret(NULLRID) {
    DEBUG("Allocing blob (size %ld)\n", blobSize);
    /* Zero-length blobs are unsupported. */
    assert(blobSize > 0);
    /* Grab a BLOB_SLOT-typed record on the requested page, then stash
       the real byte count in its size field. */
    blobRid = TallocFromPage(xid, page, BLOB_SLOT); //sizeof(blob_record_t));
    blobRid.size = blobSize;
  } end_ret(NULLRID);
  return blobRid;
}
/* NOTE(review): this is the old 4-argument allocBlob() (two-file store).
   A new 2-argument allocBlob() also appears earlier in this text — both
   the pre- and post-image of the diff are present; a real translation
   unit cannot contain both definitions. */
void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid) {
long fileSize;
blob_record_t blob_rec;
char zero = 0;
DEBUG("post Allocing blob (size %ld)\n", rid.size);
assert(rid.size > 0); /* Don't support zero length blobs right now... */
/** Finally, fix up the fields in the record that points to the blob.
The rest of this also should go into alloc.c
*/
blob_rec.fd = 0;
blob_rec.size = rid.size;
/* Lock blobf1 before blobf0 — keep this ordering consistent everywhere
   to avoid deadlock. */
flockfile(blobf1);
flockfile(blobf0);
/* The blob is appended at the current end of the store. */
fileSize = myFseek(blobf1, 0, SEEK_END);
blob_rec.offset = fileSize;
/* setRecordType(p, rid, BLOB_SLOT); */
/* rid.size = BLOB_SLOT; */
/* releasePage(p); */
rid.size = blob_rec.size;
/* Allocate space for the blob entry. */
/* First in buffer manager. */
/* Read in record to get the correct offset, size for the blob*/
/* * @ todo blobs deadlock... */
/* readRawRecord(xid, p, rid, &blob_rec, sizeof(blob_record_t)); */
/* Extend both files by seeking past the end and writing one zero byte. */
myFseek(blobf0, fileSize + rid.size - 1, SEEK_SET);
myFseek(blobf1, fileSize + rid.size - 1, SEEK_SET);
if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); }
if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); }
/* Force the extension to disk before logging completion below. */
#ifdef HAVE_FDATASYNC
fdatasync(fileno(blobf0));
fdatasync(fileno(blobf1));
#else
//#warn Not using fdatasync
fsync(fileno(blobf0));
fsync(fileno(blobf1));
#endif
funlockfile(blobf0);
funlockfile(blobf1);
/*
(This call must be after the files have been extended, and synced
to disk, since it marks completion of the blob allocation.)
*/
rid.size = BLOB_SLOT;
assert(blob_rec.size);
writeRecord(xid, p, lsn, rid, &blob_rec);
}
/* NOTE(review): two functions are interleaved below — the old file-based
   readBlob() and the new TpageGet-based writeBlob() — apparently diff
   residue.  The old readBlob's body (fread path) and the new writeBlob's
   chunk loop share this one brace-balanced region. */
void readBlob(int xid, Page * p, recordid rid, void * buf) {
/* We don't care if the blob is dirty, since the record from the
buffer manager will reflect that if it is.. */
/* NOTE(review): nested definition start — new writeBlob(). */
void writeBlob(int xid, Page * p2, lsn_t lsn, recordid rid, const byte * buf) {
int chunk;
recordid rawRid = rid;
rawRid.size = BLOB_SLOT;
byte * pbuf = alloca(PAGE_SIZE);
blob_record_t rec;
FILE * fd;
long offset;
assert(buf);
/* Fetch the blob descriptor through its BLOB_SLOT-sized rid. */
recordid blob_rec_rid = rid;
blob_rec_rid.size = BLOB_SLOT;
readRecord(xid, p, blob_rec_rid, &rec);
/* The fd bit selects which of the two store files is current. */
fd = rec.fd ? blobf1 : blobf0;
DEBUG("reading blob at offset %d, size %ld, buffer %x\n", rec.offset, rec.size, (unsigned int) buf);
flockfile(fd);
offset = myFseek(fd, (long int) rec.offset, SEEK_SET);
assert(rec.offset == offset);
if(1 != fread(buf, rec.size, 1, fd)) {
if(feof(fd)) { printf("Unexpected eof!\n"); fflush(NULL); abort(); }
if(ferror(fd)) { printf("Error reading stream! %d", ferror(fd)); fflush(NULL); abort(); }
/* NOTE(review): new writeBlob() body resumes here — copies buf into the
   backing pages one full chunk at a time. */
readRecord(xid, p2, rawRid, &rec);
assert(rec.offset);
for(chunk = 0; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
TpageGet(xid, rec.offset+chunk, pbuf);
memcpy(pbuf, buf + (chunk * USABLE_SIZE_OF_PAGE), USABLE_SIZE_OF_PAGE);
TpageSet(xid, rec.offset+chunk, pbuf);
}
funlockfile(fd);
}
/**
Examines the blob in question, marks it dirty, and returns the
appropriate file descriptor.
*/
static FILE * getDirtyFD(int xid, Page * p, lsn_t lsn, recordid rid) {
lsn_t * dirty = tripleHashLookup(xid, rid);
FILE * fd;
blob_record_t rec;
/* First, determine if the blob is dirty. */
recordid tmp = rid;
tmp.size = BLOB_SLOT;
readRecord(xid, p, tmp, &rec);
assert(rec.size == rid.size);
if(dirty) {
/* Writes to an already-dirty blob must carry strictly increasing LSNs. */
assert(lsn > *dirty);
*dirty = lsn; /* Updates value in triple hash (works because of pointer aliasing.) */
DEBUG("Blob already dirty.\n");
} else {
DEBUG("Marking blob dirty.\n");
tripleHashInsert(xid, rid, lsn);
/* Flip the fd bit on the record. */
rec.fd = rec.fd ? 0 : 1;
rid.size = BLOB_SLOT;
/* Persist the flipped bit, so the other store file becomes current. */
writeRecord(xid, p, lsn, rid, &rec);
}
fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */
return fd;
}
/* This function cannot be safely implemented on top of the current
blob implementation since at recovery, we have no way of knowing
whether or not a future write to the blob was performed. (This is
the same reason why we cannot steal pages whose LSN's may be too
low.
void setRangeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf, long offset, long length) {
FILE * fd;
int readcount;
blob_record_t rec;
fd = getDirtyFD(xid, p, lsn, rid);
readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t));
flockfile(fd);
offset = myFseek(fd, rec.offset + offset, SEEK_SET);
assert(offset == rec.offset);
readcount = fwrite(buf, length, 1, fd);
assert(1 == readcount);
fdatasync(fileno(fd));
funlockfile(fd);
} */
/** @todo dirtyBlobs should contain the highest LSN that wrote to the
current version of the dirty blob, and the lsn field should be
checked to be sure that it increases monotonically.
@todo Correctness / performance problem: Currently, we cannot
manually pin pages in memory, so the record pointing to the blob
may be stolen. Therefore, we must fdatasync() the blob file's
updates to disk each time writeBlob is called.
If we could pin the page, this problem would be solved, and
writeblob would not have to call fdatasync(). The same problem
applies to setRangeBlob.
*/
/* NOTE(review): old file-based writeBlob(); the new TpageSet-based
   writeBlob() appears (spliced) earlier in this text — diff residue
   leaves both present. */
void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf) {
long offset;
FILE * fd;
int readcount;
blob_record_t rec;
DEBUG("Writing blob (size %ld)\n", rid.size);
/* NOTE(review): the next four lines reference names (chunk, pbuf) not
   declared in this function — they look like the final-partial-chunk
   tail of the new writeBlob() spliced in by diff extraction. */
TpageGet(xid, rec.offset+chunk, pbuf);
memcpy(pbuf, buf + (chunk * USABLE_SIZE_OF_PAGE), rid.size % USABLE_SIZE_OF_PAGE);
TpageSet(xid, rec.offset+chunk, pbuf);
// printf("Write Chunk = %d (%d)\n", chunk, rec.offset+chunk);
/* Mark the blob dirty and pick the current store file. */
fd = getDirtyFD(xid, p, lsn, rid);
recordid tmp = rid;
tmp.size = BLOB_SLOT;
readRecord(xid, p, tmp, &rec);
DEBUG("Writing at offset = %d, size = %ld\n", rec.offset, rec.size);
flockfile(fd);
offset = myFseek(fd, rec.offset, SEEK_SET);
assert(offset == rec.offset);
readcount = fwrite(buf, rec.size, 1, fd);
assert(1 == readcount);
/* Force the write to disk (see the @todo above about pinning pages,
   which would make this per-write sync unnecessary). */
#ifdef HAVE_FDATASYNC
fdatasync(fileno(fd));
#else
//#warn Not using fdatasync()
fsync(fileno(fd));
#endif
funlockfile(fd);
/* No need to update the raw blob record. */
}
/** @todo check to see if commitBlobs actually needs to flush blob
files when it's called (are there any dirty blobs associated with
this transaction?
@todo when writeBlob is fixed, add the fdatasync calls back into commitBlobs() Currently, it calls fdatasync on each blob write...
*/
void commitBlobs(int xid) {
/* Briefly take both file locks (blobf1 first, matching allocBlob's
   ordering) so any in-flight blob I/O drains before commit. */
flockfile(blobf1);
flockfile(blobf0);
/* fdatasync(fileno(blobf0));
fdatasync(fileno(blobf1)); */
funlockfile(blobf0);
funlockfile(blobf1);
/* Commit and abort are identical here: both just discard this
   xid's dirty-blob bookkeeping. */
abortBlobs(xid);
}
/**
Just clean up the dirty list for this xid. @todo Check return values.
(Functionally equivalent to the old rmTouch() function. Just
deletes this xid's dirty list.)
@todo The tripleHash data structure is overkill here. We only
need two layers of hash tables, but it works, and it would be a
pain to change it, unless we need to touch this file for some
other reason.
*/
void abortBlobs(int xid) {
/*
At first glance, it may seem easier to keep track of which blobs
are dirty only in blobManager, and then propogate those updates to
bufferManager later. It turns out that it's much easier to
propogate the changes to bufferManger, since otherwise, recovery
and undo have to reason about lazy propogation of values to the
bufferManager, and also have to preserve *write* ordering, even
though the writes may be across many transactions, and could be
propogated in the wrong order. If we generate a T set() (for the
blob record in bufferManager) for each write, things become much
easier.
*/
pthread_mutex_lock(&blob_hash_mutex);
/* This xid's table maps page number -> (rid -> lsn) tables. */
pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int));
pblHashTable_t * this_bucket;
DEBUG("Blob cleanup xid=%d\n", xid);
if(rid_buckets) { /* Otherwise, there are no dirty blobs for this xid.. */
/* Walk each per-page bucket, freeing the heap-allocated LSNs it holds,
   then tear the buckets and the per-xid table down. */
for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) {
lsn_t * rid_lsn;
int page_number = *(int*)pblHtCurrentKey(rid_buckets);
/* All right, this_bucket contains all of the rids for this page. */
for(rid_lsn = pblHtFirst(this_bucket); rid_lsn; rid_lsn = pblHtNext(this_bucket)) {
recordid * rid = pblHtCurrentKey(this_bucket);
/* page_number = rid->page; */
assert(page_number == rid->page);
pblHtRemove(this_bucket, 0, 0);/*rid, sizeof(recordid)); */
free(rid_lsn);
}
pblHtRemove(rid_buckets, 0, 0);
pblHtDelete(this_bucket);
}
pblHtDelete(rid_buckets);
pblHtRemove(dirtyBlobs, &xid, sizeof(int));
}
pthread_mutex_unlock(&blob_hash_mutex);
}
/* NOTE(review): second, empty definition of abortBlobs() — the new
   implementation has no dirty list to clean up.  The full version above
   appears to be the diff's pre-image; a real translation unit cannot
   contain both definitions. */
void abortBlobs(int xid) {
}

View file

@ -83,7 +83,7 @@ compensated_function recordid preAllocBlobFromPage(int xid, long page, long blob
*/
void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid);
void allocBlob(int xid, recordid rid);
void openBlobStore();
void closeBlobStore();

View file

@ -77,14 +77,7 @@
*/
//}end
/**
 * Redo/do handler for record allocation: physically allocate the slot on
 * the page.  Blob backing pages are now allocated separately (the new
 * 2-argument allocBlob() is called from TallocFromPageInternal), so there
 * is no blob special case here.
 *
 * NOTE(review): the original text called slottedPostRalloc() twice on the
 * blob path and invoked allocBlob() with the stale 4-argument signature,
 * conflicting with the 2-argument prototype in the header — those look
 * like pre-image lines left behind by diff extraction; reduced here to
 * the post-image.
 */
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
  slottedPostRalloc(xid, p, lsn, rid);
  return 0;
}
@ -151,7 +144,7 @@ void TallocInit() {
lastFreepage = UINT64_MAX;
}
compensated_function recordid TallocOld(int xid, long size) {
/*compensated_function recordid TallocOld(int xid, long size) {
recordid rid;
@ -205,14 +198,14 @@ compensated_function recordid TallocOld(int xid, long size) {
rid = slottedRawRalloc(p, size); // <--- Important part.
Tupdate(xid, rid, NULL, OPERATION_ALLOC); // <--- This hardcodes "slotted" Should we use TallocFromPage() instead?
/** @todo does releasePage do the correct error checking? <- Why is this comment here?*/
// @todo does releasePage do the correct error checking? <- Why is this comment here?
releasePage(p);
} compensate_ret(NULLRID);
}
return rid;
}
}*/
static compensated_function recordid TallocFromPageInternal(int xid, Page * p, unsigned long size);
@ -283,13 +276,8 @@ static compensated_function recordid TallocFromPageInternal(int xid, Page * p, u
type = size;
}
// begin_action_ret(pthread_mutex_unlock, &talloc_mutex, NULLRID) {
unsigned long slotSize = INVALID_SLOT;
// pthread_mutex_lock(&talloc_mutex);
slotSize = physical_slot_length(type);
assert(slotSize < PAGE_SIZE && slotSize > 0);
@ -304,10 +292,14 @@ static compensated_function recordid TallocFromPageInternal(int xid, Page * p, u
assert(rid.size == type);
rid.size = size;
Tupdate(xid, rid, NULL, OPERATION_ALLOC);
if(type == BLOB_SLOT) {
allocBlob(xid, rid);
}
rid.size = type;
}
// } compensate_ret(NULLRID);
// }
if(rid.size == type && // otherwise TallocFromPage failed
type == BLOB_SLOT // only special case blobs (for now)