pageCache is now re-entrant, in theory.

This commit is contained in:
Sears Russell 2004-07-15 00:42:36 +00:00
parent 9712e291e6
commit 12fc5665ab
11 changed files with 371 additions and 237 deletions

View file

@ -98,48 +98,28 @@ terms specified in this license.
int bufInit();
/**
* allocate a record
* allocate a record. This must be done in two phases. The first
* phase reserves a slot, and produces a log entry. The second phase
* sets up the slot according to the contents of the log entry.
*
* Ralloc implements the first phase.
*
* @param xid The active transaction.
* @param size The size of the new record
* @return allocated record
*
* @see slotRalloc the implementation of the second phase.
*/
recordid ralloc(int xid, long size);
/**
* allocate a record at a given slot. (Useful for recovery.)
*
* @see ralloc
*/
void slotRalloc(int pageid, lsn_t lsn, recordid rid);
/**
* Find a page with some free space.
*
*/
/* *
* This function updates the LSN of a page.
*
* This is needed by the
* recovery process to make sure that each action is undone or redone
* exactly once.
*
* @ param LSN The new LSN of the page.
* @ param pageid ID of the page you want to write
*
* @ todo This needs to be handled by ralloc and writeRecord for
* correctness. Right now, there is no way to atomically update a
* page(!) To fix this, we need to change bufferManager's
* implementation to use read/write (to prevent the OS from stealing
* pages in the middle of updates), and alter kickPage to see what the
* last LSN synced to disk was. If the log is too far behind, it will
* need to either choose a different page, or call flushLog(). We may
* need to implement a special version of fwrite() to do this
* atomically. (write does not have to write all of the data that was
* passed to it...)
*/
/*void writeLSN(long LSN, int pageid); */
/**
* @param pageid ID of page you want to read
* @return LSN found on disk
@ -164,39 +144,6 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat);
*/
void readRecord(int xid, recordid rid, void *dat);
/**
* Write page to disk, including correct LSN. Doing so may require a
* call to logSync(). There is not much that can be done to avoid
* this call right now. In the future, it might make sense to check
* to see if some other page can be kicked, in order to avoid the log
* flush.
*
* @param dat The page to be flushed to disk.
*/
/* void pageWrite(Page * dat); */
/**
Read a page from disk.
@param ret A page struct, with id set correctly. The rest of this
struct will be overwritten by pageMap.
*/
/* void pageRead(Page * ret); */
/* int flushPage(Page page); */
/*void pageMap(Page * page); */
/*
* this function does NOT write to disk, just drops the page from the active
* pages
* @param page to take out of buffer manager
* @return 0 on success
* @return error code on failure
int dropPage(Page page);
*/
/**
* all actions necessary when committing a transaction. Can assume that the log
* has been written as well as any other actions that do not depend on the
@ -236,8 +183,9 @@ int bufTransAbort(int xid, lsn_t lsn);
*/
void bufDeinit();
void addPendingEvent(int pageid);
void removePendingEvent(int pageid);
void setSlotType(int pageid, int slot, int type);
void addPendingEvent(int pageid);
void removePendingEvent(int pageid);
#endif

View file

@ -3,6 +3,6 @@
lib_LIBRARIES=liblladd.a
#liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a
# removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c
liblladd_a_SOURCES=common.c stats.c bufferManager.c linkedlist.c operations.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/prepare.c operations/set.c operations/alloc.c #operations/lladdhash.c
liblladd_a_SOURCES=common.c stats.c bufferManager.c linkedlist.c operations.c pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/prepare.c operations/set.c operations/alloc.c #operations/lladdhash.c
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -8,6 +8,7 @@
#include <lladd/constants.h>
#include "blobManager.h"
#include "pageFile.h"
#include <pbl/pbl.h>
#include <stdio.h>
@ -77,15 +78,6 @@ void closeBlobStore() {
pblHtDelete(dirtyBlobs);
}
long myFseek(FILE * f, long offset, int whence) {
long ret;
flockfile(f);
if(0 != fseek(f, offset, whence)) { perror ("fseek"); fflush(NULL); abort(); }
if(-1 == (ret = ftell(f))) { perror("ftell"); fflush(NULL); abort(); }
funlockfile(f);
return ret;
}
recordid preAllocBlob(int xid, long blobSize) {
long fileSize = myFseek(blobf1, 0, SEEK_END);
blob_record_t blob_rec;

View file

@ -47,23 +47,14 @@ terms specified in this license.
#include <config.h>
#include <lladd/common.h>
#include <assert.h>
#include <stdio.h>
#include <lladd/bufferManager.h>
#include "blobManager.h"
#include <lladd/pageCache.h>
#include "logger/logWriter.h"
/*#include "logger/logWriter.h" */
#include "page.h"
static FILE * stable = NULL;
/**
This function blocks until there are no events pending for this page.
*/
static void finalize(Page * page);
#include "pageFile.h"
@ -90,104 +81,18 @@ Page * loadPage(int pageid);
/* ** File I/O functions ** */
/* Defined in blobManager.c, but don't want to export this in any public .h files... */
long myFseek(FILE * f, long offset, int whence);
void pageRead(Page *ret) {
long fileSize = myFseek(stable, 0, SEEK_END);
long pageoffset = ret->id * PAGE_SIZE;
long offset;
DEBUG("Reading page %d\n", ret->id);
if(!ret->memAddr) {
ret->memAddr = malloc(PAGE_SIZE);
}
assert(ret->memAddr);
if ((ret->id)*PAGE_SIZE >= fileSize) {
myFseek(stable, (1+ ret->id) * PAGE_SIZE -1, SEEK_SET);
if(1 != fwrite("", 1, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof extending storefile!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error extending storefile! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
offset = myFseek(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
if(1 != fread(ret->memAddr, PAGE_SIZE, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof reading!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error reading stream! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
void pageWrite(Page * ret) {
long pageoffset = ret->id * PAGE_SIZE;
long offset = myFseek(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
assert(ret->memAddr);
DEBUG("Writing page %d\n", ret->id);
if(flushedLSN() < pageReadLSN(ret)) {
DEBUG("pageWrite is calling syncLog()!\n");
syncLog();
}
finalize(ret);
if(1 != fwrite(ret->memAddr, PAGE_SIZE, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof writing!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error writing stream! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
static void openPageFile() {
DEBUG("Opening storefile.\n");
if( ! (stable = fopen(STORE_FILE, "r+"))) { /* file may not exist */
byte* zero = calloc(1, PAGE_SIZE);
if(!(stable = fopen(STORE_FILE, "w+"))) { perror("Couldn't open or create store file"); abort(); }
/* Write out one page worth of zeros to get started. */
if(1 != fwrite(zero, PAGE_SIZE, 1, stable)) { assert (0); }
free(zero);
}
lastFreepage = 0;
pthread_mutex_init(&lastFreepage_mutex , NULL);
DEBUG("storefile opened.\n");
}
static void closePageFile() {
int ret = fclose(stable);
assert(!ret);
stable = NULL;
}
int bufInit() {
stable = NULL;
/* stable = NULL; */
pageInit();
openPageFile();
pageCacheInit();
openBlobStore();
lastFreepage = 0;
pthread_mutex_init(&lastFreepage_mutex , NULL);
return 0;
}
@ -206,9 +111,6 @@ void bufDeinit() {
void simulateBufferManagerCrash() {
closeBlobStore();
closePageFile();
/* close(stable);
stable = -1;*/
}
/* ** No file I/O below this line. ** */
@ -267,10 +169,6 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) {
/** @todo This assert should be here, but the tests are broken, so it causes bogus failures. */
/*assert(pageReadLSN(*p) <= lsn);*/
/** @todo Should pageWriteRecord take an LSN (so it can do the locking?) */
/* pageWriteRecord(xid, *p, rid, dat);
p->LSN = lsn;
pageWriteLSN(*p); */
pageWriteRecord(xid, p, rid, lsn, dat);
}
@ -304,6 +202,24 @@ int bufTransAbort(int xid, lsn_t lsn) {
return 0;
}
void setSlotType(int pageid, int slot, int type) {
Page * p = loadPage(pageid);
pageSetSlotType(p, slot, type);
}
/**
Inform bufferManager that a new event (such as an update) will be
performed on page pageid. This function may not be called on a
page after finalize() has been called on that page, and each call
to this function must be followed by a corresponding call to
removePendingEvent.
This function is called by the logger when CLR or UPDATE records
are written.
@see finalize, removePendingEvent
*/
void addPendingEvent(int pageid){
Page * p = loadPage(pageid);
@ -318,7 +234,19 @@ void addPendingEvent(int pageid){
}
/** @todo as implemented, loadPage() ... doOperation is not atomic!*/
/**
Because updates to a page might not happen in order, we need to
make sure that we've applied all updates to a page that we've heard
about before we flush that page to disk.
This method informs bufferManager that an update has been applied.
It is called by operations.c every time doUpdate, redoUpdate, or
undoUpdate is called.
@todo as implemented, loadPage() ... doOperation is not atomic!
*/
void removePendingEvent(int pageid) {
Page * p = loadPage(pageid);
@ -338,23 +266,3 @@ void removePendingEvent(int pageid) {
}
static void finalize(Page * p) {
pthread_mutex_lock(&(p->pending_mutex));
p->waiting++;
while(p->pending) {
pthread_cond_wait(&(p->noMorePending), &(p->pending_mutex));
}
pthread_mutex_unlock(&(p->pending_mutex));
return;
}
void setSlotType(int pageid, int slot, int type) {
Page * p = loadPage(pageid);
pageSetSlotType(p, slot, type);
}

View file

@ -47,7 +47,7 @@ terms specified in this license.
#include "logHandle.h"
#include "../latches.h"
#include "../pageFile.h"
#include <assert.h>
#include <stdio.h>
@ -123,11 +123,6 @@ pthread_mutex_t truncateLog_mutex;
/**
@todo Put myFseek, myFwrite in their own file, and make a header for it... */
void myFwrite(const void * dat, long size, FILE * f);
long myFseek(FILE * f, long offset, int whence);
int openLogWriter() {
log = fopen(LOG_FILE, "a+");
@ -519,13 +514,3 @@ lsn_t firstLogEntry() {
return global_offset + sizeof(lsn_t);
}
void myFwrite(const void * dat, long size, FILE * f) {
int nmemb = fwrite(dat, size, 1, f);
/* test */
if(nmemb != 1) {
perror("myFwrite");
abort();
/* return FILE_WRITE_OPEN_ERROR; */
}
}

View file

@ -102,11 +102,15 @@ typedef struct Page_s {
Record write *READ LOCK*
Write LSN Write lock
kickPage() does not require a lock, since it may not be called
if any threads could still be manipulating the page.
Since the bufferManager re-uses page structs, this lock is also
held when the page is being read or written to disk:
Any circumstance where these locks are held during an I/O operation
is a bug.
Write page to disk Write lock
Read page from disk Write lock
Any circumstance where one these locks are held during an I/O
operation is a bug. (Unless the I/O operation is reading or
writing the locked page to disk)
*/
void * rwlatch;
@ -215,7 +219,6 @@ int pageGetSlotType(Page * p, int slot, int type);
void pageSetSlotType(Page * p, int slot, int type);
END_C_DECLS
#endif

View file

@ -9,12 +9,14 @@
#include <lladd/common.h>
#include <lladd/pageCache.h>
#include <lladd/bufferManager.h>
#include "latches.h"
#include <assert.h>
#include <pbl/pbl.h>
#include <stdio.h>
#include "page.h"
#include "pageFile.h"
static pblHashTable_t *activePages; /* page lookup */
static unsigned int bufferSize; /* < MAX_BUFFER_SIZE */
static Page *repHead, *repMiddle, *repTail; /* replacement policy */
@ -52,6 +54,7 @@ void pageCacheDeinit() {
abort();
/ * exit(ret); * /
}*/
pageWrite(p);
}
pblHtDelete(activePages);
@ -115,21 +118,38 @@ static Page *kickPage(int pageid) {
Page *ret = repTail;
assert( bufferSize == MAX_BUFFER_SIZE );
qRemove(ret);
pblHtRemove(activePages, &ret->id, sizeof(int));
/* if( munmap(ret->memAddr, PAGE_SIZE) ) */
/*if(pageWrite(ret)) / * flushPage(*ret)) * /
assert( 0 ); */
/* It's almost safe to release the mutex here. The LRU-2
linked lists are in a consistent (but under-populated)
state, and there is no reference to the page that we're
holding in the hash table, so the data structures are
internally consistent.
The problem is that that loadPagePtr could be called
multiple times with the same pageid, so we need to check
for that, or we might load the same page into multiple
cache slots, which would cause consistency problems.
@todo Don't block while holding the loadPagePtr mutex!
*/
/*pthread_mutex_unlock(loadPagePtr_mutex);*/
pageWrite(ret);
/*pthread_mutex_lock(loadPagePtr_mutex);*/
writelock(ret->rwlatch, 121);
pageRealloc(ret, pageid);
middleInsert(ret);
pblHtInsert(activePages, &pageid, sizeof(int), ret);
/* pblHtInsert(activePages, &pageid, sizeof(int), ret); */
return ret;
}
@ -137,17 +157,26 @@ static Page *kickPage(int pageid) {
int lastPageId = -1;
Page * lastPage = 0;
static pthread_mutex_t loadPagePtr_mutex;
void *loadPagePtr(int pageid) {
/* lock activePages, bufferSize */
Page *ret;
pthread_mutex_lock(&(loadPagePtr_mutex));
if(lastPage && lastPageId == pageid) {
return lastPage;
void * ret = lastPage;
pthread_mutex_unlock(&(loadPagePtr_mutex));
return ret;
} else {
ret = pblHtLookup(activePages, &pageid, sizeof(int));
}
if( ret ) {
/** Don't need write lock for linked list manipulations. The loadPagePtr_mutex protects those operations. */
if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to worry about page sorting */
/* move to head */
if( ret != repHead ) {
@ -169,6 +198,7 @@ void *loadPagePtr(int pageid) {
lastPageId = pageid;
/* DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); */
pthread_mutex_unlock(&(loadPagePtr_mutex));
return ret;
} else if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to kick */
@ -178,10 +208,12 @@ void *loadPagePtr(int pageid) {
Page *iter;
ret = pageAlloc(pageid);
writelock(ret->rwlatch, 224);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
pblHtInsert( activePages, &pageid, sizeof(int), ret );
/* pblHtInsert( activePages, &pageid, sizeof(int), ret ); */
bufferSize++;
@ -204,21 +236,37 @@ void *loadPagePtr(int pageid) {
bufferSize++;
ret = pageAlloc(pageid);
writelock(ret->rwlatch, 224);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
assert(ret->next != ret && ret->prev != ret);
pblHtInsert( activePages, &pageid, sizeof(int), ret );
/* pblHtInsert( activePages, &pageid, sizeof(int), ret ); */
}
/* we now have a page we can dump info into */
assert( ret->id == pageid );
/*pageMap(ret); */
pageRead(ret);
pblHtInsert( activePages, &pageid, sizeof(int), ret );
lastPage = ret;
lastPageId = pageid;
/* release mutex for this function */
pthread_mutex_unlock(&(loadPagePtr_mutex));
pageRead(ret);
/* release write lock on the page. */
writeunlock(ret->rwlatch);
DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr);
return ret;

145
src/lladd/pageFile.c Normal file
View file

@ -0,0 +1,145 @@
/**
This file handles all of the file I/O for pages.
*/
#include <lladd/bufferManager.h>
#include "page.h"
#include "pageFile.h"
#include <assert.h>
#include "logger/logWriter.h"
static FILE * stable = NULL;
/**
This function blocks until there are no events pending for this page.
@see addPendingEvent(), removePendingEvent()
*/
static void finalize(Page * p) {
pthread_mutex_lock(&(p->pending_mutex));
p->waiting++;
while(p->pending) {
pthread_cond_wait(&(p->noMorePending), &(p->pending_mutex));
}
pthread_mutex_unlock(&(p->pending_mutex));
return;
}
/* This function is declared in page.h */
void pageRead(Page *ret) {
long fileSize = myFseek(stable, 0, SEEK_END);
long pageoffset = ret->id * PAGE_SIZE;
long offset;
DEBUG("Reading page %d\n", ret->id);
if(!ret->memAddr) {
ret->memAddr = malloc(PAGE_SIZE);
}
assert(ret->memAddr);
if ((ret->id)*PAGE_SIZE >= fileSize) {
myFseek(stable, (1+ ret->id) * PAGE_SIZE -1, SEEK_SET);
if(1 != fwrite("", 1, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof extending storefile!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error extending storefile! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
offset = myFseek(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
if(1 != fread(ret->memAddr, PAGE_SIZE, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof reading!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error reading stream! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
/* This function is declared in page.h */
void pageWrite(Page * ret) {
long pageoffset = ret->id * PAGE_SIZE;
long offset = myFseek(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
assert(ret->memAddr);
DEBUG("Writing page %d\n", ret->id);
/* Need to call finalize before checking the LSN. Once finalize
returns, we have exclusive access to this page, and can safely
write it to disk. */
finalize(ret);
if(flushedLSN() < pageReadLSN(ret)) {
DEBUG("pageWrite is calling syncLog()!\n");
syncLog();
}
if(1 != fwrite(ret->memAddr, PAGE_SIZE, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof writing!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error writing stream! %d", ferror(stable)); fflush(NULL); abort(); }
}
}
void openPageFile() {
DEBUG("Opening storefile.\n");
if( ! (stable = fopen(STORE_FILE, "r+"))) { /* file may not exist */
byte* zero = calloc(1, PAGE_SIZE);
if(!(stable = fopen(STORE_FILE, "w+"))) { perror("Couldn't open or create store file"); abort(); }
/* Write out one page worth of zeros to get started. */
if(1 != fwrite(zero, PAGE_SIZE, 1, stable)) { assert (0); }
free(zero);
}
DEBUG("storefile opened.\n");
}
void closePageFile() {
int ret = fclose(stable);
assert(!ret);
stable = NULL;
}
long myFseek(FILE * f, long offset, int whence) {
long ret;
flockfile(f);
if(0 != fseek(f, offset, whence)) { perror ("fseek"); fflush(NULL); abort(); }
if(-1 == (ret = ftell(f))) { perror("ftell"); fflush(NULL); abort(); }
funlockfile(f);
return ret;
}
void myFwrite(const void * dat, long size, FILE * f) {
int nmemb = fwrite(dat, size, 1, f);
/* test */
if(nmemb != 1) {
perror("myFwrite");
abort();
/* return FILE_WRITE_OPEN_ERROR; */
}
}

48
src/lladd/pageFile.h Normal file
View file

@ -0,0 +1,48 @@
#ifndef __PAGE_FILE_H
#define __PAGE_FILE_H
#include "page.h"
#include <stdio.h>
/**
* Write page to disk, including correct LSN. Doing so may require a
* call to logSync(). There is not much that can be done to avoid
* this call right now. In the future, it might make sense to check
* to see if some other page can be kicked, in order to avoid the log
* flush.
*
* This funciton is automatically called immediately before a page is
* evicted from cache. Operation implementors, and normal users
* should never have to call this routine.
*
* @see bufferManager.c for the implementation of pageWrite
*
* @param dat The page to be flushed to disk.
*/
void pageWrite(Page * dat);
/**
Read a page from disk. This bypassess the cache, and should only be
called by bufferManager and blobManager. To retrieve a page under
normal circumstances, use loadPage() instead.
Operation implementors and normal users should never need to call
this routine.
@param ret A page struct, with id set correctly. The rest of this
struct will be overwritten by pageMap.
@see bufferManager.c for the implementation of pageRead.
@todo pageRead and pageWrite should be static, but pageCache needs
to call them.
*/
void pageRead(Page * ret);
void openPageFile();
void closePageFile();
long myFseek(FILE * f, long offset, int whence);
void myFwrite(const void * dat, long size, FILE * f);
#endif /* __PAGE_FILE_H */

View file

@ -1,7 +1,7 @@
INCLUDES = @CHECK_CFLAGS@
if HAVE_CHECK
## Had to disable check_lht because lht needs to be rewritten.
TESTS = check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery
TESTS = check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager
else
TESTS =
endif

View file

@ -0,0 +1,57 @@
#include <lladd/transactional.h>
#include <config.h>
#include <check.h>
#include <lladd/transactional.h>
/*#include <lladd/logger/logEntry.h> */
#include "../../src/lladd/logger/logHandle.h"
#include "../../src/lladd/logger/logWriter.h"
#include "../../src/lladd/latches.h"
#include <sched.h>
#include <assert.h>
#include "../check_includes.h"
#define LOG_NAME "check_bufferMananger.log"
/**
@test
Spawns a bunch of threads, and has each one randomly load pages off of the disk.
The pages are inialized with unique values that are checked by the
threads as they load the pages.
In order to be effective, this test must create enough pages on
disk to make sure that loadPagePtr will eventually have to kick
pages.
*/
START_TEST(pageLoadTest)
{
fail_unless(0, "Write this test!");
} END_TEST
Suite * check_suite(void) {
Suite *s = suite_create("logWriter");
/* Begin a new test */
TCase *tc = tcase_create("writeNew");
/* Sub tests are added, one per line, here */
tcase_add_test(tc, pageLoadTest);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc);
return s;
}
#include "../check_setup.h"