bufferMananger is *really* reentrant now! (I think) However, the same thread may not load a page more than once. So, LLADD has to be altered so that it never calls loadPage() more than once per operation (more efficient), and/or pin/unpin need to be implemented. An easy way to do pin/unpin is to have them pull things out of the cache replacement list, and then re-insert them.

This commit is contained in:
Sears Russell 2004-07-21 02:13:28 +00:00
parent 0ce77903fb
commit 126ae31392
16 changed files with 333 additions and 142 deletions

140
ChangeLog
View file

@ -0,0 +1,140 @@
2004-07-20 sears <sears@Morphix>
* bufferManager.c, common.c, operations/alloc.c, page.c, page.h, pageCache.c, pageFile.c, transactional2.c:
Continuing work on multi-threading. r/w access to buffer manager getting close, but still buggy.
* logger/logWriter.c, pageFile.c, pageFile.h, bufferManager.c, common.c, latches.h, page.c, page.h, pageCache.c:
pageCache.c is now re-entrant.
2004-07-15 sears <sears@Morphix>
* Makefile.am, blobManager.c, bufferManager.c, logger/logWriter.c, page.h, pageCache.c, pageFile.c, pageFile.h:
pageCache is now re-entrant, in theory.
2004-07-14 sears <sears@Morphix>
* blobManager.c, blobManager.h, bufferManager.c, logger/logHandle.c, logger/logHandle.h, logger/logWriter.c, logger/logWriter.h, logger/logger2.c, operations.c, operations/alloc.c, operations/prepare.c, page.c, page.h, pageCache.c, recovery2.c, transactional2.c:
Moved page.h and some of the logging headers out of the public API.
* blobManager.c, bufferManager.c, latches.h, logger/logEntry.c, logger/logWriter.c, logger/logger2.c, operations.c, operations/alloc.c, page.c, pageCache.c, recovery2.c, transactional2.c:
Cleaning up bufferManager / page for locking. Want to limit access to the Page struct.
2004-07-13 sears <sears@Morphix>
* page.c, pageCache.c:
page.c is re-entrant (mostly), and now reuses DeRalloced space properly. (For now, BufferManager still is not re-entrant, and also prevents space from being reused.)
2004-07-09 sears <sears@Morphix>
* bufferManager.c, page.c: Made pageWriteLSN static.
2004-07-06 sears <sears@Morphix>
* bufferManager.c, page.c: More documentation fixes.
* blobManager.h: Fixed groupings for LLADD API docs.
* operations/alloc.c, operations/lladdhash.c, logger/logWriter.c, logger/logger2.c, logger/logEntry.c, operations.c, page.c, pageCache.c, recovery2.c, stats.c, transactional2.c, Makefile.am, blobManager.c, blobManager.h, bufferManager.c, common.c, latches.h, linkedlist.c:
Ported LLADD to Fedora, cleaned up autoconf setup, and numerouse #includes that were problematic.
2004-07-04 sears <sears@Morphix>
* Makefile.am, blobManager.c, blobManager.h, bufferManager.c, common.c, logger/logHandle.c, logger/logWriter.c, logger/logger2.c, operations.c, operations/alloc.c, page.c, pageCache.c, recovery2.c, stats.c, transactional2.c:
Documentation update, added latch profiling tools.
2004-07-01 sears <sears@Morphix>
* logger/logHandle.c, logger/logWriter.c, operations.c:
Log truncation. (But no checkpoints, so it doesn't get called... it does pass testing though. :)
2004-06-30 sears <sears@Morphix>
* Makefile.am, blobManager.c, blobManager.h, bufferManager.c, logger/logWriter.c, operations/alloc.c, operations/prepare.c, page.c, pageCache.c, transactional2.c:
Logwriter can now handler partial log entries correctly (it ignores them)
Buffer Mananger no longer steals pages pre-maturely
Alloc is an operation, and correct.
Caching broken out into a new module
Wrote a utility to translate a log file into human-readable ascii.
2004-06-28 sears <sears@Morphix>
* logger/logstreamer.h, operations/alloc.c, operations/prepare.c, blobManager.c, blobManager.h, bufferManager.c, logger/logEntry.c, logger/logWriter.c, logger/logger.c, logger/logger2.c, logger/logparser.c, logger/logparser.h, logger/logstreamer.c, page.c, recovery.c, recovery.h, recovery2.c, transactional.c, transactional2.c:
Cleaned out old cruft, such as commented out code, dead data structures, and dead files. sloccount went from $75,000 to $50,000 on src/lladd. ;)
* blobManager.c, blobManager.h, bufferManager.c, logger/logger2.c, operations.c, operations/alloc.c, operations/decrement.c, operations/increment.c, operations/set.c, page.c, recovery2.c, transactional2.c:
Bugfixes ; blobs pass regression. Next stop: Delete old cruft.
2004-06-26 sears <sears@Morphix>
* blobManager.c: Wrote blobmanager, didn't compile it yet.
2004-06-25 sears <sears@Morphix>
* blobManager.c, blobManager.h: blob manager commit.
* Makefile.am, bufferManager.c, page.c:
Preliminary implementation of blobManager
2004-06-24 sears <sears@Morphix>
* logger/.deps/logger.Po, logger/.deps/logparser.Po, logger/.deps/logstreamer.Po:
Need to send laptop in for warranty service, so it's time to put this code into CVS. :)
Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation.
* logger/.deps/logger.Po, logger/.deps/logparser.Po, logger/.deps/logstreamer.Po:
New file.
* logger/logEntry.c, logger/logWriter.c, logger/logger.c, logger/logparser.c, logger/logparser.h:
Need to send laptop in for warranty service, so it's time to put this code into CVS. :)
Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation.
* logger/logEntry.c, logger/logWriter.c, logger/logger.c, logger/logparser.c, logger/logparser.h:
New file.
* logger/Makefile.am-old, logger/logHandle.c, logger/logger2.c, logger/logstreamer.c, logger/logstreamer.h, operations/.deps/increment.Po, operations/.deps/set.Po:
Need to send laptop in for warranty service, so it's time to put this code into CVS. :)
Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation.
* logger/Makefile.am-old, logger/logHandle.c, logger/logger2.c, logger/logstreamer.c, logger/logstreamer.h, operations/.deps/increment.Po, operations/.deps/set.Po:
New file.
* operations/.deps/decrement.Po, operations/.deps/lladdhash.Po, operations/.deps/prepare.Po, operations/alloc.c, operations/decrement.c, operations/increment.c, operations/lladdhash.c, operations/prepare.c:
Need to send laptop in for warranty service, so it's time to put this code into CVS. :)
Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation.
* operations/.deps/decrement.Po, operations/.deps/lladdhash.Po, operations/.deps/prepare.Po, operations/alloc.c, operations/decrement.c, operations/increment.c, operations/lladdhash.c, operations/prepare.c:
New file.
* linkedlist.c, linkedlist.h, operations.c, operations/Makefile.am-old, operations/set.c, transactional.c:
Need to send laptop in for warranty service, so it's time to put this code into CVS. :)
Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation.
* linkedlist.c, linkedlist.h, operations.c, operations/Makefile.am-old, operations/set.c, transactional.c:
New file.
* page.c, transactional2.c:
Need to send laptop in for warranty service, so it's time to put this code into CVS. :)
Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation.
* page.c, transactional2.c: New file.
* Makefile.am, bufferManager.c, recovery.c, recovery.h:
Need to send laptop in for warranty service, so it's time to put this code into CVS. :)
Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation.
* Makefile.am, bufferManager.c, recovery.c, recovery.h: New file.
* 1, recovery2.c:
Need to send laptop in for warranty service, so it's time to put this code into CVS. :)
Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation.
* 1, recovery2.c: New file.

View file

@ -99,7 +99,7 @@ extern int errno;
#define lsn_t long #define lsn_t long
/*#define DEBUGGING */ /*#define DEBUGGING */
#define PROFILE_LATCHES #define PROFILE_LATCHES
#ifdef DEBUGGING #ifdef DEBUGGING

View file

@ -83,6 +83,7 @@ void downgradelock(rwl * lock) {
assert(lock->writers); assert(lock->writers);
lock->writers--; lock->writers--;
lock->readers++; lock->readers++;
pthread_cond_signal (lock->writeOK);
pthread_cond_broadcast(lock->readOK); pthread_cond_broadcast(lock->readOK);
pthread_mutex_unlock(lock->mut); pthread_mutex_unlock(lock->mut);
} }

View file

@ -135,13 +135,17 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) {
assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT); assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT);
pthread_mutex_lock(&lastFreepage_mutex);
pthread_mutex_lock(&lastFreepage_mutex);
while(freespace(p = loadPage(lastFreepage)) < size ) { unlock(p->loadlatch); lastFreepage++; } while(freespace(p = loadPage(lastFreepage)) < size ) {
unlock(p->loadlatch);
lastFreepage++;
}
ret = pageRalloc(p, size); ret = pageRalloc(p, size);
unlock(p->loadlatch); unlock(p->loadlatch);
pthread_mutex_unlock(&lastFreepage_mutex); pthread_mutex_unlock(&lastFreepage_mutex);
/* DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size); */ /* DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size); */
@ -235,7 +239,7 @@ void setSlotType(int pageid, int slot, int type) {
@see finalize, removePendingEvent @see finalize, removePendingEvent
*/ */
void addPendingEvent(int pageid){ /*void addPendingEvent(int pageid){
Page * p; Page * p;
@ -266,7 +270,7 @@ void addPendingEvent(int pageid){
unlock(p->loadlatch); unlock(p->loadlatch);
} }*/
/** /**
@ -281,7 +285,7 @@ void addPendingEvent(int pageid){
@todo as implemented, loadPage() ... doOperation is not atomic! @todo as implemented, loadPage() ... doOperation is not atomic!
*/ */
void removePendingEvent(int pageid) { /*void removePendingEvent(int pageid) {
Page * p; Page * p;
@ -303,6 +307,6 @@ void removePendingEvent(int pageid) {
unlock(p->loadlatch); unlock(p->loadlatch);
} }*/

View file

@ -46,7 +46,8 @@ int __lladd_pthread_mutex_lock(lladd_pthread_mutex_t *mutex, char * file, int li
pthread_yield(); pthread_yield();
if(blockCount >= 10000 && ! (blockCount % 10000)) { if(blockCount >= 10000 && ! (blockCount % 10000)) {
DEBUG("Spinning at %s:%d, %ld times. Held by: %s\n", file, line, blockCount, mutex->last_acquired_at); printf("Spinning at %s:%d, %ld times. Held by: %s\n", file, line, blockCount, mutex->last_acquired_at);
fflush(NULL);
} }
} }

View file

@ -201,10 +201,10 @@ int writeLogEntry(LogEntry * e) {
const long size = sizeofLogEntry(e); const long size = sizeofLogEntry(e);
if(e->type == UPDATELOG) { if(e->type == UPDATELOG) {
addPendingEvent(e->contents.update.rid.page); /* addPendingEvent(e->contents.update.rid.page); */
} }
if(e->type == CLRLOG) { if(e->type == CLRLOG) {
addPendingEvent(e->contents.clr.rid.page); /* addPendingEvent(e->contents.clr.rid.page); */
} }
if(e->xid == -1) { /* Don't write log entries for recovery xacts. */ if(e->xid == -1) { /* Don't write log entries for recovery xacts. */

View file

@ -51,11 +51,12 @@ Operation operationsTable[MAX_OPERATIONS];
void doUpdate(const LogEntry * e) { void doUpdate(const LogEntry * e) {
DEBUG("OPERATION update arg length %d, lsn = %ld\n", e->contents.update.argSize, e->LSN); DEBUG("OPERATION update arg length %d, lsn = %ld\n", e->contents.update.argSize, e->LSN);
operationsTable[e->contents.update.funcID].run(e->xid, e->LSN, e->contents.update.rid, getUpdateArgs(e)); operationsTable[e->contents.update.funcID].run(e->xid, e->LSN, e->contents.update.rid, getUpdateArgs(e));
removePendingEvent(e->contents.update.rid.page); /* removePendingEvent(e->contents.update.rid.page); */
} }
@ -70,7 +71,7 @@ void redoUpdate(const LogEntry * e) {
doUpdate(e); doUpdate(e);
} else { } else {
DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %ld}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size); DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %ld}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size);
removePendingEvent(e->contents.update.rid.page); /* removePendingEvent(e->contents.update.rid.page); */
} }
} else if(e->type == CLRLOG) { } else if(e->type == CLRLOG) {
LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN); LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN);
@ -85,7 +86,7 @@ void redoUpdate(const LogEntry * e) {
undoUpdate(f, e->LSN); undoUpdate(f, e->LSN);
} else { } else {
DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %ld}\n", f->LSN, rid.page, rid.slot, rid.size); DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %ld}\n", f->LSN, rid.page, rid.slot, rid.size);
removePendingEvent(e->contents.update.rid.page); /* removePendingEvent(e->contents.update.rid.page); */
} }
} else { } else {
assert(0); assert(0);
@ -121,7 +122,7 @@ void undoUpdate(const LogEntry * e, lsn_t clr_lsn) {
DEBUG("OPERATION Skipping undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size); DEBUG("OPERATION Skipping undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size);
} }
removePendingEvent(e->contents.update.rid.page); /* removePendingEvent(e->contents.update.rid.page); */
/* printf("Undo done."); fflush(NULL); */ /* printf("Undo done."); fflush(NULL); */

View file

@ -96,7 +96,7 @@ terms specified in this license.
#include <stdio.h> #include <stdio.h>
#include <lladd/constants.h> #include <lladd/constants.h>
#include <assert.h>
/* TODO: Combine with buffer size... */ /* TODO: Combine with buffer size... */
static int nextPage = 0; static int nextPage = 0;
@ -144,8 +144,8 @@ static pthread_mutex_t pageAllocMutex;
Page pool[MAX_BUFFER_SIZE+1]; Page pool[MAX_BUFFER_SIZE+1];
int isValidSlot(byte *memAddr, int slot); static int isValidSlot(byte *memAddr, int slot);
void invalidateSlot(byte *memAddr, int slot); static void invalidateSlot(byte *memAddr, int slot);
void pageDeRalloc(Page * page, recordid rid); void pageDeRalloc(Page * page, recordid rid);
/** /**
@ -248,9 +248,9 @@ static const byte *slotMemAddr(const byte *memAddr, int slotNum) {
lsn_t pageReadLSN(const Page * page) { lsn_t pageReadLSN(const Page * page) {
lsn_t ret; lsn_t ret;
/* readlock(page->rwlatch, 259); */ readlock(page->rwlatch, 259);
ret = *(long *)(page->memAddr + START_OF_LSN); ret = *(long *)(page->memAddr + START_OF_LSN);
/* readunlock(page->rwlatch); */ readunlock(page->rwlatch);
return ret; return ret;
} }
@ -263,8 +263,9 @@ lsn_t pageReadLSN(const Page * page) {
* @param page You must have a writelock on page before calling this function. * @param page You must have a writelock on page before calling this function.
*/ */
static void pageWriteLSN(Page * page) { static void pageWriteLSN(Page * page) {
/* unlocked since we're only called by a function that holds the writelock. */
*(long *)(page->memAddr + START_OF_LSN) = page->LSN; *(long *)(page->memAddr + START_OF_LSN) = page->LSN;
} }
static int unlocked_freespace(Page * page); static int unlocked_freespace(Page * page);
@ -420,19 +421,19 @@ recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid) {
return rid; return rid;
} }
int isValidSlot(byte *memAddr, int slot) { static int isValidSlot(byte *memAddr, int slot) {
return (getSlotOffset(memAddr, slot) != INVALID_SLOT) ? 1 : 0; return (getSlotOffset(memAddr, slot) != INVALID_SLOT) ? 1 : 0;
} }
void invalidateSlot(byte *memAddr, int slot) { static void invalidateSlot(byte *memAddr, int slot) {
setSlotOffset(memAddr, slot, INVALID_SLOT); setSlotOffset(memAddr, slot, INVALID_SLOT);
} }
void pageDeRalloc(Page * page, recordid rid) { void pageDeRalloc(Page * page, recordid rid) {
writelock(page->rwlatch, 416); /* Don't need any locking, since we don't support concurrent access to the same slot.. */
invalidateSlot(page->memAddr, rid.slot); invalidateSlot(page->memAddr, rid.slot);
writeunlock(page->rwlatch);
} }
/** /**
@ -524,10 +525,10 @@ static void setSlotOffset(byte *memAddr, int slot, int offset) {
static void setSlotLength(byte *memAddr, int slot, int length) { static void setSlotLength(byte *memAddr, int slot, int length) {
setSecondHalfOfWord((int*)(unsigned int*)slotMemAddr(memAddr, slot), length); setSecondHalfOfWord((int*)(unsigned int*)slotMemAddr(memAddr, slot), length);
} }
/*
int isBlobSlot(byte *pageMemAddr, int slot) { static int isBlobSlot(byte *pageMemAddr, int slot) {
return BLOB_SLOT == getSlotLength(pageMemAddr, slot); return BLOB_SLOT == getSlotLength(pageMemAddr, slot);
} }*/
/* /*
This needs should trust the rid (since the caller needs to This needs should trust the rid (since the caller needs to
@ -551,7 +552,7 @@ void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) {
void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data) { void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data) {
byte *rec; byte *rec;
readlock(page->rwlatch, 529); writelock(page->rwlatch, 529);
assert(rid.size < PAGE_SIZE); assert(rid.size < PAGE_SIZE);
rec = page->memAddr + getSlotOffset(page->memAddr, rid.slot); rec = page->memAddr + getSlotOffset(page->memAddr, rid.slot);
@ -563,7 +564,7 @@ void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *
page->LSN = lsn; page->LSN = lsn;
pageWriteLSN(page); pageWriteLSN(page);
readunlock(page->rwlatch); writeunlock(page->rwlatch);
} }
@ -571,8 +572,9 @@ void pageReallocNoLock(Page *p, int id) {
p->id = id; p->id = id;
p->LSN = 0; p->LSN = 0;
p->dirty = 0; p->dirty = 0;
p->pending = 0; /* assert(p->pending == 0);
p->waiting = 0; assert(p->waiting == 1);
p->waiting = 0;*/
} }
void pageRealloc(Page *p, int id) { void pageRealloc(Page *p, int id) {
@ -601,7 +603,7 @@ Page *pageAlloc(int id) {
page->loadlatch = initlock(); page->loadlatch = initlock();
/* pthread_mutex_init(&page->pending_mutex, NULL);*/ /* pthread_mutex_init(&page->pending_mutex, NULL);*/
pthread_cond_init(&page->noMorePending, NULL); /* pthread_cond_init(&page->noMorePending, NULL); */
page->memAddr = malloc(PAGE_SIZE); page->memAddr = malloc(PAGE_SIZE);
@ -613,9 +615,9 @@ Page *pageAlloc(int id) {
pthread_mutex_unlock(&pageAllocMutex); pthread_mutex_unlock(&pageAllocMutex);
/**@todo if re-implement pending event thing, these lines need to be protected by a lock!? */
page->pending = 0; /* page->pending = 0;
page->waiting = 0; page->waiting = 1; */
return page; return page;
} }
@ -678,16 +680,20 @@ int pageTest() {
return 0; return 0;
} }
/** @todo: Should the caller need to obtain the writelock when calling pageSetSlotType? */
void pageSetSlotType(Page * p, int slot, int type) { void pageSetSlotType(Page * p, int slot, int type) {
assert(type > PAGE_SIZE); assert(type > PAGE_SIZE);
writelock(p->rwlatch, 686);
/* setSlotLength does the locking for us. */
setSlotLength(p->memAddr, slot, type); setSlotLength(p->memAddr, slot, type);
unlock(p->rwlatch);
} }
int pageGetSlotType(Page * p, int slot, int type) { int pageGetSlotType(Page * p, int slot, int type) {
int ret = getSlotLength(p->memAddr, slot); int ret;
readlock(p->rwlatch, 693);
ret = getSlotLength(p->memAddr, slot);
unlock(p->rwlatch);
/* getSlotType does the locking for us. */ /* getSlotType does the locking for us. */
return ret > PAGE_SIZE ? ret : NORMAL_SLOT; return ret > PAGE_SIZE ? ret : NORMAL_SLOT;
} }

View file

@ -125,9 +125,9 @@ typedef struct Page_s {
this properly, and there are no read-only functions for the this properly, and there are no read-only functions for the
pending field. */ pending field. */
pthread_cond_t noMorePending; /* pthread_cond_t */ /* pthread_cond_t noMorePending; */ /* pthread_cond_t */
int waiting; /* int waiting; */
/** /**
In the multi-threaded case, before we steal a page, we need to In the multi-threaded case, before we steal a page, we need to
@ -149,7 +149,7 @@ typedef struct Page_s {
carefully. carefully.
*/ */
int pending; /* int pending; */
} Page; } Page;
extern pthread_cond_t addPendingOK; extern pthread_cond_t addPendingOK;

View file

@ -145,45 +145,47 @@ static void qRemove(Page *ret) {
assert(ret != repTail); assert(ret != repTail);
assert(ret != repHead); assert(ret != repHead);
} }
/*
static Page *getFreePage() { static Page *getFreePage() {
Page *ret; Page *ret;
if( state == FULL ) { /* kick */ if( state == FULL ) { / * kick * /
ret = repTail; ret = repTail;
/** Make sure no one else will try to reuse this page. */ / ** Make sure no one else will try to reuse this page. * /
cacheRemovePage(ret); cacheRemovePage(ret);
/** Temporarily drop the mutex while we wait for outstanding / ** Temporarily drop the mutex while we wait for outstanding
operations on the page to complete. */ operations on the page to complete. * /
pthread_mutex_unlock(&loadPagePtr_mutex); pthread_mutex_unlock(&loadPagePtr_mutex);
/** @ todo getFreePage (finalize) needs to yield the getPage mutex, / ** @ todo getFreePage (finalize) needs to yield the getPage mutex,
but also needs to remove a page from the kick list before but also needs to remove a page from the kick list before
doing so. If there is a cache hit on the page that's been doing so. If there is a cache hit on the page that's been
removed from the kick list, then the cache eviction policy removed from the kick list, then the cache eviction policy
code needs o know this, and ignore the hit. -- Done. */ code needs o know this, and ignore the hit. -- Done. * /
finalize(ret); /* This cannot deadlock because each thread can / * finalize(ret); * /
/ * ret->waiting++; * / / * @todo remove waiting / pending fields.. * /
/ * This cannot deadlock because each thread can
only have outstanding pending events on the only have outstanding pending events on the
page that it's accessing, but they can only page that it's accessing, but they can only
hold that lock if the page is in cache. If the hold that lock if the page is in cache. If the
page is in cache, then the thread surely isn't page is in cache, then the thread surely isn't
here! Therefore any threads that finalize will here! Therefore any threads that finalize will
block on can not possibly be blocking on this block on can not possibly be blocking on this
thread's latches. */ thread's latches. * /
/* writelock(ret->loadlatch, 181); */ /* Don't need the lock here--No one else has a pointer to this page! */ / * writelock(ret->loadlatch, 181); * / / * Don't need the lock here--No one else has a pointer to this page! * /
pthread_mutex_lock(&loadPagePtr_mutex); pthread_mutex_lock(&loadPagePtr_mutex);
/* Now that finalize returned, pull ret out of the cache's lookup table. */ / * Now that finalize returned, pull ret out of the cache's lookup table. * /
/* pblHtRemove(activePages, &ret->id, sizeof(int)); */ / * pblHtRemove(activePages, &ret->id, sizeof(int)); * /
@ -194,81 +196,93 @@ static Page *getFreePage() {
ret->id = -1; ret->id = -1;
ret->inCache = 0; ret->inCache = 0;
/* writelock(ret->loadlatch, 166); */ / * writelock(ret->loadlatch, 166); * /
} }
return ret; return ret;
} }
*/
#define RO 0 #define RO 0
#define RW 1 #define RW 1
Page * getPage(int pageid, int locktype) { Page * getPage(int pageid, int locktype) {
Page * ret; Page * ret;
int spin = 0;
assert(locktype == RO);
pthread_mutex_lock(&loadPagePtr_mutex); pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int)); ret = pblHtLookup(activePages, &pageid, sizeof(int));
/* Unfortunately, this is a heuristic, as a race condition exists. if(ret) {
(Until we obtain a readlock on ret, we have no way of knowing if readlock(ret->loadlatch, 217);
we've gotten the correct page.) */ }
while (ret && ret->id != pageid) {
unlock(ret->loadlatch);
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
readlock(ret->loadlatch, 217);
}
spin++;
if(spin > 10000) {
printf("GetPage stuck!");
}
}
if(ret) { if(ret) {
cacheHitOnPage(ret); cacheHitOnPage(ret);
assert(ret->id == -1 || ret->id == pageid); assert(ret->id == -1 || ret->id == pageid);
} } else {
pthread_mutex_unlock(&loadPagePtr_mutex);
if(!ret) {
ret = dummy_page; ret = dummy_page;
readlock(ret->loadlatch, 232);
} }
readlock(ret->loadlatch, 217); if(ret->id != pageid) {
while(ret->id != pageid) { /* Either we got a stale mapping from the HT, or no mapping at all. */
unlock(ret->loadlatch); unlock(ret->loadlatch);
pthread_mutex_lock(&loadPagePtr_mutex); if( state == FULL ) {
ret = getFreePage(); ret = repTail;
cacheRemovePage(ret);
} else {
pblHtRemove(activePages, &(ret->id), sizeof(int)); ret = pageAlloc(-1);
ret->id = -1;
ret->inCache = 0;
pthread_mutex_unlock(&loadPagePtr_mutex); }
writelock(ret->loadlatch, 231); writelock(ret->loadlatch, 217);
if(ret->id != -1) { cacheInsertPage(ret);
pblHtInsert(activePages, &pageid, sizeof(int), ret);
pblHtRemove(activePages, &(ret->id), sizeof(int));
pthread_mutex_unlock(&loadPagePtr_mutex);
/*new*/ /*writelock(ret->loadlatch, 217);*/
if(ret->id != -1) {
assert(ret != dummy_page); assert(ret != dummy_page);
pageWrite(ret); pageWrite(ret);
} }
pageRealloc(ret, pageid); /* Do we need any special lock here? */ pageRealloc(ret, pageid);
pageRead(ret); pageRead(ret);
/*new*/
/* pthread_mutex_lock(&loadPagePtr_mutex);
pblHtRemove(activePages, &(ret->id), sizeof(int));
pthread_mutex_unlock(&loadPagePtr_mutex); */
/*new*/
downgradelock(ret->loadlatch);
unlock(ret->loadlatch); } else {
pthread_mutex_lock(&loadPagePtr_mutex);
/* By inserting ret into the cache, we give up the implicit write lock. */
cacheInsertPage(ret);
pblHtInsert(activePages, &pageid, sizeof(int), ret);
pthread_mutex_unlock(&loadPagePtr_mutex); pthread_mutex_unlock(&loadPagePtr_mutex);
readlock(ret->loadlatch, 217);
} }
assert(ret->id == pageid); assert(ret->id == pageid);
return ret; return ret;

View file

@ -24,7 +24,7 @@ extern pthread_mutex_t add_pending_mutex;
*/ */
void finalize(Page * p) { /*void finalize(Page * p) {
pthread_mutex_lock(&(add_pending_mutex)); pthread_mutex_lock(&(add_pending_mutex));
p->waiting++; p->waiting++;
@ -32,12 +32,15 @@ void finalize(Page * p) {
DEBUG("A"); DEBUG("A");
pthread_cond_wait(&(p->noMorePending), &(add_pending_mutex)); pthread_cond_wait(&(p->noMorePending), &(add_pending_mutex));
} }
pthread_mutex_unlock(&(add_pending_mutex)); assert(p->pending == 0);
assert(p->waiting == 1);
pthread_cond_broadcast(&addPendingOK); pthread_cond_broadcast(&addPendingOK);
pthread_mutex_unlock(&(add_pending_mutex));
return; return;
} }*/
@ -95,7 +98,7 @@ void pageWrite(Page * ret) {
long pageoffset = ret->id * PAGE_SIZE; long pageoffset = ret->id * PAGE_SIZE;
long offset ; long offset ;
assert(ret->pending == 0); /* assert(ret->pending == 0); */
if(flushedLSN() < pageReadLSN(ret)) { if(flushedLSN() < pageReadLSN(ret)) {
DEBUG("pageWrite is calling syncLog()!\n"); DEBUG("pageWrite is calling syncLog()!\n");

View file

@ -155,9 +155,9 @@ static void Redo() {
don't need to check to see if the page is newer than this don't need to check to see if the page is newer than this
log entry. */ log entry. */
if(e->type == UPDATELOG) { if(e->type == UPDATELOG) {
addPendingEvent(e->contents.update.rid.page); /* addPendingEvent(e->contents.update.rid.page); */
} else { } else {
addPendingEvent(e->contents.clr.rid.page); /* addPendingEvent(e->contents.clr.rid.page); */
} }
redoUpdate(e); redoUpdate(e);
} }

View file

@ -11,6 +11,8 @@
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
#include <page.h>
TransactionLog XactionTable[MAX_TRANSACTIONS]; TransactionLog XactionTable[MAX_TRANSACTIONS];
int numActiveXactions = 0; int numActiveXactions = 0;
int xidCount = 0; int xidCount = 0;
@ -47,7 +49,6 @@ int Tinit() {
setupOperationsTable(); setupOperationsTable();
/* pageInit(); */
bufInit(); bufInit();
openLogWriter(); openLogWriter();
@ -92,20 +93,24 @@ int Tbegin() {
void Tupdate(int xid, recordid rid, const void *dat, int op) { void Tupdate(int xid, recordid rid, const void *dat, int op) {
LogEntry * e; LogEntry * e;
Page * p;
#ifdef DEBUGGING #ifdef DEBUGGING
pthread_mutex_lock(&transactional_2_mutex); pthread_mutex_lock(&transactional_2_mutex);
assert(numActiveXactions <= MAX_TRANSACTIONS); assert(numActiveXactions <= MAX_TRANSACTIONS);
pthread_mutex_unlock(&transactional_2_mutex); pthread_mutex_unlock(&transactional_2_mutex);
#endif #endif
p = loadPage(rid.page);
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], rid, op, dat); e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], rid, op, dat);
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN); DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
doUpdate(e); doUpdate(e, p);
unlock(p->loadlatch);
} }
/* @todo what about locking? */ /* @todo what about locking? */

View file

@ -34,13 +34,14 @@ void initializePages() {
rid.slot = 0; rid.slot = 0;
rid.size = sizeof(int); rid.size = sizeof(int);
p = loadPage(rid.page); p = loadPage(rid.page);
assert(p->id != -1); assert(p->id != -1);
pageSlotRalloc(p, 0, rid); pageSlotRalloc(p, 0, rid);
/* addPendingEvent(rid.page); */
writeRecord(1, 1, rid, &i);
/* removePendingEvent(rid.page); */
assert(p->pending == 0);
unlock(p->loadlatch); unlock(p->loadlatch);
/* addPendingEvent(rid.page); */
writeRecord(1, 1, rid, &i);
/* removePendingEvent(rid.page); */
/* assert(p->pending == 0); */
} }
printf("Initialization complete.\n"); fflush(NULL); printf("Initialization complete.\n"); fflush(NULL);
@ -55,7 +56,7 @@ void * workerThread(void * p) {
int j; int j;
int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0)); int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0));
Page * p;
if(! (i % 500) ) { if(! (i % 500) ) {
printf("%d", i / 500); fflush(NULL); printf("%d", i / 500); fflush(NULL);
} }
@ -64,39 +65,53 @@ void * workerThread(void * p) {
rid.slot = 0; rid.slot = 0;
rid.size = sizeof(int); rid.size = sizeof(int);
addPendingEvent(rid.page); /* addPendingEvent(rid.page); */
readRecord(1, rid, &j); readRecord(1, rid, &j);
assert(rid.page == k); assert(rid.page == k);
removePendingEvent(rid.page); /* removePendingEvent(rid.page); */
assert(k == j); assert(k == j);
} }
return NULL; return NULL;
} }
void * workerThreadWriting(void * p) { void * workerThreadWriting(void * q) {
int offset = *(int*)p; int offset = *(int*)q;
recordid rids[RECORDS_PER_THREAD]; recordid rids[RECORDS_PER_THREAD];
for(int i = 0 ; i < RECORDS_PER_THREAD; i++) { for(int i = 0 ; i < RECORDS_PER_THREAD; i++) {
/* addPendingEvent(rids[i].page); */
rids[i] = ralloc(1, sizeof(int)); rids[i] = ralloc(1, sizeof(int));
/* removePendingEvent(rids[i].page); */
/* printf("\nRID:\t%d,%d\n", rids[i].page, rids[i].slot); */
fflush(NULL);
if(! (i % 1000) ) {
printf("A%d", i / 1000); fflush(NULL);
}
sched_yield();
} }
for(int i = 0; i < RECORDS_PER_THREAD; i++) { for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int val = i + offset; int val = (i * 10000) + offset;
int oldpage = rids[i].page; int oldpage = rids[i].page;
addPendingEvent(rids[i].page);
writeRecord(1, 0, rids[i], &val); writeRecord(1, 0, rids[i], &val);
assert(oldpage == rids[i].page);
removePendingEvent(rids[i].page);
if(! (i % 1000) ) { if(! (i % 1000) ) {
printf("W%d", i / 1000); fflush(NULL); printf("W%d", i / 1000); fflush(NULL);
} }
sched_yield();
} }
for(int i = 0; i < RECORDS_PER_THREAD; i++) { for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int val; int val;
Page * p;
addPendingEvent(rids[i].page);
readRecord(1, rids[i], &val); readRecord(1, rids[i], &val);
if(! (i % 1000) ) { if(! (i % 1000) ) {
@ -104,10 +119,9 @@ void * workerThreadWriting(void * p) {
} }
assert(val == i+offset); assert(val == (i * 10000)+offset);
removePendingEvent(rids[i].page);
sched_yield();
} }
return NULL; return NULL;
@ -177,7 +191,9 @@ START_TEST(pageThreadedWritersTest) {
Tinit(); Tinit();
for(i = 0; i < RECORD_THREAD_COUNT; i++) { for(i = 0; i < RECORD_THREAD_COUNT; i++) {
pthread_create(&workers[i], NULL, workerThreadWriting, &i); int * j = malloc(sizeof(int));
*j = i;
pthread_create(&workers[i], NULL, workerThreadWriting, j);
} }
for(i = 0; i < RECORD_THREAD_COUNT; i++) { for(i = 0; i < RECORD_THREAD_COUNT; i++) {
pthread_join(workers[i], NULL); pthread_join(workers[i], NULL);
@ -193,9 +209,9 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
tcase_add_test(tc, pageSingleThreadTest); /* tcase_add_test(tc, pageSingleThreadTest); */
tcase_add_test(tc, pageLoadTest); tcase_add_test(tc, pageLoadTest);
tcase_add_test(tc, pageSingleThreadWriterTest); /* tcase_add_test(tc, pageSingleThreadWriterTest); */
tcase_add_test(tc, pageThreadedWritersTest); tcase_add_test(tc, pageThreadedWritersTest);
/* --------------------------------------------- */ /* --------------------------------------------- */

View file

@ -82,7 +82,7 @@ START_TEST(operation_physical_do_undo) {
setToTwo->LSN = 10; setToTwo->LSN = 10;
DEBUG("C\n"); DEBUG("C\n");
addPendingEvent(rid.page); /* addPendingEvent(rid.page); */
doUpdate(setToTwo); /* PAGE LSN= 10, value = 2. */ doUpdate(setToTwo); /* PAGE LSN= 10, value = 2. */
readRecord(xid, rid, &buf); readRecord(xid, rid, &buf);
@ -97,7 +97,7 @@ START_TEST(operation_physical_do_undo) {
setToTwo->LSN = 5; setToTwo->LSN = 5;
addPendingEvent(rid.page); /* addPendingEvent(rid.page); */
undoUpdate(setToTwo, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */ undoUpdate(setToTwo, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */
readRecord(xid, rid, &buf); readRecord(xid, rid, &buf);
@ -105,7 +105,7 @@ START_TEST(operation_physical_do_undo) {
fail_unless(buf == 1, NULL); fail_unless(buf == 1, NULL);
DEBUG("E\n"); DEBUG("E\n");
addPendingEvent(rid.page); /* addPendingEvent(rid.page); */
redoUpdate(setToTwo); redoUpdate(setToTwo);
@ -146,7 +146,7 @@ START_TEST(operation_physical_do_undo) {
setToTwo->LSN = 10; setToTwo->LSN = 10;
DEBUG("F\n"); DEBUG("F\n");
addPendingEvent(rid.page); /* addPendingEvent(rid.page); */
redoUpdate(setToTwo); redoUpdate(setToTwo);
/* writeLSN(setToTwo->LSN, rid.page); */ /* writeLSN(setToTwo->LSN, rid.page); */
@ -155,7 +155,7 @@ START_TEST(operation_physical_do_undo) {
fail_unless(buf == 2, NULL); fail_unless(buf == 2, NULL);
DEBUG("G undo set to 2\n"); DEBUG("G undo set to 2\n");
addPendingEvent(rid.page); /* addPendingEvent(rid.page); */
undoUpdate(setToTwo, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/ undoUpdate(setToTwo, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
readRecord(xid, rid, &buf); readRecord(xid, rid, &buf);
@ -163,7 +163,7 @@ START_TEST(operation_physical_do_undo) {
fail_unless(buf == 1, NULL); fail_unless(buf == 1, NULL);
DEBUG("H don't redo set to 2\n"); DEBUG("H don't redo set to 2\n");
addPendingEvent(rid.page); /* addPendingEvent(rid.page); */
redoUpdate(setToTwo); /* Fails */ redoUpdate(setToTwo); /* Fails */
readRecord(xid, rid, &buf); readRecord(xid, rid, &buf);
@ -174,7 +174,7 @@ START_TEST(operation_physical_do_undo) {
/* writeLSN(0,rid.page); */ /* writeLSN(0,rid.page); */
DEBUG("I redo set to 2\n"); DEBUG("I redo set to 2\n");
addPendingEvent(rid.page); /* addPendingEvent(rid.page); */
redoUpdate(setToTwo); /* Succeeds */ redoUpdate(setToTwo); /* Succeeds */
readRecord(xid, rid, &buf); readRecord(xid, rid, &buf);

View file

@ -57,8 +57,8 @@ void * writingWorkerThread ( void * v ) {
int xid = Tbegin(); int xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) { for(int i = 0; i < RECORDS_PER_THREAD; i++) {
rids[i] = Talloc(xid, sizeof(int)); rids[i] = Talloc(xid, sizeof(int));
if(! (i %100)) { if(! (i %1000)) {
printf("A%d", i/100);fflush(NULL); printf("A%d", i/1000);fflush(NULL);
} }
} }
@ -66,8 +66,8 @@ void * writingWorkerThread ( void * v ) {
for(int i = 0; i < RECORDS_PER_THREAD; i++) { for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int tmp = i + offset; int tmp = i + offset;
Tset(xid, rids[i], &tmp); Tset(xid, rids[i], &tmp);
if(! (i %100)) { if(! (i %1000)) {
printf("W%d", i/100); fflush(NULL); printf("W%d", i/1000); fflush(NULL);
} }
@ -77,8 +77,8 @@ void * writingWorkerThread ( void * v ) {
int j; int j;
Tread(xid, rids[i], &j); Tread(xid, rids[i], &j);
assert(i + offset == j); assert(i + offset == j);
if(! (i %100)) { if(! (i %1000)) {
printf("R%d", i/100);fflush(NULL); printf("R%d", i/1000);fflush(NULL);
} }
} }