bufferHash now seems to be correct, and makes use of the stasis_handle_t

interface.
This commit is contained in:
Sears Russell 2007-03-13 08:39:29 +00:00
parent 2e0ad972fd
commit 6e749b93a4
5 changed files with 139 additions and 78 deletions

View file

@ -4,7 +4,7 @@ lib_LIBRARIES=liblladd.a
#liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a
# removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c
liblladd_a_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \
pageFile.c pageCache.c page.c bufferPool.c blobManager.c recovery2.c truncation.c \
pageHandle.c pageFile.c pageCache.c page.c bufferPool.c blobManager.c recovery2.c truncation.c \
transactional2.c allocationPolicy.c \
lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\
logger/logEntry.c logger/logWriter.c logger/inMemoryLog.c logger/logHandle.c logger/logger2.c \

View file

@ -30,6 +30,7 @@ static int pageCount;
static Page ** freeList;
// A page is in LRU iff !pending, !pinned
static replacementPolicy * lru;
static int running;
@ -49,48 +50,90 @@ static void pageSetNode(void * page, node_t * n, void * ignore) {
#define pagePendingPtr(p) ((intptr_t*)(&((p)->next)))
#define pagePinCountPtr(p) ((intptr_t*)(&((p)->queue)))
#ifdef LONG_RUN
inline static void checkPageState(Page * p) {
Page * check = LH_ENTRY(find)(cachedPages, &(p->id), sizeof(int));
if(check) {
int pending = *pagePendingPtr(p);
int pinned = *pagePinCountPtr(p);
if((!pinned) && (!pending)) {
assert(pageGetNode(p, 0));
} else {
assert(!pageGetNode(p,0));
}
int notfound = 1;
for(int i = 0; i < freeCount; i++) {
if(freeList[i] == p) { notfound = 0; }
}
assert(notfound);
} else {
assert(!pageGetNode(p,0));
assert(!*pagePendingPtr(p));
assert(!*pagePinCountPtr(p));
int found = 0;
for(int i = 0; i < freeCount; i++) {
if(freeList[i] == p) { found = 1; }
}
assert(found);
}
}
#else
inline static void checkPageState(Page * p) { }
#endif
/** You need to hold mut before calling this.
@return the page that was just written back. It will not be in
lru or cachedPages after the call returns.
*/
inline static Page * writeBackOnePage() {
Page * victim = lru->getStale(lru);
// Make sure we have an exclusive lock on victim.
assert(victim);
assert(!*pagePendingPtr(victim));
assert(! *pagePendingPtr(victim));
assert(! *pagePinCountPtr(victim));
#ifdef LATCH_SANITY_CHECKING
int latched = trywritelock(victim->loadlatch,0);
assert(latched);
#endif
// We have an exclusive lock on victim.
assert(! *pagePinCountPtr(victim));
checkPageState(victim);
// XXX this can double free with (*)
lru->remove(lru, victim);
Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(int));
assert(old == victim);
// printf("Write(%ld)\n", (long)victim->id);
pageWrite(victim);
// Make sure that no one mistakenly thinks this is still a live copy.
victim->id = -1;
#ifdef LATCH_SANITY_CHECKING
// We can release the lock since we just grabbed it to see if
// anyone else has pinned the page... the caller holds mut, so
// no-one will touch the page for now.
#ifdef LATCH_SANITY_CHECKING
unlock(victim->loadlatch);
#endif
return victim;
}
/** Returns a free page. The page will not be cachedPages or lru. */
/** Returns a free page. The page will not be in freeList,
cachedPages or lru. */
inline static Page * getFreePage() {
Page * ret;
if(pageCount < MAX_BUFFER_SIZE) {
ret = pageMalloc();
pageCount++;
} else {
pageFree(ret,-1);
(*pagePinCountPtr(ret)) = 0;
(*pagePendingPtr(ret)) = 0;
pageSetNode(ret,0,0);
pageCount++;
} else {
if(!freeCount) {
ret = writeBackOnePage();
} else {
@ -98,11 +141,13 @@ inline static Page * getFreePage() {
freeList[freeCount-1] = 0;
freeCount--;
}
assert(ret);
if(freeCount < freeLowWater) {
pthread_cond_signal(&needFree);
}
}
assert(!*pagePinCountPtr(ret));
assert(!*pagePendingPtr(ret));
assert(!pageGetNode(ret,0));
return ret;
}
@ -114,13 +159,10 @@ static void * writeBackWorker(void * ignored) {
}
if(!running) { break; }
Page * victim = writeBackOnePage();
assert(freeCount < freeListLength);
freeList[freeCount] = victim;
freeCount++;
// pthread_mutex_unlock(&mut);
// pthread_mutex_lock(&mut);
checkPageState(victim);
}
pthread_mutex_unlock(&mut);
return 0;
@ -137,12 +179,11 @@ static Page * bhLoadPageImpl(int xid, const int pageid) {
pthread_mutex_lock(&mut);
// Is the page in cache?
Page * ret = LH_ENTRY(find)(cachedPages, &pageid,sizeof(int));
// Is the page already being read from disk? (If ret == 0, then no...)
while(ret) {
checkPageState(ret);
if(*pagePendingPtr(ret)) {
pthread_cond_wait(&readComplete, &mut);
if(ret->id != pageid) {
@ -153,10 +194,13 @@ static Page * bhLoadPageImpl(int xid, const int pageid) {
int locked = tryreadlock(ret->loadlatch,0);
assert(locked);
#endif
// XXX this can double free with (*)
if(! *pagePinCountPtr(ret) ) {
// Then ret is in lru (otherwise it would be pending, or not cached); remove it.
lru->remove(lru, ret);
}
(*pagePinCountPtr(ret))++;
checkPageState(ret);
pthread_mutex_unlock(&mut);
assert(ret->id == pageid);
return ret;
@ -170,25 +214,28 @@ static Page * bhLoadPageImpl(int xid, const int pageid) {
ret = getFreePage();
// Add a pending entry to cachedPages to block like-minded threads and writeback
*pagePendingPtr(ret) = 1;
(*pagePendingPtr(ret)) = 1;
check = LH_ENTRY(insert)(cachedPages,&pageid,sizeof(int), ret);
assert(!check);
ret->id = pageid;
// Now, it is safe to release the mutex; other threads won't
// try to read this page from disk.
pthread_mutex_unlock(&mut);
ret->id = pageid;
pageRead(ret);
pthread_mutex_lock(&mut);
*pagePendingPtr(ret) = 0;
(*pagePinCountPtr(ret))++;
// Would remove rom lru, but getFreePage() guarantees that it isn't
// Would remove from lru, but getFreePage() guarantees that it isn't
// there.
//lru->remove(lru, ret);
assert(!pageGetNode(ret, 0));
assert(!(*pagePinCountPtr(ret)));
(*pagePinCountPtr(ret))++;
#ifdef LATCH_SANITY_CHECKING
int locked = tryreadlock(ret->loadlatch, 0);
@ -196,16 +243,18 @@ static Page * bhLoadPageImpl(int xid, const int pageid) {
#endif
pthread_mutex_unlock(&mut);
pthread_cond_broadcast(&readComplete);
assert(ret->id == pageid);
checkPageState (ret);
return ret;
}
static void bhReleasePage(Page * p) {
pthread_mutex_lock(&mut);
checkPageState(p);
(*pagePinCountPtr(p))--;
if(!(*pagePinCountPtr(p))) {
assert(!pageGetNode(p, 0));
lru->insert(lru,p);
}
#ifdef LATCH_SANITY_CHECKING
@ -236,7 +285,7 @@ static void bhBufDeinit() {
free(freeList);
closePageFile();
// closePageFile();
lru->deinit(lru);
bufferPoolDeInit();
}
@ -244,6 +293,10 @@ void bhBufInit() {
assert(!running);
#ifdef LONG_RUN
printf("Using expensive bufferHash sanity checking.\n");
#endif
loadPageImpl = bhLoadPageImpl;
releasePage = bhReleasePage;
writeBackPage = bhWriteBackPage;
@ -253,7 +306,7 @@ void bhBufInit() {
bufferPoolInit();
openPageFile();
// openPageFile();
lru = lruFastInit(pageGetNode, pageSetNode, 0);
cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE);

View file

@ -24,14 +24,15 @@
static int stable = -1;
static pthread_mutex_t stable_mutex;
static pageid_t myLseekNoLock(int f, pageid_t offset, int whence);
static void pfForcePageFile();
static void pfClosePageFile();
inline static pageid_t myLseekNoLock(int f, pageid_t offset, int whence);
static int oldOffset = -1;
int pageFile_isDurable = 1;
void pageRead(Page *ret) {
static void pfPageRead(Page *ret) {
pageid_t pageoffset;
pageid_t offset;
@ -72,7 +73,7 @@ void pageRead(Page *ret) {
}
/** @todo need to sync the page file to disk occasionally, so that the
dirty page table can be kept up to date. */
void pageWrite(Page * ret) {
static void pfPageWrite(Page * ret) {
/** If the page is clean, there's no reason to write it out. */
assert(ret->dirty == dirtyPages_isDirty(ret));
if(!ret->dirty) {
@ -128,6 +129,11 @@ void pageWrite(Page * ret) {
/** @todo O_DIRECT is broken in older linuxes (eg 2.4). The build script should disable it on such platforms. */
void openPageFile() {
pageRead = pfPageRead;
pageWrite = pfPageWrite;
forcePageFile = pfForcePageFile;
closePageFile = pfClosePageFile;
DEBUG("Opening storefile.\n");
#ifdef PAGE_FILE_O_DIRECT
@ -150,7 +156,7 @@ void openPageFile() {
}
void forcePageFile() {
static void pfForcePageFile() {
if(pageFile_isDurable) {
#ifndef PAGE_FILE_O_DIRECT
#ifdef HAVE_FDATASYNC
@ -162,7 +168,7 @@ void forcePageFile() {
}
}
void closePageFile() {
static void pfClosePageFile() {
forcePageFile();
int ret = close(stable);

View file

@ -1,50 +1,13 @@
#ifndef __PAGE_FILE_H
#define __PAGE_FILE_H
/**
@todo this #include should be removed; almost nothing should
include pageFile.h
*/
#include <lladd/pageHandle.h>
#include "page.h"
/**
* Write page to disk, including correct LSN. Doing so may require a
* call to logSync(). There is not much that can be done to avoid
* this call right now. In the future, it might make sense to check
* to see if some other page can be kicked, in order to avoid the log
* flush.
*
* This funciton is automatically called immediately before a page is
* evicted from cache. Operation implementors, and normal users
* should never have to call this routine.
*
* @see bufferManager.c for the implementation of pageWrite
*
* @param dat The page to be flushed to disk.
*/
void pageWrite(Page * dat);
extern int pageFile_isDurable;
/**
Read a page from disk. This bypassess the cache, and should only be
called by bufferManager and blobManager. To retrieve a page under
normal circumstances, use loadPage() instead.
Operation implementors and normal users should never need to call
this routine.
@param ret A page struct, with id set correctly. The rest of this
struct will be overwritten by pageMap.
@see bufferManager.c for the implementation of pageRead.
@todo pageRead and pageWrite should be static, but pageCache needs
to call them.
*/
void pageRead(Page * ret);
void forcePageFile();
void openPageFile();
void closePageFile();
void finalize(Page * p);
#endif /* __PAGE_FILE_H */

View file

@ -1,16 +1,21 @@
#include <config.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <lladd/common.h>
#include "latches.h"
#include <lladd/transactional.h>
#include <lladd/recovery.h>
#include <lladd/bufferManager.h>
#include <lladd/consumer.h>
#include <lladd/lockManager.h>
#include <lladd/compensations.h>
#include "pageFile.h"
#include "page.h"
#include <lladd/logger/logger2.h>
#include <lladd/truncation.h>
#include <lladd/io/handle.h>
#include <stdio.h>
#include <assert.h>
#include "page/indirect.h"
@ -101,7 +106,25 @@ void setupOperationsTable() {
*/
}
// @todo this factory stuff doesn't really belong here...
static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) {
stasis_handle_t * h = stasis_handle(open_memory)(off);
//h = stasis_handle(open_debug)(h);
stasis_write_buffer_t * w = h->append_buffer(h, len);
w->h->release_write_buffer(w);
return h;
}
typedef struct sf_args {
char * filename;
int openMode;
int filePerm;
} sf_args;
static stasis_handle_t * slow_factory(void * argsP) {
sf_args * args = (sf_args*) argsP;
stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm);
//h = stasis_handle(open_debug)(h);
return h;
}
int Tinit() {
pthread_mutex_init(&transactional_2_mutex, NULL);
numActiveXactions = 0;
@ -112,6 +135,21 @@ int Tinit() {
dirtyPagesInit();
LogInit(loggerType);
pageInit();
struct sf_args * slow_arg = malloc(sizeof(sf_args));
slow_arg->filename = STORE_FILE;
#ifdef PAGE_FILE_O_DIRECT
slow_arg->openMode = O_CREAT | O_RDWR | O_DIRECT;
#else
slow_arg->openMode = O_CREAT | O_RDWR;
#endif
slow_arg->filePerm = FILE_PERM;
// Allow 4MB of outstanding writes.
stasis_handle_t * pageFile =
stasis_handle(open_non_blocking)(slow_factory, slow_arg, fast_factory,
NULL, 20, PAGE_SIZE * 1024, 1024);
pageHandleOpen(pageFile);
// openPageFile();
bufInit(bufferManagerType);
pageOperationsInit();
initNestedTopActions();
@ -319,6 +357,7 @@ int Tdeinit() {
truncationDeinit();
ThashDeinit();
bufDeinit();
closePageFile();
pageDeinit();
LogDeinit();
dirtyPagesDeinit();