Continued cleaning up allocation code. It seems really slow all of a sudden, so this commit probably introduced a performance bug.

Sears Russell 2006-06-17 00:25:09 +00:00
parent 209a6916e5
commit 3e4cac0fe3
11 changed files with 168 additions and 109 deletions

View file

@@ -144,16 +144,16 @@ terms specified in this license.
/* number above should be less than number below */ /* number above should be less than number below */
#define MAX_OPERATIONS 40 #define MAX_OPERATIONS 40
#define SLOT_TYPE_BASE PAGE_SIZE //#define SLOT_TYPE_BASE PAGE_SIZE
/** This constant is used as a placeholder to mark slot locations that are invalid. /** This constant is used as a placeholder to mark slot locations that are invalid.
@see slotted.c, indirect.c @see slotted.c, indirect.c
*/ */
#define INVALID_SLOT (SLOT_TYPE_BASE+0) #define INVALID_SLOT (-1)
/** This constant is used as a placeholder to mark slot locations that contain blobs. /** This constant is used as a placeholder to mark slot locations that contain blobs.
@see slotted.c, indirect.c, blobManager.c */ @see slotted.c, indirect.c, blobManager.c */
#define BLOB_SLOT (SLOT_TYPE_BASE + 1) #define BLOB_SLOT (-2)
#define SLOT_TYPE_END (SLOT_TYPE_BASE + 2) #define SLOT_TYPE_END (-3)
/** Initialized statically in transactional2.c */ /** Initialized statically in transactional2.c */
extern const short SLOT_TYPE_LENGTHS[]; extern const short SLOT_TYPE_LENGTHS[];
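
A minimal, self-contained sketch of the scheme these constants move to (only the constant names, SLOT_TYPE_LENGTHS, and the physical_slot_length macro added elsewhere in this commit are real; the stand-in blob_record_t and the demo itself are invented for illustration): non-negative sizes stay literal byte counts, while the new negative sentinels index into a per-type length table.

    /* Hypothetical, self-contained illustration; names taken from this commit. */
    #include <assert.h>
    #include <stdio.h>

    typedef struct { int fd; long offset; long size; } blob_record_t; /* stand-in */

    #define INVALID_SLOT  (-1)
    #define BLOB_SLOT     (-2)
    #define SLOT_TYPE_END (-3)

    /* Index 1 corresponds to INVALID_SLOT, index 2 to BLOB_SLOT, ... */
    static const short SLOT_TYPE_LENGTHS[] = { 0, 0, sizeof(blob_record_t), -1 };

    /* Non-negative sizes are literal byte counts; negative types are looked up. */
    #define physical_slot_length(size) \
      ((size) >= 0 ? (size) : SLOT_TYPE_LENGTHS[-1 * (size)])

    int main(void) {
      assert(physical_slot_length(100) == 100);                         /* ordinary record */
      assert(physical_slot_length(BLOB_SLOT) == sizeof(blob_record_t)); /* blob header     */
      printf("a blob slot occupies %d bytes on the page\n",
             (int)physical_slot_length(BLOB_SLOT));
      return 0;
    }
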

View file

@@ -32,7 +32,7 @@ void TallocInit();
@return the recordid of the new record. @return the recordid of the new record.
*/ */
compensated_function recordid Talloc(int xid, long size); compensated_function recordid Talloc(int xid, unsigned long size);
compensated_function recordid TallocFromPage(int xid, long page, unsigned long size); compensated_function recordid TallocFromPage(int xid, long page, unsigned long size);

View file

@@ -352,7 +352,7 @@ int Tcommit(int xid);
* @returns A new recordid. On success, this recordid's size will be * @returns A new recordid. On success, this recordid's size will be
* the requested size. On failure, its size will be zero. * the requested size. On failure, its size will be zero.
*/ */
recordid Talloc(int xid, long size); //recordid Talloc(int xid, long size);
/* @function Tabort /* @function Tabort
* @param xid transaction ID * @param xid transaction ID

View file

@@ -29,7 +29,9 @@ int main(int argc, char** argv) {
int count = 0; int count = 0;
// bleah; gcc would warn without the casts, since it doesn't understand that %as = Allocate String // bleah; gcc would warn without the casts, since it doesn't understand that %as = Allocate String
while(EOF != (ret=scanf("%as\t%as\n", (float*)&key, (float*)&value))) { char ** keyp = &key; // The extra garbage is to avoid type punning warnings...
char ** valuep = &value;
while(EOF != (ret=scanf("%as\t%as\n", (float*)keyp, (float*)valuep))) {
if(!ret) { if(!ret) {
printf("Could not parse input!\n"); printf("Could not parse input!\n");
Tabort(xid); Tabort(xid);
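
A standalone sketch of the warning the comment above works around: glibc's old %as conversion allocates the string for the caller, but gcc's format checker treats %a as the C99 hex-float conversion and expects a float *, hence the intermediate char ** pointers and the casts. Hypothetical example, not part of the patch; modern code would use %ms instead.

    /* Hedged demo of the %as (allocate-string) workaround; needs glibc. */
    #include <stdio.h>
    #include <stdlib.h>

    int main(void) {
      char * key = NULL;
      char * value = NULL;
      char ** keyp = &key;     /* extra indirection avoids type-punning warnings */
      char ** valuep = &value;
      int ret;

      /* gcc -Wformat parses %a as the hex-float conversion and expects a
         float *; the casts silence that warning, scanf still sees char **. */
      while(EOF != (ret = scanf("%as\t%as\n", (float*)keyp, (float*)valuep))) {
        if(ret != 2) {
          fprintf(stderr, "Could not parse input!\n");
          return 1;
        }
        printf("key=%s value=%s\n", key, value);
        free(key);
        free(value);
      }
      return 0;
    }
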

View file

@@ -9,9 +9,10 @@ int main(int argc, char** argv) {
Tinit(); Tinit();
recordid hash = {1, 0, 48}; recordid hash = {1, 0, 48};
char * val; byte * val;
if(-1 != ThashLookup(-1, hash, (byte*)argv[1], strlen(argv[1]), (byte**)&val)) { byte ** argvb = (byte**) argv;
printf("%s\n", val); if(-1 != ThashLookup(-1, hash, argvb[1], strlen(argv[1]), &val)) {
printf("%s\n", (char*)val);
free(val); free(val);
} }
Tdeinit(); Tdeinit();

View file

@@ -26,24 +26,6 @@ static FILE * blobf0 = NULL, * blobf1 = NULL;
*/ */
static pblHashTable_t * dirtyBlobs; static pblHashTable_t * dirtyBlobs;
/** Plays a nasty trick on bufferManager to force it to read and write
blob_record_t items for us. Relies upon bufferManager (and
page.c's) trust in the rid.size field... */
static void readRawRecord(int xid, Page * p, recordid rid, void * buf, int size) {
recordid blob_rec_rid = rid;
blob_rec_rid.size = size;
readRecord(xid, p, blob_rec_rid, buf);
/* T read(xid, blob_rec_rid, buf); */
}
static void writeRawRecord(int xid, Page * p, recordid rid, lsn_t lsn, const void * buf, int size) {
recordid blob_rec_rid = rid;
blob_rec_rid.size = size;
writeRecord(xid, p, lsn, blob_rec_rid, buf);
/* T set(xid, blob_rec_rid, buf); - We no longer need to write a log
record out here, since we're called by something that is the
result of a log record.*/
}
static lsn_t * tripleHashLookup(int xid, recordid rid) { static lsn_t * tripleHashLookup(int xid, recordid rid) {
lsn_t * ret; lsn_t * ret;
pthread_mutex_lock(&blob_hash_mutex); pthread_mutex_lock(&blob_hash_mutex);
@@ -253,6 +235,8 @@ void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid) {
DEBUG("post Allocing blob (size %ld)\n", rid.size); DEBUG("post Allocing blob (size %ld)\n", rid.size);
assert(rid.size > 0); /* Don't support zero length blobs right now... */
/** Finally, fix up the fields in the record that points to the blob. /** Finally, fix up the fields in the record that points to the blob.
The rest of this also should go into alloc.c The rest of this also should go into alloc.c
*/ */
@@ -271,8 +255,6 @@ void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid) {
/* Allocate space for the blob entry. */ /* Allocate space for the blob entry. */
assert(rid.size > 0); /* Don't support zero length blobs right now... */
/* First in buffer manager. */ /* First in buffer manager. */
/* Read in record to get the correct offset, size for the blob*/ /* Read in record to get the correct offset, size for the blob*/
@@ -298,14 +280,13 @@ void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid) {
funlockfile(blobf0); funlockfile(blobf0);
funlockfile(blobf1); funlockfile(blobf1);
/* T set() needs to know to 'do the right thing' here, since we've /*
changed the size it has recorded for this record, and (This call must be after the files have been extended, and synced
writeRawRecord makes sure that that is the case. to disk, since it marks completion of the blob allocation.)
(This call must be after the files have been extended, and synced to disk, since it marks completion of the blob allocation.)
*/ */
writeRawRecord (xid, p, rid, lsn, &blob_rec, sizeof(blob_record_t)); rid.size = BLOB_SLOT;
assert(blob_rec.size);
writeRecord(xid, p, lsn, rid, &blob_rec);
} }
@@ -320,8 +301,10 @@ void readBlob(int xid, Page * p, recordid rid, void * buf) {
assert(buf); assert(buf);
readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t)); recordid blob_rec_rid = rid;
blob_rec_rid.size = BLOB_SLOT;
readRecord(xid, p, blob_rec_rid, &rec);
fd = rec.fd ? blobf1 : blobf0; fd = rec.fd ? blobf1 : blobf0;
@@ -355,8 +338,9 @@ static FILE * getDirtyFD(int xid, Page * p, lsn_t lsn, recordid rid) {
/* First, determine if the blob is dirty. */ /* First, determine if the blob is dirty. */
/* T read() raw record */ recordid tmp = rid;
readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t)); tmp.size = BLOB_SLOT;
readRecord(xid, p, tmp, &rec);
assert(rec.size == rid.size); assert(rec.size == rid.size);
@@ -370,8 +354,8 @@ static FILE * getDirtyFD(int xid, Page * p, lsn_t lsn, recordid rid) {
/* Flip the fd bit on the record. */ /* Flip the fd bit on the record. */
rec.fd = rec.fd ? 0 : 1; rec.fd = rec.fd ? 0 : 1;
/* T set() raw record */ rid.size = BLOB_SLOT;
writeRawRecord(xid, p, rid, lsn, &rec, sizeof(blob_record_t)); writeRecord(xid, p, lsn, rid, &rec);
} }
fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */ fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */
@@ -422,7 +406,10 @@ void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf) {
DEBUG("Writing blob (size %ld)\n", rid.size); DEBUG("Writing blob (size %ld)\n", rid.size);
fd = getDirtyFD(xid, p, lsn, rid); fd = getDirtyFD(xid, p, lsn, rid);
readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t));
recordid tmp = rid;
tmp.size = BLOB_SLOT;
readRecord(xid, p, tmp, &rec);
DEBUG("Writing at offset = %d, size = %ld\n", rec.offset, rec.size); DEBUG("Writing at offset = %d, size = %ld\n", rec.offset, rec.size);
@@ -452,8 +439,8 @@ void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf) {
@todo when writeBlob is fixed, add the fdatasync calls back into commitBlobs(). @todo when writeBlob is fixed, add the fdatasync calls back into commitBlobs().
*/ */
void commitBlobs(int xid) { void commitBlobs(int xid) {
flockfile(blobf0);
flockfile(blobf1); flockfile(blobf1);
flockfile(blobf0);
/* fdatasync(fileno(blobf0)); /* fdatasync(fileno(blobf0));
fdatasync(fileno(blobf1)); */ fdatasync(fileno(blobf1)); */
funlockfile(blobf0); funlockfile(blobf0);
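
The readRawRecord/writeRawRecord helpers removed above are replaced throughout this file by an inline pattern: copy the rid, override its size with BLOB_SLOT so the page layer resolves it to sizeof(blob_record_t), and go through the ordinary readRecord/writeRecord path. A hedged sketch of that pattern follows; the helper names are hypothetical, while the signatures of readRecord/writeRecord and the project types are assumed from this tree.

    /* Illustration of the rid.size = BLOB_SLOT convention used above. */
    static void read_blob_header(int xid, Page * p, recordid rid, blob_record_t * rec) {
      recordid blob_rec_rid = rid;    /* leave the caller's rid untouched */
      blob_rec_rid.size = BLOB_SLOT;  /* page layer maps this to sizeof(blob_record_t) */
      readRecord(xid, p, blob_rec_rid, rec);
    }

    static void write_blob_header(int xid, Page * p, lsn_t lsn, recordid rid,
                                  const blob_record_t * rec) {
      recordid blob_rec_rid = rid;
      blob_rec_rid.size = BLOB_SLOT;
      writeRecord(xid, p, lsn, blob_rec_rid, rec);
    }
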

View file

@@ -78,7 +78,9 @@
//}end //}end
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
if(rid.size >= BLOB_THRESHOLD_SIZE && rid.size != BLOB_SLOT) { if(rid.size >= BLOB_THRESHOLD_SIZE) {
// printf("Calling allocBlob\n");
slottedPostRalloc(xid, p, lsn, rid); // need to allocate record before writing to it (allocBlob writes to it.)
allocBlob(xid, p, lsn, rid); allocBlob(xid, p, lsn, rid);
} else { } else {
slottedPostRalloc(xid, p, lsn, rid); slottedPostRalloc(xid, p, lsn, rid);
@@ -96,10 +98,10 @@ static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * da
static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) { static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
if(rid.size >= BLOB_THRESHOLD_SIZE && rid.size != BLOB_SLOT) { // if(rid.size >= BLOB_THRESHOLD_SIZE) { // && rid.size != BLOB_SLOT) {
// rid.size = BLOB_REC_SIZE; /* Don't reuse blob space yet... */ // rid.size = BLOB_REC_SIZE; /* Don't reuse blob space yet... */
rid.size = sizeof(blob_record_t); // rid.size = BLOB_SLOT; //sizeof(blob_record_t);
} // }
slottedPostRalloc(xid, p, lsn, rid); slottedPostRalloc(xid, p, lsn, rid);
/** @todo dat should be the pointer to the space in the blob store. */ /** @todo dat should be the pointer to the space in the blob store. */
@@ -149,7 +151,7 @@ void TallocInit() {
lastFreepage = UINT64_MAX; lastFreepage = UINT64_MAX;
} }
compensated_function recordid Talloc(int xid, long size) { compensated_function recordid TallocOld(int xid, long size) {
recordid rid; recordid rid;
@@ -212,35 +214,105 @@ compensated_function recordid Talloc(int xid, long size) {
} }
static compensated_function recordid TallocFromPageInternal(int xid, Page * p, unsigned long size);
compensated_function recordid Talloc(int xid, unsigned long size) {
short type;
if(size >= BLOB_THRESHOLD_SIZE) {
type = BLOB_SLOT;
} else {
type = size;
}
recordid rid;
begin_action_ret(pthread_mutex_unlock, &talloc_mutex, NULLRID) {
pthread_mutex_lock(&talloc_mutex);
Page * p;
if(lastFreepage == UINT64_MAX) {
try_ret(NULLRID) {
lastFreepage = TpageAlloc(xid);
} end_ret(NULLRID);
try_ret(NULLRID) {
p = loadPage(xid, lastFreepage);
} end_ret(NULLRID);
assert(*page_type_ptr(p) == UNINITIALIZED_PAGE);
slottedPageInitialize(p);
} else {
try_ret(NULLRID) {
p = loadPage(xid, lastFreepage);
} end_ret(NULLRID);
}
if(slottedFreespace(p) < physical_slot_length(type) ) {
// XXX compact page?!?
releasePage(p);
try_ret(NULLRID) {
lastFreepage = TpageAlloc(xid);
} end_ret(NULLRID);
try_ret(NULLRID) {
p = loadPage(xid, lastFreepage);
} end_ret(NULLRID);
slottedPageInitialize(p);
}
rid = TallocFromPageInternal(xid, p, size);
} compensate_ret(NULLRID);
return rid;
}
compensated_function recordid TallocFromPage(int xid, long page, unsigned long size) { compensated_function recordid TallocFromPage(int xid, long page, unsigned long size) {
Page * p = loadPage(xid, page);
recordid ret = TallocFromPageInternal(xid, p, size);
releasePage(p);
return ret;
}
static compensated_function recordid TallocFromPageInternal(int xid, Page * p, unsigned long size) {
recordid rid; recordid rid;
// Does TallocFromPage need to understand blobs? This function // Does TallocFromPage need to understand blobs? This function
// seems to be too complex; all it does is delegate the allocation // seems to be too complex; all it does is delegate the allocation
// request to the page type's implementation. (Does it really need // request to the page type's implementation. (Does it really need
// to check for freespace?) // to check for freespace?)
if(size >= BLOB_THRESHOLD_SIZE && size != BLOB_SLOT) {
try_ret(NULLRID) {
rid = preAllocBlobFromPage(xid, page, size); // <--- ICK!!! Kill this function.
Tupdate(xid,rid, NULL, OPERATION_ALLOC);
} end_ret(NULLRID);
} else {
begin_action_ret(pthread_mutex_unlock, &talloc_mutex, NULLRID) {
Page * p = loadPage(xid, page);
pthread_mutex_lock(&talloc_mutex);
if(slottedFreespace(p) < size) { short type;
slottedCompact(p); if(size >= BLOB_THRESHOLD_SIZE) {
} type = BLOB_SLOT;
if(slottedFreespace(p) < size) { } else {
rid = NULLRID; type = size;
} else { }
rid = slottedRawRalloc(p, size);
assert(rid.size == size); // begin_action_ret(pthread_mutex_unlock, &talloc_mutex, NULLRID) {
Tupdate(xid, rid, NULL, OPERATION_ALLOC);
}
releasePage(p); unsigned long slotSize = INVALID_SLOT;
} compensate_ret(NULLRID);
// pthread_mutex_lock(&talloc_mutex);
slotSize = physical_slot_length(type);
assert(slotSize < PAGE_SIZE && slotSize > 0);
if(slottedFreespace(p) < slotSize) {
slottedCompact(p);
}
if(slottedFreespace(p) < slotSize) {
rid = NULLRID;
} else {
rid = slottedRawRalloc(p, type);
assert(rid.size == type);
rid.size = size;
Tupdate(xid, rid, NULL, OPERATION_ALLOC);
rid.size = type;
}
// } compensate_ret(NULLRID);
// }
if(rid.size == type && // otherwise TallocFromPage failed
type == BLOB_SLOT // only special case blobs (for now)
) {
rid.size = size;
} }
return rid; return rid;
} }
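
Read without the side-by-side noise, the new allocation path reduces to roughly the sketch below. The names come from this diff; the begin_action_ret/compensate_ret error-handling macros and the freelist details are elided, so this is a simplified reading rather than the exact code.

    /* Simplified reading of the new Talloc()/TallocFromPageInternal() split.
       Compensation macros and error paths are omitted for clarity. */
    compensated_function recordid Talloc(int xid, unsigned long size) {
      short type = (size >= BLOB_THRESHOLD_SIZE) ? BLOB_SLOT : (short)size;

      pthread_mutex_lock(&talloc_mutex);
      Page * p;
      if(lastFreepage == UINT64_MAX) {
        /* First allocation: grab a fresh page and format it as a slotted page. */
        lastFreepage = TpageAlloc(xid);
        p = loadPage(xid, lastFreepage);
        slottedPageInitialize(p);
      } else {
        p = loadPage(xid, lastFreepage);
      }
      if(slottedFreespace(p) < physical_slot_length(type)) {
        /* Current page cannot hold the record; move on to a new page. */
        releasePage(p);
        lastFreepage = TpageAlloc(xid);
        p = loadPage(xid, lastFreepage);
        slottedPageInitialize(p);
      }
      recordid rid = TallocFromPageInternal(xid, p, size);
      pthread_mutex_unlock(&talloc_mutex);
      return rid;
    }

TallocFromPageInternal() then recomputes the type, compacts the page if physical_slot_length(type) does not fit, calls slottedRawRalloc(p, type), logs the allocation via Tupdate() with the caller's logical size temporarily restored into rid.size, and for blobs returns a rid whose size is the requested blob length rather than BLOB_SLOT.
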

View file

@@ -142,7 +142,7 @@ void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
writelock(p->rwlatch, 225); writelock(p->rwlatch, 225);
pageWriteLSN(xid, p, lsn); pageWriteLSN(xid, p, lsn);
unlock(p->rwlatch); unlock(p->rwlatch);
if(rid.size > BLOB_THRESHOLD_SIZE) { if(rid.size > BLOB_THRESHOLD_SIZE) {
writeBlob(xid, p, lsn, rid, dat); writeBlob(xid, p, lsn, rid, dat);
} else if(*page_type_ptr(p) == SLOTTED_PAGE) { } else if(*page_type_ptr(p) == SLOTTED_PAGE) {
@@ -161,7 +161,7 @@ int readRecord(int xid, Page * p, recordid rid, void *buf) {
int page_type = *page_type_ptr(p); int page_type = *page_type_ptr(p);
if(rid.size > BLOB_THRESHOLD_SIZE) { if(rid.size > BLOB_THRESHOLD_SIZE) {
readBlob(xid, p, rid, buf); readBlob(xid, p, rid, buf);
} else if(page_type == SLOTTED_PAGE) { } else if(page_type == SLOTTED_PAGE) {
slottedRead(xid, p, rid, buf); slottedRead(xid, p, rid, buf);

View file

@@ -61,7 +61,7 @@ void slottedCompact(Page * page) {
really_do_ralloc(&bufPage, rid); really_do_ralloc(&bufPage, rid);
memcpy(record_ptr(&bufPage, rid.slot), record_ptr(page, rid.slot), rid.size); memcpy(record_ptr(&bufPage, rid.slot), record_ptr(page, rid.slot), physical_slot_length(rid.size));
} }
} }
@@ -187,12 +187,8 @@ size_t slottedFreespace(Page * page) {
recordid slottedRawRalloc(Page * page, int size) { recordid slottedRawRalloc(Page * page, int size) {
int type = size; int type = size;
if(type >= SLOT_TYPE_BASE) { size = physical_slot_length(type);
assert(type < SLOT_TYPE_END);
size = SLOT_TYPE_LENGTHS[type-SLOT_TYPE_BASE];
}
assert(type != INVALID_SLOT); assert(type != INVALID_SLOT);
assert(size < SLOT_TYPE_BASE && size >= 0);
writelock(page->rwlatch, 342); writelock(page->rwlatch, 342);
assert(*page_type_ptr(page) == SLOTTED_PAGE); assert(*page_type_ptr(page) == SLOTTED_PAGE);
@@ -201,7 +197,7 @@ recordid slottedRawRalloc(Page * page, int size) {
rid.page = page->id; rid.page = page->id;
rid.slot = *numslots_ptr(page); rid.slot = *numslots_ptr(page);
rid.size = size; rid.size = type; // The rid should reflect the fact that this is a special slot.
/* The freelist_ptr points to the first free slot number, which /* The freelist_ptr points to the first free slot number, which
is the head of a linked list of free slot numbers.*/ is the head of a linked list of free slot numbers.*/
@@ -213,10 +209,9 @@ recordid slottedRawRalloc(Page * page, int size) {
really_do_ralloc(page, rid); really_do_ralloc(page, rid);
assert(size == *slot_length_ptr(page, rid.slot)); assert(type == *slot_length_ptr(page, rid.slot));
assert(size == physical_slot_length(*slot_length_ptr(page, rid.slot)));
*slot_length_ptr(page, rid.slot) = type;
/* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */ /* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */
assert(slottedFreespaceUnlocked(page) >= 0); assert(slottedFreespaceUnlocked(page) >= 0);
@@ -240,21 +235,19 @@ static void really_do_ralloc(Page * page, recordid rid) {
short freeSpace; short freeSpace;
int isBlob = 0; /* int isBlob = 0;
if(rid.size == BLOB_SLOT) { if(rid.size == BLOB_SLOT) {
isBlob = 1; isBlob = 1;
rid.size = sizeof(blob_record_t); rid.size = sizeof(blob_record_t);
} } */
assert(rid.size > 0);
// Compact the page if we don't have enough room. // Compact the page if we don't have enough room.
if(slottedFreespaceForSlot(page, rid.slot) < rid.size) { if(slottedFreespaceForSlot(page, rid.slot) < physical_slot_length(rid.size)) {
slottedCompact(page); slottedCompact(page);
// Make sure we have enough free space for the new record // Make sure we have enough free space for the new record
assert (slottedFreespaceForSlot(page, rid.slot) >= rid.size); assert (slottedFreespaceForSlot(page, rid.slot) >= physical_slot_length(rid.size));
} }
freeSpace = *freespace_ptr(page); freeSpace = *freespace_ptr(page);
@@ -329,15 +322,15 @@ static void really_do_ralloc(Page * page, recordid rid) {
// Reserve space for this record and record the space's offset in // Reserve space for this record and record the space's offset in
// the slot header. // the slot header.
*freespace_ptr(page) = freeSpace + rid.size; *freespace_ptr(page) = freeSpace + physical_slot_length(rid.size);
*slot_ptr(page, rid.slot) = freeSpace; *slot_ptr(page, rid.slot) = freeSpace;
// Remember how long this record is // Remember how long this record is
if(isBlob) { // if(isBlob) {
*slot_length_ptr(page, rid.slot = BLOB_SLOT); //*slot_length_ptr(page, rid.slot = BLOB_SLOT);
} else { //} else {
*slot_length_ptr(page, rid.slot) = rid.size; *slot_length_ptr(page, rid.slot) = rid.size;
} //}
} }
@@ -388,7 +381,8 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
// @todo Check to see that the blob is the right size? // @todo Check to see that the blob is the right size?
assert((rid.size == *slot_length_ptr(page, rid.slot)) || assert((rid.size == *slot_length_ptr(page, rid.slot)) ||
(*slot_length_ptr(page, rid.slot) >= PAGE_SIZE)); (*slot_length_ptr(page, rid.slot) >= PAGE_SIZE) ||
(rid.size >= BLOB_THRESHOLD_SIZE));
} }
@@ -419,9 +413,9 @@ void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff) {
assert(page->id == rid.page); assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot); slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE)); assert((rid.size == slot_length)); // || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) { if(!memcpy(buff, record_ptr(page, rid.slot), physical_slot_length(rid.size))) {
perror("memcpy"); perror("memcpy");
abort(); abort();
} }
@@ -440,6 +434,8 @@ void slottedRead(int xid, Page * page, recordid rid, byte *buff) {
int slot_length; int slot_length;
readlock(page->rwlatch, 519); readlock(page->rwlatch, 519);
// printf("Reading from rid = {%d,%d,%d (%d)}\n", rid.page, rid.slot, rid.size, physical_slot_length(rid.size));
assert(page->id == rid.page); assert(page->id == rid.page);
// DELETE THIS // DELETE THIS
@@ -450,10 +446,10 @@ void slottedRead(int xid, Page * page, recordid rid, byte *buff) {
// END DELETE THIS // END DELETE THIS
slot_length = *slot_length_ptr(page, rid.slot); slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE)); assert((rid.size == slot_length)); // || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) { if(!memcpy(buff, record_ptr(page, rid.slot), physical_slot_length(rid.size))) {
perror("memcpy"); perror("memcpy");
abort(); abort();
} }
@@ -467,34 +463,33 @@ void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat
readlock(page->rwlatch, 529); readlock(page->rwlatch, 529);
// printf("Writing to rid = {%d,%d,%d}\n", rid.page, rid.slot, rid.size);
assert(rid.size < PAGE_SIZE);
// assert(rid.size < PAGE_SIZE);
assert(page->id == rid.page); assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot); slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE)); assert((rid.size == slot_length)); // || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
if(!memcpy(record_ptr(page, rid.slot), data, rid.size)) { if(!memcpy(record_ptr(page, rid.slot), data, physical_slot_length(rid.size))) {
perror("memcpy"); perror("memcpy");
abort(); abort();
} }
/*page->LSN = lsn;
*lsn_ptr(page) = lsn * /
pageWriteLSN-page); */
unlock(page->rwlatch); unlock(page->rwlatch);
} }
void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) { void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) {
int slot_length; int slot_length;
assert(rid.size < PAGE_SIZE); // assert(rid.size < PAGE_SIZE);
assert(page->id == rid.page); assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot); slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE)); assert((rid.size == slot_length)); // || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
if(!memcpy(record_ptr(page, rid.slot), data, rid.size)) { if(!memcpy(record_ptr(page, rid.slot), data, physical_slot_length(rid.size))) {
perror("memcpy"); perror("memcpy");
abort(); abort();
} }
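
The slotted.c changes amount to a single convention: rid.size and the slot length field now carry the logical type, either a real byte count or a negative sentinel such as BLOB_SLOT, and any code that needs an on-page byte count goes through physical_slot_length(). A hedged sketch of the new slottedRawRalloc bookkeeping under that convention (freelist reuse and some asserts elided; project types and macros assumed from this tree):

    /* Illustration only, not the exact function body. */
    recordid slottedRawRalloc(Page * page, int type) {
      int size = physical_slot_length(type);  /* bytes actually reserved on the page */
      assert(type != INVALID_SLOT);

      writelock(page->rwlatch, 342);
      assert(*page_type_ptr(page) == SLOTTED_PAGE);

      recordid rid;
      rid.page = page->id;
      rid.slot = *numslots_ptr(page);
      rid.size = type;                         /* the rid advertises the logical type */

      really_do_ralloc(page, rid);             /* lays out physical_slot_length(type) bytes */

      /* The slot length field holds the logical type; check both views of it. */
      assert(type == *slot_length_ptr(page, rid.slot));
      assert(size == physical_slot_length(*slot_length_ptr(page, rid.slot)));

      unlock(page->rwlatch);
      return rid;
    }
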

View file

@@ -79,6 +79,8 @@ void slottedPageInitialize(Page * p);
#define record_ptr(page, n) bytes_from_start((page), *slot_ptr((page), (n))) #define record_ptr(page, n) bytes_from_start((page), *slot_ptr((page), (n)))
#define isValidSlot(page, n) ((*slot_ptr((page), (n)) == INVALID_SLOT) ? 0 : 1) #define isValidSlot(page, n) ((*slot_ptr((page), (n)) == INVALID_SLOT) ? 0 : 1)
#define physical_slot_length(size) ((size) >= 0 ? (size) : SLOT_TYPE_LENGTHS[-1*size])
/** /**
* allocate a record. This must be done in two phases. The first * allocate a record. This must be done in two phases. The first
* phase reserves a slot, and produces a log entry. The second phase * phase reserves a slot, and produces a log entry. The second phase

View file

@@ -22,7 +22,7 @@ int xidCount = 0;
const recordid ROOT_RECORD = {1, 0, -1}; const recordid ROOT_RECORD = {1, 0, -1};
const recordid NULLRID = {0,0,-1}; const recordid NULLRID = {0,0,-1};
const short SLOT_TYPE_LENGTHS[] = { 0, sizeof(blob_record_t), -1}; const short SLOT_TYPE_LENGTHS[] = { 0, 0, sizeof(blob_record_t), -1};
/** /**
Locking for transactional2.c works as follows: Locking for transactional2.c works as follows: