New "default" buffer manager that aims to replace the current one, but

without deadlocks.  Still need to track down a memory corruption bug
that this introduces, or exercises.
This commit is contained in:
Sears Russell 2007-03-08 07:56:53 +00:00
parent eba02ee311
commit 61249c29a7
8 changed files with 324 additions and 7 deletions

View file

@ -94,6 +94,7 @@ terms specified in this license.
#define BUFFER_MANAGER_REOPEN 0
#define BUFFER_MANAGER_HASH 1
#define BUFFER_MANAGER_MEM_ARRAY 2
#define BUFFER_MANAGER_DEPRECATED_HASH 3
#define LOG_TO_FILE 0
#define LOG_TO_MEMORY 1

View file

@ -3,9 +3,6 @@
#include <lladd/bufferManager.h>
#define RO 0
#define RW 1
//Page * getPage(int pageid, int locktype);
/**
Implements lladd's caching policy. Looks up pageid in the cache.
@ -18,14 +15,17 @@
Engine Results"
If you would like to implement your own caching policy, implement
the three functions below. They are relatively straightforward.
the functions below. They are relatively straightforward.
Note that pageCache does not perform any file I/O of its own.
The implementation of this module does not need to be threadsafe.
@todo pageCache should not include page.h at all. It should treat
pages as (int, void*) pairs. (But the page struct contains the
pointers that pageCache manipulates..)
*/
void pageCacheInit(Page * first);
void pageCacheDeinit();

35
lladd/replacementPolicy.h Normal file
View file

@ -0,0 +1,35 @@
/**
@file
Implements cache replacement policies. Eventually, this could be
extended to support application specific caching schemes.
@todo Stasis used to use LRU-2S. LRU-2S is described in Markatos
"On Caching Searching Engine Results". (This needs to be
re-implemented properly.)
For now, Stasis uses plain-old LRU. DB-MIN would be an interesting
extension.
If you would like to implement your own caching policy, implement
the functions below. They are relatively straightforward. Note
that replacementPolicy implementations do not perform any file I/O
of their own.
The implementation of this module does not need to be threadsafe.
*/
typedef struct replacementPolicy {
struct replacementPolicy* (*init)();
void (*deinit) (struct replacementPolicy* impl);
void (*hit) (struct replacementPolicy* impl, int id);
void*(*getStale)(struct replacementPolicy* impl);
void*(*remove) (struct replacementPolicy* impl, int id);
void (*insert) (struct replacementPolicy* impl, int id, void * page);
void * impl;
} replacementPolicy;
replacementPolicy * lruInit();

View file

@ -19,7 +19,8 @@ liblladd_a_SOURCES=crc32.c redblack.c lhtable.c common.c stats.c io.c bufferMana
operations/pageOrientedListNTA.c operations/bTree.c \
operations/regions.c \
io/rangeTracker.c io/memory.c io/file.c io/non_blocking.c io/debug.c \
bufferManager/pageArray.c
bufferManager/pageArray.c bufferManager/bufferHash.c \
replacementPolicy/lru.c
# page/header.c logger/logMemory.c \ ringbuffer.c \ asdfas
#operations/lladdhash.c
#AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -47,8 +47,13 @@ terms specified in this license.
#include <config.h>
#define RO 0
#define RW 1
#ifdef PROFILE_LATCHES_WRITE_ONLY
#define _GNU_SOURCE
#include <stdio.h> // Need _GNU_SOURCE for asprintf
#include <lladd/lhtable.h>
@ -61,7 +66,7 @@ terms specified in this license.
#include <lladd/bufferManager.h>
#include "bufferManager/pageArray.h"
#include "bufferManager/bufferHash.h"
#include <lladd/bufferPool.h>
#include <lladd/lockManager.h>
@ -522,12 +527,15 @@ int bufInit(int type) {
type = lastType;
}
lastType = type;
if(type == BUFFER_MANAGER_HASH) {
if(type == BUFFER_MANAGER_DEPRECATED_HASH) {
bufManBufInit();
return 0;
} else if (type == BUFFER_MANAGER_MEM_ARRAY) {
paBufInit();
return 0;
} else if (type == BUFFER_MANAGER_HASH) {
bhBufInit();
return 0;
} else {
// XXX error handling
abort();

View file

@ -0,0 +1,262 @@
#include <pthread.h>
#include <config.h>
#include "bufferManager/bufferHash.h"
//#include <lladd/transactional.h>
#include <lladd/bufferPool.h>
#include <lladd/redblack.h>
#include <lladd/lhtable.h>
#include "latches.h"
#include <lladd/bufferPool.h>
#include "pageFile.h"
#include <lladd/replacementPolicy.h>
#include <lladd/bufferManager.h>
#include <assert.h>
static struct LH_ENTRY(table) * cachedPages;
static struct LH_ENTRY(table) * pendingPages;
static pthread_t worker;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t readComplete = PTHREAD_COND_INITIALIZER;
static pthread_cond_t haveFree = PTHREAD_COND_INITIALIZER;
static pthread_cond_t needFree = PTHREAD_COND_INITIALIZER;
static int freeLowWater;
static int freeListLength;
static int freeCount;
static int pageCount;
static Page ** freeList;
static replacementPolicy * lru;
static int running;
static Page * getFreePage() {
Page * ret;
if(pageCount < MAX_BUFFER_SIZE) {
ret = pageMalloc();
pageCount++;
} else {
while(!freeCount) {
pthread_cond_signal(&needFree);
pthread_cond_wait(&haveFree, &mut);
}
ret = freeList[freeCount-1];
freeList[freeCount-1] = 0;
freeCount--;
assert(ret);
if(freeCount < freeLowWater) {
pthread_cond_signal(&needFree);
}
}
return ret;
}
static void * writeBackWorker(void * ignored) {
pthread_mutex_lock(&mut);
while(1) {
while(running && (freeCount == freeListLength || pageCount < MAX_BUFFER_SIZE)) {
pthread_cond_wait(&needFree, &mut);
}
if(!running) { break; }
assert(freeCount < freeListLength);
Page * victim = lru->getStale(lru);
assert(victim);
int i = 0;
while(!trywritelock(victim->loadlatch,0)) {
// someone's pinned this page...
lru->hit(lru, victim->id);
victim = lru->getStale(lru);
i++;
if(i > MAX_BUFFER_SIZE) {
printf("Couldn't find a non-pinned page to write back. Aborting.\n");
abort();
}
if(! i % 100 ) {
printf("Severe thrashing detected. :(\n");
}
}
if(victim) {
// We have a write lock on victim.
lru->remove(lru, victim->id);
Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(int));
assert(old == victim);
pageWrite(victim);
freeList[freeCount] = victim;
freeCount++;
unlock(victim->loadlatch);
pthread_mutex_unlock(&mut);
pthread_cond_signal(&haveFree);
pthread_mutex_lock(&mut);
}
}
pthread_mutex_unlock(&mut);
return 0;
}
static Page * bhLoadPageImpl(int xid, int pageid) {
// Note: Calls to loadlatch in this function violate lock order, but
// should be safe, since we make sure no one can have a writelock
// before we grab the readlock.
void* check;
pthread_mutex_lock(&mut);
// Is the page in cache?
Page * ret = LH_ENTRY(find)(cachedPages, &pageid,sizeof(int));
if(ret) {
int locked = tryreadlock(ret->loadlatch, 0);
assert(locked);
pthread_mutex_unlock(&mut);
assert(ret->id == pageid);
return ret;
}
// Is the page already being read from disk?
intptr_t * pending = (intptr_t*) LH_ENTRY(find)(pendingPages,&pageid,sizeof(int));
while(pending) {
pthread_cond_wait(&readComplete, &mut);
ret = LH_ENTRY(find)(cachedPages, &pageid, sizeof(int));
pending = LH_ENTRY(find)(pendingPages,&pageid,sizeof(int));
if(ret) {
int locked = tryreadlock(ret->loadlatch,0);
assert(locked);
pthread_mutex_unlock(&mut);
assert(ret->id == pageid);
return ret;
}
}
assert(!pending && ! ret);
// Either:
// The page is not in cache, and was not pending.
// -or-
// The page was read then evicted since this function was
// called. It is now this thread's responsibility to read
// the page from disk.
// Add an entry to pending to block like-minded threads.
check = LH_ENTRY(insert)(pendingPages,&pageid,sizeof(int), (void*)1);
assert(!check);
// Now, it is safe to release the mutex; other threads won't
// try to read this page from disk.
// Remove a page from the freelist.
ret = getFreePage();
pthread_mutex_unlock(&mut);
ret->id = pageid;
pageRead(ret);
pthread_mutex_lock(&mut);
check = LH_ENTRY(remove)(pendingPages, &pageid,sizeof(int));
assert(check);
check = LH_ENTRY(insert)(cachedPages, &pageid, sizeof(int), ret);
assert(!check);
pthread_cond_broadcast(&readComplete);
lru->insert(lru, ret->id, ret);
int locked = tryreadlock(ret->loadlatch, 0);
assert(locked);
pthread_mutex_unlock(&mut);
assert(ret->id == pageid);
return ret;
}
static void bhReleasePage(Page * p) {
pthread_mutex_lock(&mut);
lru->hit(lru, p->id);
unlock(p->loadlatch);
pthread_mutex_unlock(&mut);
}
static void bhWriteBackPage(Page * p) {
pageWrite(p); //XXX Correct?!?
}
static void bhForcePages() {
forcePageFile(); // XXX Correct?!?
}
static void bhBufDeinit() {
running = 0;
pthread_cond_signal(&needFree); // Wake up the writeback thread so it will exit.
pthread_join(worker, 0);
struct LH_ENTRY(list) iter;
const struct LH_ENTRY(pair_t) * next;
LH_ENTRY(openlist)(cachedPages, &iter);
while((next = LH_ENTRY(readlist)(&iter))) {
pageWrite((next->value));
}
LH_ENTRY(closelist)(&iter);
LH_ENTRY(destroy)(cachedPages);
LH_ENTRY(openlist(pendingPages, &iter));
if((next = LH_ENTRY(readlist)(&iter))) {
abort(); // Pending loadPage during Tdeinit()!
}
LH_ENTRY(closelist)(&iter);
LH_ENTRY(destroy)(pendingPages);
free(freeList);
closePageFile();
lru->deinit(lru);
bufferPoolDeInit();
}
void bhBufInit() {
assert(!running);
loadPageImpl = bhLoadPageImpl;
releasePage = bhReleasePage;
writeBackPage = bhWriteBackPage;
forcePages = bhForcePages;
bufDeinit = bhBufDeinit;
simulateBufferManagerCrash = bhBufDeinit;
bufferPoolInit();
openPageFile();
lru = lruInit();
cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE);
pendingPages = LH_ENTRY(create)(MAX_BUFFER_SIZE/10);
freeListLength = MAX_BUFFER_SIZE / 10;
freeLowWater = freeListLength / 2;
freeCount = 0;
pageCount = 0;
freeList = calloc(freeListLength, sizeof(Page*));
running = 1;
pthread_create(&worker, 0, writeBackWorker, 0);
}

View file

@ -0,0 +1 @@
void bhBufInit();

View file

@ -0,0 +1,9 @@
#include <config.h>
#include <lladd/constants.h>
#include <lladd/replacementPolicy.h>
replacementPolicy * initReplacementPolicy(int impl) {
if(IMPL = REPLACEMENT_POLICY_LRU) {
}
}