pageCache.c is now re-entrant.

This commit is contained in:
Sears Russell 2004-07-20 00:15:17 +00:00
parent 12fc5665ab
commit 490dd86c09
16 changed files with 765 additions and 141 deletions

View file

@ -34,7 +34,11 @@ typedef struct {
rwl *initlock (void);
void readlock (rwl *lock, int d);
void writelock (rwl *lock, int d);
void downgradelock(rwl * lock);
void unlock(rwl * lock);
/** @deprecated in favor of unlock() */
void readunlock (rwl *lock);
/** @deprecated in favor of unlock() */
void writeunlock (rwl *lock);
void deletelock (rwl *lock);
/*

View file

@ -98,8 +98,9 @@ extern int errno;
#define byte unsigned char
#define lsn_t long
#define DEBUGGING
/*#define PROFILE_LATCHES*/
/*#define DEBUGGING
#define PROFILE_LATCHES*/
#ifdef DEBUGGING
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */

View file

@ -1,4 +1,9 @@
#include <libdfa/rw.h>
#include <assert.h>
#undef pthread_cond_wait
#undef pthread_cond_timedwait
rwl *initlock (void)
{
@ -33,8 +38,8 @@ rwl *initlock (void)
void readlock (rwl *lock, int d)
{
/* printf("reader %d\n", d); */
fflush(NULL);
/* printf("reader %d\n", d);
fflush(NULL); */
pthread_mutex_lock (lock->mut);
if (lock->writers || lock->waiting) {
@ -46,16 +51,16 @@ void readlock (rwl *lock, int d)
}
lock->readers++;
pthread_mutex_unlock (lock->mut);
/* printf("reader %d done\n", d); */
fflush(NULL);
/* printf("reader %d done\n", d);
fflush(NULL); */
return;
}
void writelock (rwl *lock, int d)
{
/* printf("\nwritelock %d\n", d); */
fflush(NULL);
/* printf("\nwritelock %d\n", d);
fflush(NULL); */
pthread_mutex_lock (lock->mut);
lock->waiting++;
while (lock->readers || lock->writers) {
@ -67,16 +72,46 @@ void writelock (rwl *lock, int d)
lock->writers++;
pthread_mutex_unlock (lock->mut);
/* printf("\nwritelock %d done\n", d); */
fflush(NULL);
/* printf("\nwritelock %d done\n", d);
fflush(NULL); */
return;
}
void downgradelock(rwl * lock) {
pthread_mutex_lock(lock->mut);
assert(lock->writers);
lock->writers--;
lock->readers++;
pthread_cond_broadcast(lock->readOK);
pthread_mutex_unlock(lock->mut);
}
void unlock(rwl * lock) {
pthread_mutex_lock (lock->mut);
if(lock->readers) {
lock->readers--;
pthread_cond_signal (lock->writeOK);
} else {
assert (lock->writers);
lock->writers--;
/* Need this as well (in case there's another writer, which is blocking the all of the readers. */
pthread_cond_signal (lock->writeOK);
pthread_cond_broadcast (lock->readOK);
}
pthread_mutex_unlock (lock->mut);
}
/*void readunlock(rwl *lock) {
writeunlock(lock);
}*/
void readunlock(rwl * lock) {
unlock(lock);
}
void writeunlock(rwl * lock) {
unlock(lock);
}
/*
void readunlock (rwl *lock)
{
pthread_mutex_lock (lock->mut);
@ -94,8 +129,8 @@ void readunlock (rwl *lock)
void writeunlock (rwl *lock)
{
/* printf("writeunlock done\n"); */
fflush(NULL);
/ * printf("writeunlock done\n");
fflush(NULL); * /
pthread_mutex_lock (lock->mut);
lock->writers--;
@ -104,7 +139,7 @@ void writeunlock (rwl *lock)
pthread_cond_broadcast (lock->readOK);
pthread_mutex_unlock (lock->mut);
}
*/
void deletelock (rwl *lock)
{
pthread_mutex_destroy (lock->mut);

View file

@ -47,17 +47,15 @@ terms specified in this license.
#include <config.h>
#include <lladd/common.h>
#include <latches.h>
#include <assert.h>
#include <lladd/bufferManager.h>
#include "blobManager.h"
#include <lladd/pageCache.h>
/*#include "logger/logWriter.h" */
#include "page.h"
#include "pageFile.h"
/**
Invariant: This lock should be held while updating lastFreepage, or
while performing any operation that may decrease the amount of
@ -116,7 +114,9 @@ void simulateBufferManagerCrash() {
/* ** No file I/O below this line. ** */
Page * loadPage (int pageid) {
return loadPagePtr(pageid);
Page * p = loadPagePtr(pageid);
assert (p->id == pageid);
return p;
}
Page * lastRallocPage = 0;
@ -133,10 +133,11 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) {
pthread_mutex_lock(&lastFreepage_mutex);
while(freespace(p = loadPage(lastFreepage)) < size ) { lastFreepage++; }
while(freespace(p = loadPage(lastFreepage)) < size ) { unlock(p->loadlatch); lastFreepage++; }
ret = pageRalloc(p, size);
unlock(p->loadlatch);
pthread_mutex_unlock(&lastFreepage_mutex);
DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size);
@ -147,11 +148,14 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) {
void slotRalloc(int pageid, lsn_t lsn, recordid rid) {
Page * loadedPage = loadPage(rid.page);
pageSlotRalloc(loadedPage, lsn, rid);
unlock(loadedPage->loadlatch);
}
long readLSN(int pageid) {
return pageReadLSN(loadPage(pageid));
Page *p;
lsn_t lsn = pageReadLSN(p = loadPage(pageid));
unlock(p->loadlatch);
return lsn;
}
void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) {
@ -159,30 +163,36 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) {
Page *p;
if(rid.size > BLOB_THRESHOLD_SIZE) {
DEBUG("Writing blob.\n");
/* DEBUG("Writing blob.\n"); */
writeBlob(xid, lsn, rid, dat);
} else {
DEBUG("Writing record.\n");
p = loadPagePtr(rid.page);
/* DEBUG("Writing record.\n"); */
p = loadPage(rid.page); /* loadPagePtr(rid.page); */
assert( (p->id == rid.page) && (p->memAddr != NULL) );
/** @todo This assert should be here, but the tests are broken, so it causes bogus failures. */
/*assert(pageReadLSN(*p) <= lsn);*/
pageWriteRecord(xid, p, rid, lsn, dat);
unlock(p->loadlatch);
}
}
void readRecord(int xid, recordid rid, void *buf) {
if(rid.size > BLOB_THRESHOLD_SIZE) {
DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf);
/* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */
readBlob(xid, rid, buf);
} else {
DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf);
pageReadRecord(xid, loadPage(rid.page), rid, buf);
Page * p = loadPage(rid.page);
assert(rid.page == p->id);
/* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */
pageReadRecord(xid, p, rid, buf);
assert(rid.page == p->id);
unlock(p->loadlatch);
}
}
@ -205,6 +215,7 @@ int bufTransAbort(int xid, lsn_t lsn) {
void setSlotType(int pageid, int slot, int type) {
Page * p = loadPage(pageid);
pageSetSlotType(p, slot, type);
unlock(p->loadlatch);
}
/**
@ -232,6 +243,8 @@ void addPendingEvent(int pageid){
pthread_mutex_unlock(&(p->pending_mutex));
unlock(p->loadlatch);
}
/**
@ -263,6 +276,8 @@ void removePendingEvent(int pageid) {
}
pthread_mutex_unlock(&(p->pending_mutex));
unlock(p->loadlatch);
}

View file

@ -14,7 +14,8 @@
#undef pthread_mutex_lock
#undef pthread_mutex_trylock
#undef pthread_mutex_unlock
#undef pthread_cond_timedwait
#undef pthread_cond_wait
int __lladd_pthread_mutex_init(lladd_pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr,
const char * file, int line, const char * name) {
@ -44,8 +45,8 @@ int __lladd_pthread_mutex_lock(lladd_pthread_mutex_t *mutex, char * file, int li
blockCount ++;
pthread_yield();
if(blockCount > 10000) {
DEBUG("Spinning at %s:%d, %ld times\n", file, line, blockCount);
if(blockCount >= 10000 && ! (blockCount % 10000)) {
DEBUG("Spinning at %s:%d, %ld times. Held by: %s\n", file, line, blockCount, mutex->last_acquired_at);
}
}
@ -108,6 +109,20 @@ int __lladd_pthread_mutex_destroy(lladd_pthread_mutex_t *mutex) {
}
/**
@todo The profiled version of pthread_cond_wait isn't really implemented, so it throws off the mutex statistics.
*/
int __lladd_pthread_cond_wait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex,
char * file, int line, char * cond_name, char * mutex_name) {
return pthread_cond_wait(cond, &mutex->mutex);
}
int __lladd_pthread_cond_timedwait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, void *abstime,
char * file, int line, char * cond_name, char * mutex_name) {
return pthread_cond_timedwait(cond, &mutex->mutex, abstime);
}
#undef rwl
#undef initlock
#undef readlock
@ -115,6 +130,8 @@ int __lladd_pthread_mutex_destroy(lladd_pthread_mutex_t *mutex) {
#undef readunlock
#undef writeunlock
#undef deletelock
#undef unlock
#undef downgradelock
__profile_rwl *__profile_rw_initlock (char * file, int line) {
__profile_rwl * ret = malloc(sizeof(__profile_rwl));
@ -143,6 +160,7 @@ void __profile_readlock (__profile_rwl *lock, int d, char * file, int line) {
readlock(lock->lock, d);
}
void __profile_writelock (__profile_rwl *lock, int d, char * file, int line) {
@ -187,7 +205,6 @@ void __profile_readunlock (__profile_rwl *lock) {
readunlock(lock->lock);
}
void __profile_writeunlock (__profile_rwl *lock) {
@ -201,6 +218,26 @@ void __profile_writeunlock (__profile_rwl *lock) {
writeunlock(lock->lock);
}
void __profile_unlock (__profile_rwl * lock) {
if(lock->lock->writers) {
__profile_writeunlock(lock);
} else {
__profile_readunlock(lock);
}
}
void __profile_downgradelock (__profile_rwl * lock) {
profile_tuple * tup = pblHtLookup(lock->lockpoints, lock->last_acquired_at, strlen(lock->last_acquired_at)+1);
released_lock(tup);
released_lock(&(lock->tup));
free(lock->last_acquired_at);
downgradelock(lock->lock);
}
void __profile_deletelock (__profile_rwl *lock) {
profile_tuple * tup;

View file

@ -4,6 +4,9 @@
/** @todo threading should be moved into its own header file. */
#include <pthread.h>
/*#include <pbl/pbl.h> -- Don't want everything that touches threading to include pbl... */
#include <lladd/stats.h>
@ -50,17 +53,26 @@ typedef struct {
#define pthread_mutex_lock(x) __lladd_pthread_mutex_lock((x), __FILE__, __LINE__)
#define pthread_mutex_unlock(x) __lladd_pthread_mutex_unlock((x))
#define pthread_mutex_trylock(x) NO_PROFILING_EQUIVALENT_TO_PTHREAD_TRYLOCK
#define pthread_cond_wait(x, y) __lladd_pthread_cond_wait((x), (y), __FILE__, __LINE__, #x, #y);
#define pthread_cond_timedwait(x, y, z) __lladd_pthread_cond_timedwait((x), (y), (z), __FILE__, __LINE__, #x, #y);
int __lladd_pthread_mutex_init(lladd_pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr, const char * file, int line, const char * mutex_name);
int __lladd_pthread_mutex_lock(lladd_pthread_mutex_t *mutex, char * file, int line);
int __lladd_pthread_mutex_unlock(lladd_pthread_mutex_t *mutex);
int __lladd_pthread_mutex_destroy(lladd_pthread_mutex_t *mutex);
int __lladd_pthread_cond_wait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex,
char * file, int line, char * cond_name, char * mutex_name);
/** @param abstime should be const struct timespec, but GCC won't take that. */
int __lladd_pthread_cond_timedwait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, void *abstime,
char * file, int line, char * cond_name, char * mutex_name);
#define initlock() __profile_rw_initlock(__FILE__, __LINE__)
#define readlock(x, y) __profile_readlock((x),(y), __FILE__, __LINE__)
#define writelock(x, y) __profile_writelock((x), (y), __FILE__, __LINE__)
#define readunlock(x) __profile_readunlock((x))
#define writeunlock(x) __profile_writeunlock((x))
#define unlock(x) __profile_unlock((x))
#define downgradelock(x) __profile_downgradelock((x))
#define deletelock(x) __profile_deletelock((x))
#define rwl __profile_rwl
@ -70,9 +82,13 @@ void __profile_readlock (rwl *lock, int d, char * file, int line);
void __profile_writelock (rwl *lock, int d, char * file, int line);
void __profile_readunlock (rwl *lock);
void __profile_writeunlock (rwl *lock);
void __profile_unlock (rwl *lock);
void __profile_downgradelock (rwl *lock);
void __profile_deletelock (rwl *lock);
#endif
#endif /* __LATCHES_H */

View file

@ -46,7 +46,6 @@ terms specified in this license.
#include "logWriter.h"
#include "logHandle.h"
#include "../latches.h"
#include "../pageFile.h"
#include <assert.h>
#include <stdio.h>

View file

@ -89,12 +89,12 @@ terms specified in this license.
#include <config.h>
#include <lladd/common.h>
#include "latches.h"
#include "page.h"
#include <assert.h>
#include <stdio.h>
/*#include "latches.h" */
#include <lladd/constants.h>
/* TODO: Combine with buffer size... */
@ -140,7 +140,8 @@ static int MASK_FFFF0000;
/* ------ */
static pthread_mutex_t pageAllocMutex;
Page pool[MAX_BUFFER_SIZE];
/** We need one dummy page for locking purposes, so this array has one extra page in it. */
Page pool[MAX_BUFFER_SIZE+1];
int isValidSlot(byte *memAddr, int slot);
@ -247,9 +248,9 @@ static const byte *slotMemAddr(const byte *memAddr, int slotNum) {
lsn_t pageReadLSN(const Page * page) {
lsn_t ret;
readlock(page->rwlatch, 259);
/* readlock(page->rwlatch, 259); */
ret = *(long *)(page->memAddr + START_OF_LSN);
readunlock(page->rwlatch);
/* readunlock(page->rwlatch); */
return ret;
}
@ -539,6 +540,8 @@ void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) {
byte *recAddress;
readlock(page->rwlatch, 519);
assert(page->id == rid.page);
recAddress = page->memAddr + getSlotOffset(page->memAddr, rid.slot);
memcpy(buff, recAddress, rid.size);
readunlock(page->rwlatch);
@ -564,13 +567,17 @@ void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *
}
void pageRealloc(Page *p, int id) {
writelock(p->rwlatch, 10);
void pageReallocNoLock(Page *p, int id) {
p->id = id;
p->LSN = 0;
p->dirty = 0;
p->pending = 0;
p->waiting = 0;
}
void pageRealloc(Page *p, int id) {
writelock(p->rwlatch, 10);
pageReallocNoLock(p,id);
writeunlock(p->rwlatch);
}
@ -591,12 +598,15 @@ Page *pageAlloc(int id) {
/* We have an implicit lock on rwlatch, since we allocated it, but
haven't returned yet. */
page->rwlatch = initlock();
page->loadlatch = initlock();
pthread_mutex_init(&page->pending_mutex, NULL);
pthread_cond_init(&page->noMorePending, NULL);
page->memAddr = malloc(PAGE_SIZE);
nextPage++;
assert(nextPage <= MAX_BUFFER_SIZE);
assert(nextPage <= MAX_BUFFER_SIZE + 1); /* There's a dummy page that we need to keep around, thus the +1 */
/* uggh. Really just want to pass pages by reference */
/* page->pending = malloc(sizeof(int)); */
@ -604,8 +614,6 @@ Page *pageAlloc(int id) {
pthread_mutex_unlock(&pageAllocMutex);
/* pageRealloc does its own locking... */
pageRealloc(page, id);
return page;

View file

@ -54,14 +54,14 @@ terms specified in this license.
#ifndef __PAGE_H__
#define __PAGE_H__
#include <config.h>
#include <lladd/common.h>
#include "latches.h"
/** @todo page.h includes things that it shouldn't! (Or, page.h shouldn't be an installed header.) */
#include <lladd/transactional.h>
#include <config.h>
#include "latches.h"
BEGIN_C_DECLS
/**
@ -90,6 +90,9 @@ typedef struct Page_s {
struct Page_s *prev;
/** Which queue is the page in? */
int queue;
/** Is the page in the cache at all? */
int inCache;
/** Used for page-level latching.
Each page has an associated read/write lock. This lock only
@ -113,7 +116,9 @@ typedef struct Page_s {
writing the locked page to disk)
*/
void * rwlatch;
rwl * rwlatch;
rwl * loadlatch;
/** This mutex protects the pending field. We don't use rwlatch for
this, since we also need to use a condition variable to update
@ -207,6 +212,8 @@ void pageCommit(int xid);
void pageAbort(int xid);
void pageReallocNoLock(Page * p, int id);
/** @todo Do we need a locking version of pageRealloc? */
void pageRealloc(Page * p, int id);
Page* pageAlloc(int id);

View file

@ -7,9 +7,9 @@
*/
#include <config.h>
#include <lladd/common.h>
#include "latches.h"
#include <lladd/pageCache.h>
#include <lladd/bufferManager.h>
#include "latches.h"
#include <assert.h>
#include <pbl/pbl.h>
@ -20,17 +20,39 @@
static pblHashTable_t *activePages; /* page lookup */
static unsigned int bufferSize; /* < MAX_BUFFER_SIZE */
static Page *repHead, *repMiddle, *repTail; /* replacement policy */
static pthread_mutex_t loadPagePtr_mutex;
#define INITIAL 0
#define FULL 1
static int state;
/* These three functions are for internal use. They are not declared
static so that their test case can compile. */
static void cacheHitOnPage(Page * ret);
static void cacheRemovePage(Page * ret);
static void cacheInsertPage (Page * ret);
static Page * dummy_page;
void pageCacheInit() {
Page *first;
bufferSize = 1;
state = INITIAL;
pthread_mutex_init(&loadPagePtr_mutex, NULL);
activePages = pblHtCreate();
assert(activePages);
DEBUG("pageCacheInit()");
first = pageAlloc(0);
dummy_page = pageAlloc(-1);
pageRealloc(first, 0);
pageRealloc(dummy_page, -1);
first->inCache = 1;
pblHtInsert(activePages, &first->id, sizeof(int), first);
first->prev = first->next = NULL;
@ -40,6 +62,7 @@ void pageCacheInit() {
repHead = repTail = first;
repMiddle = NULL;
}
void pageCacheDeinit() {
@ -57,6 +80,10 @@ void pageCacheDeinit() {
pageWrite(p);
}
pthread_mutex_destroy(&loadPagePtr_mutex);
pblHtDelete(activePages);
}
@ -74,7 +101,9 @@ static void headInsert(Page *ret) {
static void middleInsert(Page *ret) {
assert( bufferSize == MAX_BUFFER_SIZE );
assert(state == FULL);
/* assert( bufferSize == MAX_BUFFER_SIZE ); */
assert(ret != repMiddle);
assert(ret != repTail);
@ -90,9 +119,12 @@ static void middleInsert(Page *ret) {
assert(ret->next != ret && ret->prev != ret);
}
/** @todo Under high contention, the buffer pool can empty. What should be done about this, other than making sure that # threads > buffer size? */
static void qRemove(Page *ret) {
assert( bufferSize == MAX_BUFFER_SIZE );
assert(state == FULL);
/* assert( bufferSize == MAX_BUFFER_SIZE ); */
assert(ret->next != ret && ret->prev != ret);
if( ret->prev )
@ -114,6 +146,260 @@ static void qRemove(Page *ret) {
assert(ret != repHead);
}
static Page *getFreePage() {
Page *ret;
if( state == FULL ) { /* kick */
ret = repTail;
/** Make sure no one else will try to reuse this page. */
cacheRemovePage(ret);
/** Temporarily drop the mutex while we wait for outstanding
operations on the page to complete. */
pthread_mutex_unlock(&loadPagePtr_mutex);
/** @ todo getFreePage (finalize) needs to yield the getPage mutex,
but also needs to remove a page from the kick list before
doing so. If there is a cache hit on the page that's been
removed from the kick list, then the cache eviction policy
code needs o know this, and ignore the hit. -- Done. */
finalize(ret); /* This cannot deadlock because each thread can
only have outstanding pending events on the
page that it's accessing, but they can only
hold that lock if the page is in cache. If the
page is in cache, then the thread surely isn't
here! Therefore any threads that finalize will
block on can not possibly be blocking on this
thread's latches. */
/* writelock(ret->loadlatch, 181); */ /* Don't need the lock here--No one else has a pointer to this page! */
pthread_mutex_lock(&loadPagePtr_mutex);
/* Now that finalize returned, pull ret out of the cache's lookup table. */
/* pblHtRemove(activePages, &ret->id, sizeof(int)); */
} else {
ret = pageAlloc(-1);
ret->id = -1;
ret->inCache = 0;
/* writelock(ret->loadlatch, 166); */
}
return ret;
}
#define RO 0
#define RW 1
Page * getPage(int pageid, int locktype) {
Page * ret;
assert(locktype == RO);
pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int));
// Unfortunately, this is a heuristic, as a race condition exists.
// (Until we obtain a readlock on ret, we have no way of knowing if
// we've gotten the correct page.)
if(ret) {
cacheHitOnPage(ret);
assert(ret->id == -1 || ret->id == pageid);
}
pthread_mutex_unlock(&loadPagePtr_mutex);
if(!ret) {
ret = dummy_page;
}
writelock(ret->loadlatch, 217);
while(ret->id != pageid) { /* Either we got a stale mapping from the HT, or no mapping at all. */
unlock(ret->loadlatch);
pthread_mutex_lock(&loadPagePtr_mutex);
ret = getFreePage();
pblHtRemove(activePages, &(ret->id), sizeof(int));
pthread_mutex_unlock(&loadPagePtr_mutex);
writelock(ret->loadlatch, 231);
if(ret->id != -1) {
assert(ret != dummy_page);
pageWrite(ret);
}
pageRealloc(ret, pageid); /* Do we need any special lock here? */
pageRead(ret);
unlock(ret->loadlatch);
pthread_mutex_lock(&loadPagePtr_mutex);
/* By inserting ret into the cache, we give up the implicit write lock. */
cacheInsertPage(ret);
pblHtInsert(activePages, &pageid, sizeof(int), ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
writelock(ret->loadlatch, 217);
}
assert(ret->id == pageid);
return ret;
}
/* Page * getPageOld(int pageid, int locktype) {
Page * ret;
int spin = 0;
assert(0);
/ ** This wonderful bit of code avoids deadlocks.
Locking works like this:
a) get a HT mutex, lookup pageid, did we get a pointer?
- yes, release the mutex, so we don't deadlock getting a page lock.
- no, keep the mutex, move on to the next part of the function.
b) lock whatever pointer the HT returned. (Safe, since the memory + mutex are allocated exactly one time.)
c) did we get the right page?
- yes, success!
- no, goto (a)
* /
pthread_mutex_lock(&loadPagePtr_mutex);
do {
do {
if(spin) {
sched_yield();
}
spin ++;
if(spin > 1000 && (spin % 10000 == 0)) {
DEBUG("Spinning in pageCache's hashtable lookup: %d\n", spin);
}
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
/ * pthread_mutex_unlock(&loadPagePtr_mutex); * /
if(locktype == RO) {
readlock(ret->loadlatch, 147);
} else {
writelock(ret->loadlatch, 149);
}
/ * pthread_mutex_lock(&loadPagePtr_mutex); * /
}
} while (ret && (ret->id != pageid));
if(ret) {
cacheHitOnPage(ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
assert(ret->id == pageid);
return ret;
}
/ * OK, we need to handle a cache miss. This is also tricky.
If getFreePage needs to kick a page, then it will need a
writeLock on the thing it kicks, so we drop our mutex here.
But, before we do that, we need to make sure that no one else
tries to load our page. We do this by inserting a dummy entry in
the cache. Since it's pageid won't match the pageid that we're
inserting, other threads will spin in the do..while loop untile
we've loaded the page.
* /
pblHtInsert(activePages, &pageid, sizeof(int), dummy_page);
ret = getFreePage();
/ * ret is now a pointer that no other thread has access to, and we
hold a write lock on it * /
pblHtRemove(activePages, &pageid, sizeof(int));
pblHtInsert(activePages, &pageid, sizeof(int), ret);
/ * writes were here... * /
/ * pthread_mutex_unlock(&loadPagePtr_mutex); * /
if(ret->id != -1) {
pageWrite(ret);
}
pageRealloc(ret, pageid);
pageRead(ret);
/ * pthread_mutex_lock(&loadPagePtr_mutex); * /
assert(ret->inCache == 0);
cacheInsertPage(ret);
assert(ret->inCache == 1);
pthread_mutex_unlock(&loadPagePtr_mutex);
if(locktype == RO) {
readlock(ret->loadlatch, 314);
} else {
writelock(ret->loadlatch, 316);
}
if(locktype == RO) {
downgradelock(ret->loadlatch);
}
} while (ret->id != pageid);
return ret;
}*/
/*
static Page *kickPage(int pageid) {
Page *ret = repTail;
@ -140,15 +426,10 @@ static Page *kickPage(int pageid) {
/ *pthread_mutex_unlock(loadPagePtr_mutex);* /
pageWrite(ret);
/ *pthread_mutex_lock(loadPagePtr_mutex);* /
writelock(ret->rwlatch, 121);
pageRealloc(ret, pageid);
middleInsert(ret);
/ * pblHtInsert(activePages, &pageid, sizeof(int), ret); * /
return ret;
@ -156,10 +437,124 @@ static Page *kickPage(int pageid) {
int lastPageId = -1;
Page * lastPage = 0;
*/
/*
static void noteRead(Page * ret) {
if( bufferSize == MAX_BUFFER_SIZE ) { / * we need to worry about page sorting * /
/ * move to head * /
if( ret != repHead ) {
qRemove(ret);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
static pthread_mutex_t loadPagePtr_mutex;
if( ret->queue == 2 ) {
/ * keep first queue same size * /
repMiddle = repMiddle->prev;
repMiddle->queue = 2;
ret->queue = 1;
}
}
}
}
void loadPagePtrFoo(int pageid, int readOnly) {
Page * ret;
pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int));
getPage(
if(ret) {
if(readOnly) {
readlock(ret->rwlatch, 178);
} else {
writelock(ret->rwlatch, 180);
}
noteRead(ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
} else if(bufferSize == MAX_BUFFER_SIZE - 1) {
}
}
*/
static void cacheInsertPage (Page * ret) {
bufferSize++;
assert(!ret->inCache);
ret->inCache ++;
if(state == FULL) {
middleInsert(ret);
} else {
if(bufferSize == MAX_BUFFER_SIZE/* - 1*/) { /* Set up page kick mechanism. */
int i;
Page *iter;
state = FULL;
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
/* split up queue:
* "in all cases studied ... fixing the primary region to 30% ...
* resulted in the best performance"
*/
repMiddle = repHead;
for( i = 0; i < MAX_BUFFER_SIZE / 3; i++ ) {
repMiddle->queue = 1;
repMiddle = repMiddle->next;
}
for( iter = repMiddle; iter; iter = iter->next ) {
iter->queue = 2;
}
} else { /* Just insert it. */
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
assert(ret->next != ret && ret->prev != ret);
}
}
}
static void cacheRemovePage(Page * ret) {
assert(ret->inCache);
qRemove(ret);
ret->inCache--;
bufferSize --;
}
static void cacheHitOnPage(Page * ret) {
/* The page may not be in cache if it is about to be freed. */
if(ret->inCache && state == FULL) { /* we need to worry about page sorting */
/* move to head */
if( ret != repHead ) {
qRemove(ret);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
if( ret->queue == 2 ) {
/* keep first queue same size */
repMiddle = repMiddle->prev;
repMiddle->queue = 2;
ret->queue = 1;
}
}
}
}
void *loadPagePtr(int pageid) {
Page * ret = getPage(pageid, RO);
return ret;
}
/** @todo loadPagePtr needs to aquire the page read/write lock -- if it does, then should page.c do any locking? */
/*void *loadPagePtr(int pageid) {
/ * lock activePages, bufferSize * /
Page *ret;
@ -203,20 +598,23 @@ void *loadPagePtr(int pageid) {
return ret;
} else if( bufferSize == MAX_BUFFER_SIZE ) { / * we need to kick * /
ret = kickPage(pageid);
pageWrite(ret);
pageRealloc(ret, pageid);
middleInsert(ret);
} else if( bufferSize == MAX_BUFFER_SIZE-1 ) { / * we need to setup kickPage mechanism * /
int i;
Page *iter;
ret = pageAlloc(pageid);
bufferSize++;
pageRealloc(ret, pageid);
writelock(ret->rwlatch, 224);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
/* pblHtInsert( activePages, &pageid, sizeof(int), ret ); */
bufferSize++;
/ * split up queue:
* "in all cases studied ... fixing the primary region to 30% ...
* resulted in the best performance"
@ -236,11 +634,14 @@ void *loadPagePtr(int pageid) {
bufferSize++;
ret = pageAlloc(pageid);
pageRealloc(ret, pageid);
writelock(ret->rwlatch, 224);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
assert(ret->next != ret && ret->prev != ret);
/* pblHtInsert( activePages, &pageid, sizeof(int), ret ); */
}
@ -267,7 +668,8 @@ void *loadPagePtr(int pageid) {
writeunlock(ret->rwlatch);
DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr);
/ * DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); * /
return ret;
}
*/

View file

@ -20,7 +20,7 @@ static FILE * stable = NULL;
*/
static void finalize(Page * p) {
void finalize(Page * p) {
pthread_mutex_lock(&(p->pending_mutex));
p->waiting++;
@ -38,26 +38,39 @@ static void finalize(Page * p) {
/* This function is declared in page.h */
void pageRead(Page *ret) {
long fileSize = myFseek(stable, 0, SEEK_END);
long pageoffset = ret->id * PAGE_SIZE;
long fileSize;
long pageoffset;
long offset;
DEBUG("Reading page %d\n", ret->id);
if(!ret->memAddr) {
/** @todo pageRead() is using fseek to calculate the file size on each read, which is inefficient. */
pageoffset = ret->id * PAGE_SIZE;
flockfile(stable);
fileSize = myFseekNoLock(stable, 0, SEEK_END);
/* DEBUG("Reading page %d\n", ret->id); */
/* if(!ret->memAddr) {
ret->memAddr = malloc(PAGE_SIZE);
}
assert(ret->memAddr);
if(!ret->memAddr) {
perror("pageFile.c");
fflush(NULL);
}
assert(ret->memAddr); */
if ((ret->id)*PAGE_SIZE >= fileSize) {
myFseek(stable, (1+ ret->id) * PAGE_SIZE -1, SEEK_SET);
myFseekNoLock(stable, (1+ ret->id) * PAGE_SIZE -1, SEEK_SET);
if(1 != fwrite("", 1, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof extending storefile!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error extending storefile! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
offset = myFseek(stable, pageoffset, SEEK_SET);
offset = myFseekNoLock(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
if(1 != fread(ret->memAddr, PAGE_SIZE, 1, stable)) {
@ -67,27 +80,27 @@ void pageRead(Page *ret) {
}
funlockfile(stable);
}
/* This function is declared in page.h */
void pageWrite(Page * ret) {
long pageoffset = ret->id * PAGE_SIZE;
long offset = myFseek(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
assert(ret->memAddr);
DEBUG("Writing page %d\n", ret->id);
/* Need to call finalize before checking the LSN. Once finalize
returns, we have exclusive access to this page, and can safely
write it to disk. */
finalize(ret);
long offset ;
if(flushedLSN() < pageReadLSN(ret)) {
DEBUG("pageWrite is calling syncLog()!\n");
syncLog();
}
flockfile(stable);
offset = myFseekNoLock(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
assert(ret->memAddr);
/* DEBUG("Writing page %d\n", ret->id); */
if(1 != fwrite(ret->memAddr, PAGE_SIZE, 1, stable)) {
@ -95,6 +108,8 @@ void pageWrite(Page * ret) {
if(ferror(stable)) { printf("Error writing stream! %d", ferror(stable)); fflush(NULL); abort(); }
}
funlockfile(stable);
}
void openPageFile() {
@ -125,9 +140,15 @@ void closePageFile() {
long myFseek(FILE * f, long offset, int whence) {
long ret;
flockfile(f);
ret = myFseekNoLock(f, offset, whence);
funlockfile(f);
return ret;
}
long myFseekNoLock(FILE * f, long offset, int whence) {
long ret;
if(0 != fseek(f, offset, whence)) { perror ("fseek"); fflush(NULL); abort(); }
if(-1 == (ret = ftell(f))) { perror("ftell"); fflush(NULL); abort(); }
funlockfile(f);
return ret;
}

View file

@ -43,6 +43,9 @@ void openPageFile();
void closePageFile();
long myFseek(FILE * f, long offset, int whence);
long myFseekNoLock(FILE * f, long offset, int whence);
void myFwrite(const void * dat, long size, FILE * f);
void finalize(Page * p);
#endif /* __PAGE_FILE_H */

View file

@ -24,8 +24,13 @@
please see: http://mission.base.com/.
$Log$
Revision 1.1 2004/06/24 21:11:54 sears
Initial revision
Revision 1.2 2004/07/20 00:15:17 sears
pageCache.c is now re-entrant.
Revision 1.1.1.1 2004/06/24 21:11:54 sears
Need to send laptop in for warranty service, so it's time to put this code into CVS. :)
Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation.
Revision 1.4 2004/05/26 09:55:31 sears
Misc bugfixes / optimizations.
@ -63,8 +68,8 @@ static int rcsid_fct() { return( rcsid ? 0 : rcsid_fct() ); }
/*****************************************************************************/
/* #defines */
/*****************************************************************************/
/*#define PBL_HASHTABLE_SIZE 1019*/
#define PBL_HASHTABLE_SIZE 100003
#define PBL_HASHTABLE_SIZE 1019
/*#define PBL_HASHTABLE_SIZE 100003 */
/*****************************************************************************/
/* typedefs */

View file

@ -6,6 +6,6 @@ else
TESTS =
endif
noinst_PROGRAMS = $(TESTS)
LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a -lefence
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log
LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a #-lefence
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -7,16 +7,66 @@
/*#include <lladd/logger/logEntry.h> */
#include "../../src/lladd/logger/logHandle.h"
#include "../../src/lladd/logger/logWriter.h"
#include "../../src/lladd/latches.h"
#include "../../src/lladd/page.h"
#include <lladd/bufferManager.h>
#include <sched.h>
#include <assert.h>
#include "../check_includes.h"
#define LOG_NAME "check_bufferMananger.log"
#define NUM_PAGES 1000
#define THREAD_COUNT 5
#define READS_PER_THREAD 50000
void initializePages() {
int i;
for(i = 0 ; i < NUM_PAGES; i++) {
recordid rid;
rid.page = i;
rid.slot = 0;
rid.size = sizeof(int);
writeRecord(1, 1, rid, &i);
}
}
void * workerThread(void * p) {
int i;
for(i = 0 ; i < READS_PER_THREAD; i++) {
recordid rid;
int j;
int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0));
if(! (i % 5000) ) {
printf("%d", i / 5000); fflush(NULL);
}
rid.page = k;
rid.slot = 0;
rid.size = sizeof(int);
readRecord(1, rid, &j);
assert(k == j);
}
return NULL;
}
START_TEST(pageSingleThreadTest) {
Tinit();
initializePages();
/* sleep(100000000); */
workerThread(NULL);
Tdeinit();
} END_TEST
/**
@test
@ -31,9 +81,24 @@
pages.
*/
START_TEST(pageLoadTest)
{
fail_unless(0, "Write this test!");
START_TEST(pageLoadTest) {
pthread_t workers[THREAD_COUNT];
int i;
/* fail_unless(0, "Broken for now.");
assert(0); */
Tinit();
initializePages();
for(i = 0; i < THREAD_COUNT; i++) {
pthread_create(&workers[i], NULL, workerThread, NULL);
}
for(i = 0; i < THREAD_COUNT; i++) {
pthread_join(workers[i], NULL);
}
Tdeinit();
} END_TEST
Suite * check_suite(void) {
@ -43,6 +108,7 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */
/*tcase_add_test(tc, pageSingleThreadTest); */
tcase_add_test(tc, pageLoadTest);
/* --------------------------------------------- */

View file

@ -129,6 +129,8 @@ START_TEST(pageNoThreadTest)
worker_thread(p);
unlock(p->loadlatch);
Tdeinit();
}
@ -154,6 +156,9 @@ START_TEST(pageThreadTest) {
for(i = 0; i < THREAD_COUNT; i++) {
pthread_join(workers[i], NULL);
}
unlock(p->loadlatch);
Tdeinit();
} END_TEST