bufferManager is (really!) re-entrant now; performed a big refactoring to prevent deadlocks. check_transactional2 ran overnight without coring, with 25 threads and 10000 allocate/write/reads per thread. (The whole test suite was run in a loop...)

This commit is contained in:
Sears Russell 2004-07-23 20:21:44 +00:00
parent 38ae73c146
commit 422198418a
25 changed files with 599 additions and 925 deletions

View file

@ -90,6 +90,32 @@ terms specified in this license.
#include <lladd/constants.h>
#include <lladd/transactional.h>
/**
Page is defined in bufferManager.h as an incomplete type to enforce
an abstraction barrier between page.h and the rest of the system.
If you need to muck with page internals, first consider the
implications that doing so has on locking. In particular, rwlatch
is currently entirely handled in page.c.
*/
typedef struct Page_s Page_s;
typedef struct Page_s Page;
/**
* @param pageid ID of the page you want to load
* @return fully formed Page type
* @return page with -1 ID if page not found
*/
Page * loadPage(int pageid);
/**
loadPage acquires a lock when it is called, effectively pinning the page
in memory. releasePage releases this lock.
*/
void releasePage(Page * p);
/**
* initialize buffer manager
* @return 0 on success
@ -118,13 +144,13 @@ recordid ralloc(int xid, long size);
*
* @see ralloc
*/
void slotRalloc(int pageid, lsn_t lsn, recordid rid);
void slotRalloc(Page * page, lsn_t lsn, recordid rid);
/**
* @param pageid ID of page you want to read
* @return LSN found on disk
*/
long readLSN(int pageid);
/*long readLSN(int pageid); */
/**
* @param xid transaction id @param lsn the lsn that the updated
@ -135,14 +161,14 @@ long readLSN(int pageid);
* @param rid recordid where you want to write @param dat data you
* wish to write
*/
void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat);
void writeRecord(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat);
/**
* @param xid transaction ID
* @param rid
* @param dat buffer for data
*/
void readRecord(int xid, recordid rid, void *dat);
void readRecord(int xid, Page * page, recordid rid, void *dat);
/**
* all actions necessary when committing a transaction. Can assume that the log
@ -183,9 +209,6 @@ int bufTransAbort(int xid, lsn_t lsn);
*/
void bufDeinit();
void setSlotType(int pageid, int slot, int type);
void addPendingEvent(int pageid);
void removePendingEvent(int pageid);
/*void setSlotType(int pageid, int slot, int type); */
#endif

View file

@ -107,7 +107,7 @@ lsn_t LogTransAbort(TransactionLog * l);
/**
LogUpdate writes an UPDATE log record to the log tail
*/
LogEntry * LogUpdate(TransactionLog * l, recordid rid, int operation, const byte * args);
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, const byte * args);
/* *
(Was folded into LogUpdate.)

View file

@ -56,6 +56,7 @@ terms specified in this license.
#include <lladd/constants.h>
#include <lladd/transactional.h>
#include <lladd/logger/logEntry.h>
#include <lladd/bufferManager.h>
BEGIN_C_DECLS
@ -63,7 +64,7 @@ BEGIN_C_DECLS
/* @type Function
* function pointer that the operation will run
*/
typedef int (*Function)(int xid, lsn_t lsn, recordid r, const void *d);
typedef int (*Function)(int xid, Page * p, lsn_t lsn, recordid r, const void *d);
/* @type Operation
@ -143,7 +144,7 @@ extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */
Does not write to the log, and assumes that the operation's
results are not already in the buffer manager.
*/
void doUpdate(const LogEntry * e);
void doUpdate(const LogEntry * e, Page * p);
/** Undo the update under normal operation, and during recovery.
Checks to see if the operation's results are reflected in the
@ -158,7 +159,7 @@ void doUpdate(const LogEntry * e);
@param e The log entry containing the operation to be undone.
@param clr_lsn The lsn of the clr that corresponds to this undo operation.
*/
void undoUpdate(const LogEntry * e, lsn_t clr_lsn);
void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn);
/**
Redoes an operation during recovery. This is different than
doUpdate because it checks to see if the operation needs to be redone

View file

@ -1,6 +1,12 @@
#ifndef __PAGECACHE_H
#define __PAGECACHE_H
#include <lladd/bufferManager.h>
#define RO 0
#define RW 1
Page * getPage(int pageid, int locktype);
/**
Implements lladd's caching policy. Looks up pageid in the cache.
If pageid doesn't exist, then allocate a new slot for it. If
@ -21,6 +27,6 @@
*/
void pageCacheInit();
void pageCacheDeinit();
void * loadPagePtr(int pageid);
/*Page * loadPage(int pageid); */
#endif

View file

@ -1,4 +0,0 @@
Splint 3.1.1 --- 23 Apr 2004
2: <invalid flag>
less: <invalid flag>

View file

@ -4,11 +4,12 @@
#include <assert.h>
#include <lladd/transactional.h>
#include "pageFile.h"
#include <lladd/bufferManager.h>
#include <lladd/constants.h>
#include "blobManager.h"
#include "pageFile.h"
#include <pbl/pbl.h>
#include <stdio.h>
@ -92,6 +93,8 @@ recordid preAllocBlob(int xid, long blobSize) {
recordid rid = Talloc(xid, sizeof(blob_record_t));
Page * p = loadPage(rid.page); /** @todo blob's are almost surely broken! */
/** Finally, fix up the fields in the record that points to the blob.
The rest of this also should go into alloc.c
*/
@ -100,9 +103,12 @@ recordid preAllocBlob(int xid, long blobSize) {
blob_rec.size = blobSize;
blob_rec.offset = fileSize;
setSlotType(rid.page, rid.slot, BLOB_SLOT);
pageSetSlotType(p, rid.slot, BLOB_SLOT);
rid.size = BLOB_SLOT;
releasePage(p);
/* Tset() needs to know to 'do the right thing' here, since we've
changed the size it has recorded for this record, and
writeRawRecord makes sure that that is the case. */
@ -110,6 +116,7 @@ recordid preAllocBlob(int xid, long blobSize) {
rid.size = blob_rec.size;
return rid;
}
@ -129,6 +136,8 @@ void allocBlob(int xid, lsn_t lsn, recordid rid) {
/* First in buffer manager. */
/* Read in record to get the correct offset, size for the blob*/
/** @todo blobs deadlock... */
readRawRecord(xid, rid, &blob_rec, sizeof(blob_record_t));
myFseek(blobf0, fileSize + rid.size - 1, SEEK_SET);

View file

@ -49,11 +49,13 @@ terms specified in this license.
#include <lladd/common.h>
#include <latches.h>
#include <assert.h>
#include "page.h"
#include <lladd/bufferManager.h>
#include "blobManager.h"
#include <lladd/pageCache.h>
#include "page.h"
#include "pageFile.h"
/**
@ -68,22 +70,10 @@ terms specified in this license.
*/
static pthread_mutex_t lastFreepage_mutex;
pthread_mutex_t add_pending_mutex;
static unsigned int lastFreepage = 0;
/**
* @param pageid ID of the page you want to load
* @return fully formed Page type
* @return page with -1 ID if page not found
*/
Page * loadPage(int pageid);
pthread_cond_t addPendingOK;
int bufInit() {
/* stable = NULL; */
pageInit();
openPageFile();
pageCacheInit();
@ -91,10 +81,6 @@ int bufInit() {
lastFreepage = 0;
pthread_mutex_init(&lastFreepage_mutex , NULL);
pthread_cond_init(&addPendingOK, NULL);
pthread_mutex_init(&add_pending_mutex, NULL);
return 0;
}
@ -117,10 +103,8 @@ void simulateBufferManagerCrash() {
/* ** No file I/O below this line. ** */
Page * loadPage (int pageid) {
Page * p = loadPagePtr(pageid);
assert (p->id == pageid);
return p;
/**
 * Releases the pin acquired by loadPage() by dropping the page's
 * load latch, allowing the buffer manager to evict the page again.
 */
void releasePage (Page * p) {
unlock(p->loadlatch);
}
Page * lastRallocPage = 0;
@ -138,13 +122,13 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) {
pthread_mutex_lock(&lastFreepage_mutex);
while(freespace(p = loadPage(lastFreepage)) < size ) {
unlock(p->loadlatch);
releasePage(p);
lastFreepage++;
}
ret = pageRalloc(p, size);
unlock(p->loadlatch);
releasePage(p);
pthread_mutex_unlock(&lastFreepage_mutex);
@ -153,22 +137,9 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) {
return ret;
}
void slotRalloc(int pageid, lsn_t lsn, recordid rid) {
Page * loadedPage = loadPage(rid.page);
pageSlotRalloc(loadedPage, lsn, rid);
unlock(loadedPage->loadlatch);
}
void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
long readLSN(int pageid) {
Page *p;
lsn_t lsn = pageReadLSN(p = loadPage(pageid));
unlock(p->loadlatch);
return lsn;
}
void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) {
Page *p;
/* Page *p; */
if(rid.size > BLOB_THRESHOLD_SIZE) {
/* DEBUG("Writing blob.\n"); */
@ -176,31 +147,31 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) {
} else {
/* DEBUG("Writing record.\n"); */
p = loadPage(rid.page); /* loadPagePtr(rid.page); */
assert( (p->id == rid.page) && (p->memAddr != NULL) );
/** @todo This assert should be here, but the tests are broken, so it causes bogus failures. */
/*assert(pageReadLSN(*p) <= lsn);*/
pageWriteRecord(xid, p, rid, lsn, dat);
unlock(p->loadlatch);
assert( (p->id == rid.page) && (p->memAddr != NULL) );
}
}
void readRecord(int xid, recordid rid, void *buf) {
void readRecord(int xid, Page * p, recordid rid, void *buf) {
if(rid.size > BLOB_THRESHOLD_SIZE) {
/* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */
/* @todo should readblob take a page pointer? */
readBlob(xid, rid, buf);
} else {
Page * p = loadPage(rid.page);
assert(rid.page == p->id);
/* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */
pageReadRecord(xid, p, rid, buf);
assert(rid.page == p->id);
unlock(p->loadlatch);
}
}
@ -219,94 +190,3 @@ int bufTransAbort(int xid, lsn_t lsn) {
return 0;
}
void setSlotType(int pageid, int slot, int type) {
Page * p = loadPage(pageid);
pageSetSlotType(p, slot, type);
unlock(p->loadlatch);
}
/**
Inform bufferManager that a new event (such as an update) will be
performed on page pageid. This function may not be called on a
page after finalize() has been called on that page, and each call
to this function must be followed by a corresponding call to
removePendingEvent.
This function is called by the logger when CLR or UPDATE records
are written.
@see finalize, removePendingEvent
*/
/*void addPendingEvent(int pageid){
Page * p;
p = loadPage(pageid);
pthread_mutex_lock(&add_pending_mutex);
while(p->waiting) {
pthread_mutex_unlock(&add_pending_mutex);
unlock(p->loadlatch);
DEBUG("B");
pthread_mutex_lock(&add_pending_mutex);
pthread_cond_wait(&addPendingOK, &add_pending_mutex);
pthread_mutex_unlock(&add_pending_mutex);
p = loadPage(pageid);
pthread_mutex_lock(&add_pending_mutex);
}
p->pending++;
pthread_mutex_unlock(&add_pending_mutex);
unlock(p->loadlatch);
}*/
/**
Because updates to a page might not happen in order, we need to
make sure that we've applied all updates to a page that we've heard
about before we flush that page to disk.
This method informs bufferManager that an update has been applied.
It is called by operations.c every time doUpdate, redoUpdate, or
undoUpdate is called.
@todo as implemented, loadPage() ... doOperation is not atomic!
*/
/*void removePendingEvent(int pageid) {
Page * p;
p = loadPage(pageid);
pthread_mutex_lock(&(add_pending_mutex));
p->pending--;
assert(p->id == pageid);
assert(p->pending >= 0);
if(p->waiting && !p->pending) {
assert(p->waiting == 1);
pthread_cond_signal(&(p->noMorePending));
}
pthread_mutex_unlock(&(add_pending_mutex));
unlock(p->loadlatch);
}*/

View file

@ -208,7 +208,7 @@ int writeLogEntry(LogEntry * e) {
}
if(e->xid == -1) { /* Don't write log entries for recovery xacts. */
e->LSN = -1;
e->LSN = -1;
return 0;
}
@ -245,7 +245,7 @@ int writeLogEntry(LogEntry * e) {
e->LSN = nextAvailableLSN;
/* We have the write lock, so no-one else can call fseek behind our back. */
/* We have the write lock, so no one else can call fseek behind our back. */
/* flockfile(log); */ /* Prevent other threads from calling fseek... */
fseek(log, nextAvailableLSN - global_offset, SEEK_SET);
@ -332,9 +332,8 @@ static LogEntry * readLogEntry() {
long size, entrySize;
int nmemb;
if(feof(log)) {
return NULL;
return NULL;
}
nmemb = fread(&size, sizeof(long), 1, log);

View file

@ -81,7 +81,7 @@ lsn_t LogTransAbort(TransactionLog * l) {
return LogTransCommon(l, XABORT);
}
LogEntry * LogUpdate(TransactionLog * l, recordid rid, int operation, const byte * args) {
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, const byte * args) {
void * preImage = NULL;
long argSize = 0;
LogEntry * e;
@ -97,13 +97,13 @@ LogEntry * LogUpdate(TransactionLog * l, recordid rid, int operation, const byte
DEBUG("Creating %ld byte physical pre-image.\n", rid.size);
preImage = malloc(rid.size);
if(!preImage) { perror("malloc"); abort(); }
readRecord(l->xid, rid, preImage);
readRecord(l->xid, p, rid, preImage);
DEBUG("got preimage");
}
}
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage);
writeLogEntry(e);
writeLogEntry(e);
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) argSize);

View file

@ -45,49 +45,56 @@ terms specified in this license.
#include <lladd/bufferManager.h>
#include <assert.h>
/** @todo questionable include */
#include "page.h"
#include <stdio.h>
Operation operationsTable[MAX_OPERATIONS];
void doUpdate(const LogEntry * e) {
void doUpdate(const LogEntry * e, Page * p) {
DEBUG("OPERATION update arg length %d, lsn = %ld\n", e->contents.update.argSize, e->LSN);
operationsTable[e->contents.update.funcID].run(e->xid, e->LSN, e->contents.update.rid, getUpdateArgs(e));
/* removePendingEvent(e->contents.update.rid.page); */
operationsTable[e->contents.update.funcID].run(e->xid, p, e->LSN, e->contents.update.rid, getUpdateArgs(e));
}
void redoUpdate(const LogEntry * e) {
if(e->type == UPDATELOG) {
lsn_t pageLSN = readLSN(e->contents.update.rid.page);
#ifdef DEBUGGING
/* lsn_t pageLSN = readLSN(e->contents.update.rid.page); */
recordid rid = e->contents.update.rid;
#endif
Page * p = loadPage(rid.page);
lsn_t pageLSN = pageReadLSN(p);
if(e->LSN > pageLSN) {
DEBUG("OPERATION Redo, %ld > %ld {%d %d %ld}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size);
doUpdate(e);
doUpdate(e, p);
} else {
DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %ld}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size);
/* removePendingEvent(e->contents.update.rid.page); */
}
releasePage(p);
} else if(e->type == CLRLOG) {
LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN);
#ifdef DEBUGGING
recordid rid = f->contents.update.rid;
#endif
Page * p = loadPage(rid.page);
assert(rid.page == e->contents.update.rid.page); /* @todo Should this always hold? */
/* See if the page contains the result of the undo that this CLR is supposed to perform. If it
doesn't, then undo the original operation. */
if(f->LSN > readLSN(e->contents.update.rid.page)) {
/* if(f->LSN > pageReadLSN(e->contents.update.rid.page)) { */
if(f->LSN > pageReadLSN(p)) {
DEBUG("OPERATION Undoing for clr, %ld {%d %d %ld}\n", f->LSN, rid.page, rid.slot, rid.size);
undoUpdate(f, e->LSN);
undoUpdate(f, p, e->LSN);
} else {
DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %ld}\n", f->LSN, rid.page, rid.slot, rid.size);
/* removePendingEvent(e->contents.update.rid.page); */
}
releasePage(p);
} else {
assert(0);
}
@ -95,7 +102,7 @@ void redoUpdate(const LogEntry * e) {
}
void undoUpdate(const LogEntry * e, lsn_t clr_lsn) {
void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
int undo = operationsTable[e->contents.update.funcID].undo;
DEBUG("OPERATION FuncID %d Undo op %d LSN %ld\n",e->contents.update.funcID, undo, clr_lsn);
@ -103,7 +110,8 @@ void undoUpdate(const LogEntry * e, lsn_t clr_lsn) {
#ifdef DEBUGGING
recordid rid = e->contents.update.rid;
#endif
lsn_t page_lsn = readLSN(e->contents.update.rid.page);
/* lsn_t page_lsn = readLSN(e->contents.update.rid.page); */
lsn_t page_lsn = pageReadLSN(p);
if(e->LSN <= page_lsn) {
/* Actually execute the undo */
@ -111,19 +119,17 @@ void undoUpdate(const LogEntry * e, lsn_t clr_lsn) {
/* Physical undo */
DEBUG("OPERATION Physical undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size);
writeRecord(e->xid, clr_lsn, e->contents.update.rid, getUpdatePreImage(e));
writeRecord(e->xid, p, clr_lsn, e->contents.update.rid, getUpdatePreImage(e));
} else {
/* @see doUpdate() */
/* printf("Logical undo"); fflush(NULL); */
DEBUG("OPERATION Logical undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size);
operationsTable[undo].run(e->xid, clr_lsn, e->contents.update.rid, getUpdateArgs(e));
operationsTable[undo].run(e->xid, p, clr_lsn, e->contents.update.rid, getUpdateArgs(e));
}
} else {
DEBUG("OPERATION Skipping undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size);
}
/* removePendingEvent(e->contents.update.rid.page); */
/* printf("Undo done."); fflush(NULL); */
}

View file

@ -6,6 +6,7 @@
#include <lladd/transactional.h>
#include <lladd/bufferManager.h>
#include "../blobManager.h"
#include "../page.h"
/**
@file
@ -29,7 +30,7 @@
*/
static int operate(int xid, lsn_t lsn, recordid rid, const void * dat) {
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
if(rid.size >= BLOB_THRESHOLD_SIZE) {
allocBlob(xid, lsn, rid);
} else {
@ -37,18 +38,18 @@ static int operate(int xid, lsn_t lsn, recordid rid, const void * dat) {
/* pageSlotRalloc(loadedPage, lsn, rid); */
/** Has no effect during normal operation. */
slotRalloc(rid.page, lsn, rid);
pageSlotRalloc(p, lsn, rid);
}
return 0;
}
/** @todo Currently, we just leak store space on dealloc. */
static int deoperate(int xid, lsn_t lsn, recordid rid, const void * dat) {
static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
/* Page * loadedPage = loadPage(rid.page); */
/** Has no effect during normal operation, other than updating the LSN. */
/* pageSlotRalloc(loadedPage, lsn, rid); */
slotRalloc(rid.page, lsn, rid);
pageSlotRalloc(p, lsn, rid);
return 0;
}

View file

@ -48,12 +48,12 @@ terms specified in this license.
#include <lladd/operations/decrement.h>
#include <lladd/bufferManager.h>
static int operate(int xid, lsn_t lsn, recordid r, const void *d) {
static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
int i;
readRecord(xid, r, &i);
readRecord(xid, p, r, &i);
i--;
writeRecord(xid, lsn, r, &i);
writeRecord(xid, p, lsn, r, &i);
return 0;
}

View file

@ -47,12 +47,13 @@ terms specified in this license.
#include <lladd/operations/increment.h>
#include <lladd/bufferManager.h>
static int operate(int xid, lsn_t lsn, recordid r, const void *d) {
static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
int i;
readRecord(xid, r, &i);
readRecord(xid, p, r, &i);
i++;
writeRecord(xid, lsn, r, &i);
writeRecord(xid, p, lsn, r, &i);
return 0;
}

View file

@ -52,7 +52,7 @@ terms specified in this license.
recordid prepare_bogus_rec = { 0, 0, 0};
static int operate(int xid, lsn_t lsn, recordid rid, const void *dat) {
/** Prepare operation: forces the log tail to stable storage.  The
    page, lsn, record, and data arguments are unused here. */
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
syncLog();
return 0;
}

View file

@ -47,8 +47,8 @@ terms specified in this license.
#include <lladd/operations/set.h>
#include <lladd/bufferManager.h>
static int operate(int xid, lsn_t lsn, recordid rid, const void *dat) {
writeRecord(xid, lsn, rid, dat);
/** Set operation: physically writes dat into record rid on page p,
    stamping the page with lsn via writeRecord(). */
static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
writeRecord(xid, p, lsn, rid, dat);
return 0;
}

View file

@ -144,9 +144,11 @@ static pthread_mutex_t pageAllocMutex;
Page pool[MAX_BUFFER_SIZE+1];
/* ------------------ STATIC FUNCTIONS. NONE OF THESE ACQUIRE LOCKS
ON THE MEMORY THAT IS PASSED INTO THEM -------------*/
static int isValidSlot(byte *memAddr, int slot);
static void invalidateSlot(byte *memAddr, int slot);
void pageDeRalloc(Page * page, recordid rid);
/**
The caller of this function must already have a writelock on the
@ -154,53 +156,6 @@ void pageDeRalloc(Page * page, recordid rid);
*/
static void pageCompact(Page * page);
/**
* pageInit() initializes all the important variables needed in
* all the functions dealing with pages.
*/
void pageInit() {
nextPage = 0;
/**
* For now, we will assume that slots are 4 bytes long, and that the
* first two bytes are the offset, and the second two bytes are the
* the length. There are some functions at the bottom of this file
* that may be useful later if we decide to dynamically choose
* sizes for offset and length.
*/
/**
* the largest a slot length can be is the size of the page,
* and the greatest offset at which a record could possibly
* start is at the end of the page
*/
SLOT_LENGTH_SIZE = SLOT_OFFSET_SIZE = 2; /* in bytes */
SLOT_SIZE = SLOT_OFFSET_SIZE + SLOT_LENGTH_SIZE;
LSN_SIZE = sizeof(long);
FREE_SPACE_SIZE = NUMSLOTS_SIZE = 2;
/* START_OF_LSN is the offset in the page to the lsn */
START_OF_LSN = PAGE_SIZE - LSN_SIZE;
START_OF_FREE_SPACE = START_OF_LSN - FREE_SPACE_SIZE;
START_OF_NUMSLOTS = START_OF_FREE_SPACE - NUMSLOTS_SIZE;
MASK_0000FFFF = (1 << (2*BITS_PER_BYTE)) - 1;
MASK_FFFF0000 = ~MASK_0000FFFF;
pthread_mutex_init(&pageAllocMutex, NULL);
}
void pageCommit(int xid) {
/* rmTouch(xid); */
}
void pageAbort(int xid) {
/* rmTouch(xid); */
}
static int getFirstHalfOfWord(unsigned int *memAddr) {
unsigned int word = *memAddr;
word = (word >> (2*BITS_PER_BYTE)); /* & MASK_0000FFFF; */
@ -214,8 +169,7 @@ static int getSecondHalfOfWord(int *memAddr) {
return word;
}
void setFirstHalfOfWord(int *memAddr, int value){
static void setFirstHalfOfWord(int *memAddr, int value){
int word = *memAddr;
word = word & MASK_0000FFFF;
word = word | (value << (2*BITS_PER_BYTE));
@ -223,7 +177,7 @@ void setFirstHalfOfWord(int *memAddr, int value){
}
void setSecondHalfOfWord(int *memAddr, int value) {
static void setSecondHalfOfWord(int *memAddr, int value) {
int word = *memAddr;;
word = word & MASK_FFFF0000;
word = word | (value & MASK_0000FFFF);
@ -240,21 +194,6 @@ static const byte *slotMemAddr(const byte *memAddr, int slotNum) {
return (memAddr + PAGE_SIZE) - (LSN_SIZE + FREE_SPACE_SIZE + NUMSLOTS_SIZE + ((slotNum+1) * SLOT_SIZE));
}
/**
* pageReadLSN() assumes that the page is already loaded in memory. It takes
* as a parameter a Page and returns the LSN that is currently written on that
* page in memory.
*/
lsn_t pageReadLSN(const Page * page) {
lsn_t ret;
readlock(page->rwlatch, 259);
ret = *(long *)(page->memAddr + START_OF_LSN);
readunlock(page->rwlatch);
return ret;
}
/**
* pageWriteLSN() assumes that the page is already loaded in memory. It takes
* as a parameter a Page. The Page struct contains the new LSN and the page
@ -269,20 +208,6 @@ static void pageWriteLSN(Page * page) {
}
static int unlocked_freespace(Page * page);
/**
* freeSpace() assumes that the page is already loaded in memory. It takes
* as a parameter a Page, and returns an estimate of the amount of free space
* available to a new slot on this page. (This is the amount of unused space
* in the page, minus the size of a new slot entry.) This is either exact,
* or an underestimate.
*/
int freespace(Page * page) {
int ret;
readlock(page->rwlatch, 292);
ret = unlocked_freespace(page);
readunlock(page->rwlatch);
return ret;
}
/**
Just like freespace(), but doesn't obtain a lock. (So that other methods in this file can use it.)
@ -329,98 +254,6 @@ static void writeNumSlots(byte *memAddr, int numSlots) {
setFirstHalfOfWord((int*)(unsigned int*)(memAddr + START_OF_NUMSLOTS), numSlots);
}
recordid pageRalloc(Page * page, int size) {
int freeSpace;
int numSlots;
int i;
writelock(page->rwlatch, 342);
if(unlocked_freespace(page) < size) {
pageCompact(page);
/* Make sure there's enough free space... */
#ifdef DEBUGGING
assert (unlocked_freespace(page) >= (int)size); /*Expensive, so skip it when debugging is off. */
#endif
}
freeSpace = readFreeSpace(page->memAddr);
numSlots = readNumSlots(page->memAddr);
recordid rid;
rid.page = page->id;
rid.slot = numSlots;
rid.size = size;
/*
Reuse an old (invalid) slot entry. Why was this here?
@todo is slot reuse in page.c a performance bottleneck?
*/
for (i = 0; i < numSlots; i++) {
if (!isValidSlot(page->memAddr, i)) {
rid.slot = i;
break;
}
}
if (rid.slot == numSlots) {
writeNumSlots(page->memAddr, numSlots+1);
}
setSlotOffset(page->memAddr, rid.slot, freeSpace);
setSlotLength(page->memAddr, rid.slot, rid.size);
writeFreeSpace(page->memAddr, freeSpace + rid.size);
writeunlock(page->rwlatch);
/* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */
return rid;
}
/** Only used for recovery, to make sure that consistent RID's are created
* on log playback. */
recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid) {
int freeSpace;
int numSlots;
writelock(page->rwlatch, 376);
freeSpace = readFreeSpace(page->memAddr);
numSlots= readNumSlots(page->memAddr);
/* if(rid.size > BLOB_THRESHOLD_SIZE) {
return blobSlotAlloc(page, lsn_t lsn, recordid rid);
}*/
/* assert(rid.slot >= numSlots); */
if(rid.slot >= numSlots) {
if (freeSpace < rid.size) {
pageCompact(page);
freeSpace = readFreeSpace(page->memAddr);
assert (freeSpace < rid.size);
}
setSlotOffset(page->memAddr, rid.slot, freeSpace);
setSlotLength(page->memAddr, rid.slot, rid.size);
writeFreeSpace(page->memAddr, freeSpace + rid.size);
} else {
/* assert(rid.size == getSlotLength(page.memAddr, rid.slot)); */ /* Fails. Why? */
}
writeunlock(page->rwlatch);
return rid;
}
static int isValidSlot(byte *memAddr, int slot) {
return (getSlotOffset(memAddr, slot) != INVALID_SLOT) ? 1 : 0;
}
@ -430,11 +263,6 @@ static void invalidateSlot(byte *memAddr, int slot) {
}
void pageDeRalloc(Page * page, recordid rid) {
/* Don't need any locking, since we don't support concurrent access to the same slot.. */
invalidateSlot(page->memAddr, rid.slot);
}
/**
@ -525,13 +353,212 @@ static void setSlotOffset(byte *memAddr, int slot, int offset) {
static void setSlotLength(byte *memAddr, int slot, int length) {
setSecondHalfOfWord((int*)(unsigned int*)slotMemAddr(memAddr, slot), length);
}
/*
static int isBlobSlot(byte *pageMemAddr, int slot) {
return BLOB_SLOT == getSlotLength(pageMemAddr, slot);
}*/
/** Resets a page struct's header fields so it can be reused for a new
    page id.  Does no latching itself; the caller must hold the page's
    write latch (see pageRealloc()). */
static void pageReallocNoLock(Page *p, int id) {
p->id = id;
p->LSN = 0;
p->dirty = 0;
/* assert(p->pending == 0);
assert(p->waiting == 1);
p->waiting = 0;*/
}
/* ----- end static functions ----- */
/* ----- (de)initialization functions. Do not need to support multithreading. -----*/
/**
* pageInit() initializes all the important variables needed in
* all the functions dealing with pages.
*/
void pageInit() {
nextPage = 0;
/**
 * For now, we will assume that slots are 4 bytes long, and that the
 * first two bytes are the offset, and the second two bytes are
 * the length. There are some functions at the bottom of this file
 * that may be useful later if we decide to dynamically choose
 * sizes for offset and length.
 */
/**
 * the largest a slot length can be is the size of the page,
 * and the greatest offset at which a record could possibly
 * start is at the end of the page
 */
SLOT_LENGTH_SIZE = SLOT_OFFSET_SIZE = 2; /* in bytes */
SLOT_SIZE = SLOT_OFFSET_SIZE + SLOT_LENGTH_SIZE;
LSN_SIZE = sizeof(long);
FREE_SPACE_SIZE = NUMSLOTS_SIZE = 2;
/* START_OF_LSN is the offset in the page to the lsn */
START_OF_LSN = PAGE_SIZE - LSN_SIZE;
START_OF_FREE_SPACE = START_OF_LSN - FREE_SPACE_SIZE;
START_OF_NUMSLOTS = START_OF_FREE_SPACE - NUMSLOTS_SIZE;
/* Masks for extracting the 16-bit halves of a slot word. */
MASK_0000FFFF = (1 << (2*BITS_PER_BYTE)) - 1;
MASK_FFFF0000 = ~MASK_0000FFFF;
pthread_mutex_init(&pageAllocMutex, NULL);
}
/** Per-transaction commit hook for the page layer.  Currently a
    no-op; the recovery-manager call is commented out. */
void pageCommit(int xid) {
/* rmTouch(xid); */
}
/** Per-transaction abort hook for the page layer.  Currently a
    no-op; the recovery-manager call is commented out. */
void pageAbort(int xid) {
/* rmTouch(xid); */
}
/**
* pageReadLSN() assumes that the page is already loaded in memory. It takes
* as a parameter a Page and returns the LSN that is currently written on that
* page in memory.
*/
lsn_t pageReadLSN(const Page * page) {
lsn_t ret;
readlock(page->rwlatch, 259);
/* The LSN is stored in the last LSN_SIZE bytes of the page image. */
ret = *(long *)(page->memAddr + START_OF_LSN);
readunlock(page->rwlatch);
return ret;
}
/**
* freeSpace() assumes that the page is already loaded in memory. It takes
* as a parameter a Page, and returns an estimate of the amount of free space
* available to a new slot on this page. (This is the amount of unused space
* in the page, minus the size of a new slot entry.) This is either exact,
* or an underestimate.
*
* @todo is it ever safe to call freespace without a lock on the page?
*
*/
int freespace(Page * page) {
int ret;
/* Latched wrapper: delegates to unlocked_freespace() while holding
   the page's read latch. */
readlock(page->rwlatch, 292);
ret = unlocked_freespace(page);
readunlock(page->rwlatch);
return ret;
}
/** Allocates a new record slot of the given size on the page and
    returns its recordid.  Holds the page's write latch for the whole
    call.  Compacts the page first if contiguous free space is short,
    and reuses an invalidated slot entry when one exists. */
recordid pageRalloc(Page * page, int size) {
int freeSpace;
int numSlots;
int i;
writelock(page->rwlatch, 342);
if(unlocked_freespace(page) < size) {
pageCompact(page);
/* Make sure there's enough free space... */
/*#ifdef DEBUGGING*/
assert (unlocked_freespace(page) >= (int)size); /*Expensive, so skip it when debugging is off. */
/*#endif */
}
freeSpace = readFreeSpace(page->memAddr);
numSlots = readNumSlots(page->memAddr);
recordid rid;
rid.page = page->id;
rid.slot = numSlots;
rid.size = size;
/*
Reuse an old (invalid) slot entry. Why was this here?
@todo is slot reuse in page.c a performance bottleneck?
*/
for (i = 0; i < numSlots; i++) {
if (!isValidSlot(page->memAddr, i)) {
rid.slot = i;
break;
}
}
if (rid.slot == numSlots) {
/* No invalidated slot was found; grow the slot count instead. */
writeNumSlots(page->memAddr, numSlots+1);
}
setSlotOffset(page->memAddr, rid.slot, freeSpace);
setSlotLength(page->memAddr, rid.slot, rid.size);
writeFreeSpace(page->memAddr, freeSpace + rid.size);
writeunlock(page->rwlatch);
/* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */
return rid;
}
/** Only used for recovery, to make sure that consistent RID's are created
* on log playback. */
recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid) {
int freeSpace;
int numSlots;
writelock(page->rwlatch, 376);
freeSpace = readFreeSpace(page->memAddr);
numSlots= readNumSlots(page->memAddr);
/* printf("!"); fflush(NULL); */
/* if(rid.size > BLOB_THRESHOLD_SIZE) {
return blobSlotAlloc(page, lsn_t lsn, recordid rid);
}*/
/* assert(rid.slot >= numSlots); */
/** @todo for recovery, pageSlotRalloc assumes no other thread added a slot
between when ralloc and it were called. (This may be a
safe assumption..) */
/* A zero slot length means the slot was never allocated on this
   page image; recreate it at the rid recorded in the log. */
if(getSlotLength(page->memAddr, rid.slot) == 0) {
/* if(rid.slot >= numSlots) { */
if (unlocked_freespace(page) < rid.size) { /*freeSpace < rid.size) { */
pageCompact(page);
freeSpace = readFreeSpace(page->memAddr);
/* NOTE(review): this assert looks inverted (< where >= seems
   intended after compaction) -- confirm against callers. */
assert (freeSpace < rid.size);
}
setSlotOffset(page->memAddr, rid.slot, freeSpace);
setSlotLength(page->memAddr, rid.slot, rid.size);
writeFreeSpace(page->memAddr, freeSpace + rid.size);
/* printf("?"); fflush(NULL);*/
} else {
assert((rid.size == getSlotLength(page->memAddr, rid.slot)) ||
(getSlotLength(page->memAddr, rid.slot) >= PAGE_SIZE)); /* Fails. Why? */
}
writeunlock(page->rwlatch);
return rid;
}
/** Marks the record's slot as invalid so pageRalloc() can reuse it.
    Takes the page's latch; exclusion at the individual-slot level is
    the caller's responsibility (no concurrent access to one slot). */
void pageDeRalloc(Page * page, recordid rid) {
readlock(page->rwlatch, 443);
invalidateSlot(page->memAddr, rid.slot);
unlock(page->rwlatch);
}
/*
This should trust the rid (since the caller needs to
override the size in special circumstances)
@todo If the rid size has been overridden, we should check to make
@ -539,24 +566,41 @@ static int isBlobSlot(byte *pageMemAddr, int slot) {
*/
void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) {
byte *recAddress;
int slot_length;
readlock(page->rwlatch, 519);
assert(page->id == rid.page);
recAddress = page->memAddr + getSlotOffset(page->memAddr, rid.slot);
slot_length = getSlotLength(page->memAddr, rid.slot);
/** @todo these assertions really *should* work... is slot length storage broken? */
/* assert((slot_length > 0) || (rid.size >= PAGE_SIZE)); */
/* assert(slot_length); */
/*assert */
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
/* if((rid.size == slot_length) || (slot_length >= PAGE_SIZE)) {
printf ("1");
} else {
printf ("2")
}*/
memcpy(buff, recAddress, rid.size);
readunlock(page->rwlatch);
unlock(page->rwlatch);
}
/**
 * Write rid.size bytes from `data` into the record slot identified by
 * rid on `page`, then stamp the page with `lsn` and persist the LSN.
 *
 * NOTE(review): only the read latch is taken even though record bytes
 * are mutated.  Presumably concurrent record-level writers touch
 * disjoint slots while the write latch is reserved for slot-layout
 * changes (pageSlotRalloc) -- confirm against page.c's latch rules.
 *
 * @param xid  transaction id (unused here; kept for interface symmetry).
 * @param page pinned page containing the record.
 * @param rid  target record; rid.size must be less than PAGE_SIZE.
 * @param lsn  LSN of the log entry covering this update.
 * @param data source buffer of rid.size bytes.
 */
void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data) {
  byte *rec;
  int len;

  readlock(page->rwlatch, 529);
  assert(rid.size < PAGE_SIZE);

  rec = page->memAddr + getSlotOffset(page->memAddr, rid.slot);
  len = getSlotLength(page->memAddr, rid.slot);
  /* len >= PAGE_SIZE marks a special (blob) slot; otherwise sizes agree. */
  assert(rid.size == len || len >= PAGE_SIZE);

  if(memcpy(rec, data, rid.size) == NULL) {
    /* NOTE(review): memcpy returns its destination pointer, which cannot
       be NULL here, so this branch is effectively dead; retained to
       preserve the original error-handling intent. */
    printf("ERROR: MEM_WRITE_ERROR on %s line %d", __FILE__, __LINE__);
    exit(MEM_WRITE_ERROR);
  }

  page->LSN = lsn;
  pageWriteLSN(page);

  /* Old writelock/writeunlock and new readlock/unlock were fused in the
     stripped diff; the post-commit version uses the read latch and a
     single release. */
  unlock(page->rwlatch);
}
/** Reinitialize a page frame's header fields so the frame can represent
    page `id`.  No latching is performed here; the caller must already
    hold the appropriate lock (see pageRealloc for the latched variant). */
void pageReallocNoLock(Page *p, int id) {
  p->dirty = 0;   /* a freshly (re)assigned frame starts out clean */
  p->LSN   = 0;   /* no logged updates have touched this frame yet */
  p->id    = id;
  /* Pre-refactoring pending/waiting bookkeeping, kept for reference:
     assert(p->pending == 0);
     assert(p->waiting == 1);
     p->waiting = 0; */
}
void pageRealloc(Page *p, int id) {
writelock(p->rwlatch, 10);
pageReallocNoLock(p,id);
@ -602,23 +637,13 @@ Page *pageAlloc(int id) {
page->rwlatch = initlock();
page->loadlatch = initlock();
/* pthread_mutex_init(&page->pending_mutex, NULL);*/
/* pthread_cond_init(&page->noMorePending, NULL); */
page->memAddr = malloc(PAGE_SIZE);
nextPage++;
assert(nextPage <= MAX_BUFFER_SIZE + 1); /* There's a dummy page that we need to keep around, thus the +1 */
/* uggh. Really just want to pass pages by reference */
/* page->pending = malloc(sizeof(int)); */
pthread_mutex_unlock(&pageAllocMutex);
/**@todo if re-implement pending event thing, these lines need to be protected by a lock!? */
/* page->pending = 0;
page->waiting = 1; */
return page;
}

View file

@ -61,7 +61,11 @@ terms specified in this license.
#include <lladd/transactional.h>
/*#ifdef __BUFFERMANAGER_H__
#error bufferManager.h must be included after page.h
#endif*/
#include <lladd/bufferManager.h>
BEGIN_C_DECLS
/**
@ -76,7 +80,7 @@ BEGIN_C_DECLS
pointers). This is starting to become cumbersome, as the page
struct is becoming more complex...)
*/
typedef struct Page_s {
struct Page_s {
/** @todo Shouldn't Page.id be a long? */
int id;
/** @todo The Page.LSN field seems extraneous. Why do we need it? */
@ -180,9 +184,7 @@ typedef struct Page_s {
*/
/* int pending; */
} Page;
extern pthread_cond_t addPendingOK;
};
/**
* initializes all the important variables needed in all the
@ -242,13 +244,14 @@ void pageCommit(int xid);
void pageAbort(int xid);
void pageReallocNoLock(Page * p, int id);
/*void pageReallocNoLock(Page * p, int id); */
/** @todo Do we need a locking version of pageRealloc? */
void pageRealloc(Page * p, int id);
Page* pageAlloc(int id);
recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid);
void pageDeRalloc(Page * page, recordid rid);
/*int pageTest(); */

View file

@ -8,6 +8,7 @@
#include <config.h>
#include <lladd/common.h>
#include "latches.h"
#include "page.h"
#include <lladd/pageCache.h>
#include <lladd/bufferManager.h>
@ -15,7 +16,6 @@
#include <pbl/pbl.h>
#include <stdio.h>
#include "page.h"
#include "pageFile.h"
static pblHashTable_t *activePages; /* page lookup */
static unsigned int bufferSize; /* < MAX_BUFFER_SIZE */
@ -124,11 +124,10 @@ static void qRemove(Page *ret) {
assert(state == FULL);
/* assert( bufferSize == MAX_BUFFER_SIZE ); */
assert(ret->next != ret && ret->prev != ret);
assert(ret->next != ret && ret->prev != ret);
if( ret->prev )
ret->prev->next = ret->next;
ret->prev->next = ret->next;
else /* is head */
repHead = ret->next; /* won't have head == tail because of test in loadPage */
if( ret->next ) {
@ -145,66 +144,6 @@ static void qRemove(Page *ret) {
assert(ret != repTail);
assert(ret != repHead);
}
/*
static Page *getFreePage() {
Page *ret;
if( state == FULL ) { / * kick * /
ret = repTail;
/ ** Make sure no one else will try to reuse this page. * /
cacheRemovePage(ret);
/ ** Temporarily drop the mutex while we wait for outstanding
operations on the page to complete. * /
pthread_mutex_unlock(&loadPagePtr_mutex);
/ ** @ todo getFreePage (finalize) needs to yield the getPage mutex,
but also needs to remove a page from the kick list before
doing so. If there is a cache hit on the page that's been
removed from the kick list, then the cache eviction policy
code needs to know this, and ignore the hit. -- Done. * /
/ * finalize(ret); * /
/ * ret->waiting++; * / / * @todo remove waiting / pending fields.. * /
/ * This cannot deadlock because each thread can
only have outstanding pending events on the
page that it's accessing, but they can only
hold that lock if the page is in cache. If the
page is in cache, then the thread surely isn't
here! Therefore any threads that finalize will
block on can not possibly be blocking on this
thread's latches. * /
/ * writelock(ret->loadlatch, 181); * / / * Don't need the lock here--No one else has a pointer to this page! * /
pthread_mutex_lock(&loadPagePtr_mutex);
/ * Now that finalize returned, pull ret out of the cache's lookup table. * /
/ * pblHtRemove(activePages, &ret->id, sizeof(int)); * /
} else {
ret = pageAlloc(-1);
ret->id = -1;
ret->inCache = 0;
/ * writelock(ret->loadlatch, 166); * /
}
return ret;
}
*/
#define RO 0
#define RW 1
Page * getPage(int pageid, int locktype) {
Page * ret;
@ -214,291 +153,118 @@ Page * getPage(int pageid, int locktype) {
if(ret) {
readlock(ret->loadlatch, 217);
//writelock(ret->loadlatch, 217);
}
while (ret && ret->id != pageid) {
while (ret && (ret->id != pageid)) {
unlock(ret->loadlatch);
pthread_mutex_unlock(&loadPagePtr_mutex);
sched_yield();
pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
// writelock(ret->loadlatch, 217);
readlock(ret->loadlatch, 217);
}
spin++;
if(spin > 10000) {
printf("GetPage stuck!");
printf("GetPage is stuck!");
}
}
if(ret) {
cacheHitOnPage(ret);
assert(ret->id == -1 || ret->id == pageid);
assert(ret->id == pageid);
pthread_mutex_unlock(&loadPagePtr_mutex);
} else {
ret = dummy_page;
readlock(ret->loadlatch, 232);
}
if(ret->id != pageid) {
/* If ret is null, then we know that:
unlock(ret->loadlatch);
a) there is no cache entry for pageid
b) this is the only thread that has gotten this far,
and that will try to add an entry for pageid
c) the most recent version of this page has been
written to the OS's file cache. */
int oldid = -1;
if( state == FULL ) {
/* Select an item from cache, and remove it atomicly. (So it's
only reclaimed once) */
ret = repTail;
cacheRemovePage(ret);
oldid = ret->id;
assert(oldid != pageid);
} else {
ret = pageAlloc(-1);
ret->id = -1;
ret->inCache = 0;
}
writelock(ret->loadlatch, 217);
cacheInsertPage(ret);
/* Inserting this into the cache before releasing the mutex
ensures that constraint (b) above holds. */
pblHtInsert(activePages, &pageid, sizeof(int), ret);
pblHtRemove(activePages, &(ret->id), sizeof(int));
pthread_mutex_unlock(&loadPagePtr_mutex);
/*new*/ /*writelock(ret->loadlatch, 217);*/
/* Could writelock(ret) go here? */
assert(ret != dummy_page);
if(ret->id != -1) {
assert(ret != dummy_page);
pageWrite(ret);
}
pageRealloc(ret, pageid);
pageRead(ret);
/*new*/
/* pthread_mutex_lock(&loadPagePtr_mutex);
pblHtRemove(activePages, &(ret->id), sizeof(int));
pthread_mutex_unlock(&loadPagePtr_mutex); */
/*new*/
downgradelock(ret->loadlatch);
} else {
pthread_mutex_unlock(&loadPagePtr_mutex);
}
assert(ret->id == pageid);
return ret;
}
/* Page * getPageOld(int pageid, int locktype) {
Page * ret;
int spin = 0;
assert(0);
/ ** This wonderful bit of code avoids deadlocks.
Locking works like this:
a) get a HT mutex, lookup pageid, did we get a pointer?
- yes, release the mutex, so we don't deadlock getting a page lock.
- no, keep the mutex, move on to the next part of the function.
b) lock whatever pointer the HT returned. (Safe, since the memory + mutex are allocated exactly one time.)
c) did we get the right page?
- yes, success!
- no, goto (a)
* /
pthread_mutex_lock(&loadPagePtr_mutex);
do {
do {
if(spin) {
sched_yield();
}
spin ++;
if(spin > 1000 && (spin % 10000 == 0)) {
DEBUG("Spinning in pageCache's hashtable lookup: %d\n", spin);
}
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
/ * pthread_mutex_unlock(&loadPagePtr_mutex); * /
if(locktype == RO) {
readlock(ret->loadlatch, 147);
} else {
writelock(ret->loadlatch, 149);
}
/ * pthread_mutex_lock(&loadPagePtr_mutex); * /
writeunlock(ret->loadlatch);
}
pthread_mutex_lock(&loadPagePtr_mutex);
} while (ret && (ret->id != pageid));
/* pblHtRemove(activePages, &(ret->id), sizeof(int)); */
pblHtRemove(activePages, &(oldid), sizeof(int));
if(ret) {
cacheHitOnPage(ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
assert(ret->id == pageid);
return ret;
}
/ * OK, we need to handle a cache miss. This is also tricky.
If getFreePage needs to kick a page, then it will need a
writeLock on the thing it kicks, so we drop our mutex here.
But, before we do that, we need to make sure that no one else
tries to load our page. We do this by inserting a dummy entry in
the cache. Since its pageid won't match the pageid that we're
inserting, other threads will spin in the do..while loop until
we've loaded the page.
* /
pblHtInsert(activePages, &pageid, sizeof(int), dummy_page);
ret = getFreePage();
/ * ret is now a pointer that no other thread has access to, and we
hold a write lock on it * /
pblHtRemove(activePages, &pageid, sizeof(int));
pblHtInsert(activePages, &pageid, sizeof(int), ret);
/ * writes were here... * /
/ * pthread_mutex_unlock(&loadPagePtr_mutex); * /
if(ret->id != -1) {
pageWrite(ret);
}
pageRealloc(ret, pageid);
pageRead(ret);
/ * pthread_mutex_lock(&loadPagePtr_mutex); * /
assert(ret->inCache == 0);
/* Put off putting this back into cache until we're done with
it. -- This could cause the cache to empty out if the ratio of
threads to buffer slots is above ~ 1/3, but it decreases the
liklihood of thrashing. */
cacheInsertPage(ret);
assert(ret->inCache == 1);
pthread_mutex_unlock(&loadPagePtr_mutex);
if(locktype == RO) {
readlock(ret->loadlatch, 314);
} else {
writelock(ret->loadlatch, 316);
/* downgradelock(ret->loadlatch); */
// writelock(ret->loadlatch, 217);
readlock(ret->loadlatch, 217);
if(ret->id != pageid) {
unlock(ret->loadlatch);
printf("pageCache.c: Thrashing detected. Strongly consider increasing LLADD's buffer pool size!\n");
fflush(NULL);
return getPage(pageid, locktype);
}
/* } else {
if(locktype == RO) {
downgradelock(ret->loadlatch);
}
} while (ret->id != pageid);
pthread_mutex_unlock(&loadPagePtr_mutex);
} */
}
assert(ret->id == pageid);
return ret;
}*/
/*
static Page *kickPage(int pageid) {
Page *ret = repTail;
assert( bufferSize == MAX_BUFFER_SIZE );
qRemove(ret);
pblHtRemove(activePages, &ret->id, sizeof(int));
/ * It's almost safe to release the mutex here. The LRU-2
linked lists are in a consistent (but under-populated)
state, and there is no reference to the page that we're
holding in the hash table, so the data structures are
internally consistent.
The problem is that that loadPagePtr could be called
multiple times with the same pageid, so we need to check
for that, or we might load the same page into multiple
cache slots, which would cause consistency problems.
@todo Don't block while holding the loadPagePtr mutex!
* /
/ *pthread_mutex_unlock(loadPagePtr_mutex);* /
/ *pthread_mutex_lock(loadPagePtr_mutex);* /
writelock(ret->rwlatch, 121);
/ * pblHtInsert(activePages, &pageid, sizeof(int), ret); * /
return ret;
}
int lastPageId = -1;
Page * lastPage = 0;
*/
/*
static void noteRead(Page * ret) {
if( bufferSize == MAX_BUFFER_SIZE ) { / * we need to worry about page sorting * /
/ * move to head * /
if( ret != repHead ) {
qRemove(ret);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
if( ret->queue == 2 ) {
/ * keep first queue same size * /
repMiddle = repMiddle->prev;
repMiddle->queue = 2;
ret->queue = 1;
}
}
}
}
void loadPagePtrFoo(int pageid, int readOnly) {
Page * ret;
pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int));
getPage(
if(ret) {
if(readOnly) {
readlock(ret->rwlatch, 178);
} else {
writelock(ret->rwlatch, 180);
}
noteRead(ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
} else if(bufferSize == MAX_BUFFER_SIZE - 1) {
}
}
*/
static void cacheInsertPage (Page * ret) {
bufferSize++;
assert(!ret->inCache);
@ -563,128 +329,7 @@ static void cacheHitOnPage(Page * ret) {
}
}
void *loadPagePtr(int pageid) {
Page * ret = getPage(pageid, RO);
Page *loadPage(int pageid) {
Page * ret = getPage(pageid, RW);
return ret;
}
/** @todo loadPagePtr needs to aquire the page read/write lock -- if it does, then should page.c do any locking? */
/*void *loadPagePtr(int pageid) {
/ * lock activePages, bufferSize * /
Page *ret;
pthread_mutex_lock(&(loadPagePtr_mutex));
if(lastPage && lastPageId == pageid) {
void * ret = lastPage;
pthread_mutex_unlock(&(loadPagePtr_mutex));
return ret;
} else {
ret = pblHtLookup(activePages, &pageid, sizeof(int));
}
if( ret ) {
/ ** Don't need write lock for linked list manipulations. The loadPagePtr_mutex protects those operations. * /
if( bufferSize == MAX_BUFFER_SIZE ) { / * we need to worry about page sorting * /
/ * move to head * /
if( ret != repHead ) {
qRemove(ret);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
if( ret->queue == 2 ) {
/ * keep first queue same size * /
repMiddle = repMiddle->prev;
repMiddle->queue = 2;
ret->queue = 1;
}
}
}
lastPage = ret;
lastPageId = pageid;
/ * DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); * /
pthread_mutex_unlock(&(loadPagePtr_mutex));
return ret;
} else if( bufferSize == MAX_BUFFER_SIZE ) { / * we need to kick * /
ret = kickPage(pageid);
pageWrite(ret);
pageRealloc(ret, pageid);
middleInsert(ret);
} else if( bufferSize == MAX_BUFFER_SIZE-1 ) { / * we need to setup kickPage mechanism * /
int i;
Page *iter;
ret = pageAlloc(pageid);
bufferSize++;
pageRealloc(ret, pageid);
writelock(ret->rwlatch, 224);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
/ * split up queue:
* "in all cases studied ... fixing the primary region to 30% ...
* resulted in the best performance"
* /
repMiddle = repHead;
for( i = 0; i < MAX_BUFFER_SIZE / 3; i++ ) {
repMiddle->queue = 1;
repMiddle = repMiddle->next;
}
for( iter = repMiddle; iter; iter = iter->next ) {
iter->queue = 2;
}
} else { / * we are adding to an nonfull queue * /
bufferSize++;
ret = pageAlloc(pageid);
pageRealloc(ret, pageid);
writelock(ret->rwlatch, 224);
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
assert(ret->next != ret && ret->prev != ret);
}
/ * we now have a page we can dump info into * /
assert( ret->id == pageid );
pblHtInsert( activePages, &pageid, sizeof(int), ret );
lastPage = ret;
lastPageId = pageid;
/ * release mutex for this function * /
pthread_mutex_unlock(&(loadPagePtr_mutex));
pageRead(ret);
/ * release write lock on the page. * /
writeunlock(ret->rwlatch);
/ * DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); * /
return ret;
}
*/

View file

@ -4,9 +4,10 @@
*/
#include "page.h"
#include <lladd/bufferManager.h>
#include "page.h"
#include "pageFile.h"
#include <assert.h>
#include "logger/logWriter.h"
@ -42,9 +43,6 @@ extern pthread_mutex_t add_pending_mutex;
return;
}*/
/* This function is declared in page.h */
void pageRead(Page *ret) {
long fileSize;
@ -92,7 +90,6 @@ void pageRead(Page *ret) {
}
/* This function is declared in page.h */
void pageWrite(Page * ret) {
long pageoffset = ret->id * PAGE_SIZE;

View file

@ -18,6 +18,12 @@
#include "logger/logWriter.h"
#include <lladd/bufferManager.h>
/** @todo recovery2.c shouldn't include pageCache.h once refactoring is done. */
#include <lladd/pageCache.h>
/** @todo questionable include? */
#include "page.h"
#include <lladd/transactional.h>
#include <stdio.h>
@ -200,30 +206,33 @@ static void Undo(int recovery) {
/* printf("."); fflush(NULL); */
switch(e->type) {
case UPDATELOG:
/* Sanity check. If this fails, we've already undone this
update, or something is wrong with the redo phase or normal operation. */
this_lsn= readLSN(e->contents.update.rid.page);
/* printf("1"); fflush(NULL); */
assert(e->LSN <= this_lsn);
/* printf("1a"); fflush(NULL); */
/* Need to log a clr here. */
clr_lsn = LogCLR(e);
/* Undo update is a no-op if the page does not reflect this
update, but it will write the new clr_lsn if necessary. */
undoUpdate(e, clr_lsn);
/* printf("1b"); fflush(NULL); */
break;
{
/* Need write lock for undo.. */
Page * p = getPage(e->contents.update.rid.page, RW);
/* Sanity check. If this fails, we've already undone this
update, or something is wrong with the redo phase or normal operation. */
this_lsn= pageReadLSN(p); /* e->contents.update.rid.page); */
/* printf("1"); fflush(NULL); */
assert(e->LSN <= this_lsn);
/* printf("1a"); fflush(NULL); */
/* Need to log a clr here. */
clr_lsn = LogCLR(e);
/* Undo update is a no-op if the page does not reflect this
update, but it will write the new clr_lsn if necessary. */
undoUpdate(e, p, clr_lsn);
/* printf("1b"); fflush(NULL); */
releasePage(p);
break;
}
case CLRLOG:
/* Don't need to do anything special to handle CLR's.
Iterator will correctly jump to clr's previous undo record. */

View file

@ -11,8 +11,6 @@
#include <stdio.h>
#include <assert.h>
#include <page.h>
TransactionLog XactionTable[MAX_TRANSACTIONS];
int numActiveXactions = 0;
int xidCount = 0;
@ -81,13 +79,14 @@ int Tbegin() {
}
xidCount_tmp = xidCount;
/* Don't want to block while we're logging... */
pthread_mutex_unlock(&transactional_2_mutex);
/** @todo Don't want to block while we're logging... */
assert( i < MAX_TRANSACTIONS );
XactionTable[index] = LogTransBegin(xidCount_tmp);
pthread_mutex_unlock(&transactional_2_mutex);
return XactionTable[index].xid;
}
@ -102,20 +101,22 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
p = loadPage(rid.page);
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], rid, op, dat);
/* KLUDGE re-enable loggging!*/
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
doUpdate(e, p);
unlock(p->loadlatch);
releasePage(p);
}
/* @todo what about locking? */
/**
 * Read the record at rid into dat on behalf of transaction xid.
 * Pins the page for the duration of the read.
 *
 * The stripped diff left both the old two-argument readRecord() call and
 * the new page-pinning sequence in place; only the latter matches the
 * updated readRecord(xid, page, rid, dat) interface.
 *
 * @todo what about locking?
 */
void Tread(int xid, recordid rid, void * dat) {
  Page * p = loadPage(rid.page);
  readRecord(xid, p, rid, dat);
  releasePage(p);
}
int Tcommit(int xid) {
@ -123,31 +124,36 @@ int Tcommit(int xid) {
#ifdef DEBUGGING
pthread_mutex_lock(&transactional_2_mutex);
assert(numActiveXactions <= MAX_TRANSACTIONS);
pthread_mutex_unlock(&transactional_2_mutex);
#endif
lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]);
bufTransCommit(xid, lsn); /* unlocks pages */
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
pthread_mutex_lock(&transactional_2_mutex);
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
numActiveXactions--;
assert( numActiveXactions >= 0 );
pthread_mutex_unlock(&transactional_2_mutex);
return 0;
}
int Tabort(int xid) {
lsn_t lsn;
lsn = LogTransAbort(&XactionTable[xid%MAX_TRANSACTIONS]);
TransactionLog * t =&XactionTable[xid%MAX_TRANSACTIONS];
/* @todo is the order of the next two calls important? */
undoTrans(XactionTable[xid%MAX_TRANSACTIONS]);
bufTransAbort(xid, lsn);
lsn = LogTransAbort(t /*&XactionTable[xid%MAX_TRANSACTIONS]*/);
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
/** @todo is the order of the next two calls important? */
undoTrans(*t/*XactionTable[xid%MAX_TRANSACTIONS]*/);
bufTransAbort(xid, lsn);
pthread_mutex_lock(&transactional_2_mutex);
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
numActiveXactions--;
assert( numActiveXactions >= 0 );
pthread_mutex_unlock(&transactional_2_mutex);
@ -172,6 +178,7 @@ int Tdeinit() {
void Trevive(int xid, long lsn) {
int index = xid % MAX_TRANSACTIONS;
pthread_mutex_lock(&transactional_2_mutex);
if(XactionTable[index].xid != INVALID_XTABLE_XID) {
if(xid != XactionTable[index].xid) {
printf("Clashing Tprepare()'ed XID's encountered on recovery!!\n");
@ -182,10 +189,11 @@ void Trevive(int xid, long lsn) {
} else {
XactionTable[index].xid = xid;
XactionTable[index].prevLSN = lsn;
pthread_mutex_lock(&transactional_2_mutex);
numActiveXactions++;
pthread_mutex_unlock(&transactional_2_mutex);
}
pthread_mutex_unlock(&transactional_2_mutex);
}
void TsetXIDCount(int xid) {

View file

@ -33,15 +33,18 @@ void initializePages() {
rid.page = i;
rid.slot = 0;
rid.size = sizeof(int);
p = loadPage(rid.page);
p = loadPage(rid.page);
/* p = loadPage(i); */
assert(p->id != -1);
pageSlotRalloc(p, 0, rid);
unlock(p->loadlatch);
/* rid = pageRalloc(p, sizeof(int)); */
/* addPendingEvent(rid.page); */
writeRecord(1, 1, rid, &i);
writeRecord(1, p, 1, rid, &i);
/* removePendingEvent(rid.page); */
/* assert(p->pending == 0); */
releasePage(p);
}
printf("Initialization complete.\n"); fflush(NULL);
@ -65,10 +68,14 @@ void * workerThread(void * p) {
rid.slot = 0;
rid.size = sizeof(int);
/* addPendingEvent(rid.page); */
readRecord(1, rid, &j);
p = loadPage(rid.page);
readRecord(1, p, rid, &j);
assert(rid.page == k);
/* removePendingEvent(rid.page); */
releasePage(p);
assert(k == j);
}
@ -81,38 +88,56 @@ void * workerThreadWriting(void * q) {
int offset = *(int*)q;
recordid rids[RECORDS_PER_THREAD];
for(int i = 0 ; i < RECORDS_PER_THREAD; i++) {
/* addPendingEvent(rids[i].page); */
rids[i] = ralloc(1, sizeof(int));
/* removePendingEvent(rids[i].page); */
/* printf("\nRID:\t%d,%d\n", rids[i].page, rids[i].slot); */
fflush(NULL);
/* fflush(NULL); */
if(! (i % 1000) ) {
printf("A%d", i / 1000); fflush(NULL);
}
sched_yield();
/* sched_yield(); */
}
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int val = (i * 10000) + offset;
int oldpage = rids[i].page;
int k;
Page * p = loadPage(rids[i].page);
writeRecord(1, 0, rids[i], &val);
assert(p->id == rids[i].page);
for(k = 0; k < 100; k++) {
int * j =NULL;
// assert(p->loadlatch->lock->readers);
assert(p->id == rids[i].page);
free(j = malloc(sizeof(int)));
assert(j);
}
/* sched_yield(); */
writeRecord(1, p, 0, rids[i], &val);
assert(p->id == rids[i].page);
releasePage(p);
if(! (i % 1000) ) {
printf("W%d", i / 1000); fflush(NULL);
}
sched_yield();
/* sched_yield(); */
}
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int val;
Page * p;
readRecord(1, rids[i], &val);
p = loadPage(rids[i].page);
readRecord(1, p, rids[i], &val);
releasePage(p);
if(! (i % 1000) ) {
printf("R%d", i / 1000); fflush(NULL);
@ -121,7 +146,7 @@ void * workerThreadWriting(void * q) {
assert(val == (i * 10000)+offset);
sched_yield();
/* sched_yield(); */
}
return NULL;
@ -134,6 +159,8 @@ START_TEST(pageSingleThreadTest) {
initializePages();
printf("Initialize pages returned.\n"); fflush(NULL);
/* sleep(100000000); */
workerThread(NULL);
@ -209,10 +236,10 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */
/* tcase_add_test(tc, pageSingleThreadTest); */
tcase_add_test(tc, pageLoadTest);
/* tcase_add_test(tc, pageSingleThreadWriterTest); */
tcase_add_test(tc, pageThreadedWritersTest);
/* tcase_add_test(tc, pageSingleThreadTest);
tcase_add_test(tc, pageLoadTest);
tcase_add_test(tc, pageSingleThreadWriterTest); */
tcase_add_test(tc, pageThreadedWritersTest);
/* --------------------------------------------- */

View file

@ -46,7 +46,7 @@ terms specified in this license.
#include <lladd/bufferManager.h>
#include "../check_includes.h"
#include "../../src/lladd/page.h"
#define LOG_NAME "check_operations.log"
@ -62,7 +62,8 @@ START_TEST(operation_physical_do_undo) {
int buf;
int arg;
LogEntry * setToTwo;
Page * p;
Tinit();
rid = ralloc(xid, sizeof(int));
@ -76,16 +77,21 @@ START_TEST(operation_physical_do_undo) {
/* writeLSN(lsn, rid.page); */
DEBUG("B\n");
writeRecord(xid, lsn, rid, &buf);
p = loadPage(rid.page);
writeRecord(xid, p, lsn, rid, &buf);
releasePage(p);
setToTwo->LSN = 10;
DEBUG("C\n");
/* addPendingEvent(rid.page); */
doUpdate(setToTwo); /* PAGE LSN= 10, value = 2. */
p = loadPage(rid.page);
doUpdate(setToTwo, p); /* PAGE LSN= 10, value = 2. */
releasePage(p);
readRecord(xid, rid, &buf);
p = loadPage(rid.page);
readRecord(xid, p, rid, &buf);
releasePage(p);
fail_unless(buf == 2, NULL);
@ -93,14 +99,19 @@ START_TEST(operation_physical_do_undo) {
DEBUG("D\n");
fail_unless(10 == readLSN(rid.page), "page lsn not set correctly.");
p = loadPage(rid.page);
fail_unless(10 == pageReadLSN(p), "page lsn not set correctly.");
setToTwo->LSN = 5;
/* addPendingEvent(rid.page); */
undoUpdate(setToTwo, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */
undoUpdate(setToTwo, p, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */
releasePage(p);
readRecord(xid, rid, &buf);
p = loadPage(rid.page);
readRecord(xid, p, rid, &buf);
releasePage(p);
fail_unless(buf == 1, NULL);
@ -109,7 +120,9 @@ START_TEST(operation_physical_do_undo) {
redoUpdate(setToTwo);
readRecord(xid, rid, &buf);
p = loadPage(rid.page);
readRecord(xid, p, rid, &buf);
releasePage(p);
fail_unless(buf == 1, NULL);
@ -129,8 +142,9 @@ START_TEST(operation_physical_do_undo) {
buf = 1;
/* writeLSN(lsn, rid.page); */
writeRecord(xid, lsn, rid, &buf);
p = loadPage(rid.page);
writeRecord(xid, p, lsn, rid, &buf);
releasePage(p);
/* Trace of test:
PAGE LSN LOG LSN CLR LSN TYPE SUCCEED?
@ -150,37 +164,42 @@ START_TEST(operation_physical_do_undo) {
redoUpdate(setToTwo);
/* writeLSN(setToTwo->LSN, rid.page); */
readRecord(xid, rid, &buf);
p = loadPage(rid.page);
readRecord(xid, p, rid, &buf);
fail_unless(buf == 2, NULL);
DEBUG("G undo set to 2\n");
/* addPendingEvent(rid.page); */
undoUpdate(setToTwo, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
undoUpdate(setToTwo, p, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
readRecord(xid, rid, &buf);
readRecord(xid, p, rid, &buf);
fail_unless(buf == 1, NULL);
releasePage(p);
DEBUG("H don't redo set to 2\n");
/* addPendingEvent(rid.page); */
redoUpdate(setToTwo); /* Fails */
readRecord(xid, rid, &buf);
p = loadPage(rid.page);
readRecord(xid, p, rid, &buf);
fail_unless(buf == 1, NULL);
writeRecord(xid, 0, rid, &buf); /* reset the page's LSN. */
writeRecord(xid, p, 0, rid, &buf); /* reset the page's LSN. */
/* writeLSN(0,rid.page); */
DEBUG("I redo set to 2\n");
/* addPendingEvent(rid.page); */
redoUpdate(setToTwo); /* Succeeds */
readRecord(xid, rid, &buf);
releasePage(p);
redoUpdate(setToTwo); /* Succeeds */
p = loadPage(rid.page);
readRecord(xid, p, rid, &buf);
fail_unless(buf == 2, NULL);
releasePage(p);
Tdeinit();
}
END_TEST

View file

@ -75,7 +75,7 @@ static void* worker_thread(void * arg_ptr) {
int j;
int first = 1;
recordid rid;
for(i = 0; i < 1000; i++) {
for(i = 0; i < 10000; i++) {
pthread_mutex_lock(&lsn_mutex);
this_lsn = lsn;
lsn++;
@ -146,13 +146,20 @@ START_TEST(pageThreadTest) {
int i;
pthread_mutex_init(&random_mutex, NULL);
fail_unless(1, NULL);
Tinit();
Tdeinit();
Tinit();
fail_unless(1, NULL);
Page * p = loadPage(1);
fail_unless(1, NULL);
for(i = 0; i < THREAD_COUNT; i++) {
pthread_create(&workers[i], NULL, worker_thread, p);
}
fail_unless(1, NULL);
for(i = 0; i < THREAD_COUNT; i++) {
pthread_join(workers[i], NULL);
}

View file

@ -56,33 +56,45 @@ void * writingWorkerThread ( void * v ) {
recordid * rids = malloc(RECORDS_PER_THREAD * sizeof(recordid));
int xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
rids[i] = Talloc(xid, sizeof(int));
rids[i] = /* ralloc(xid, sizeof(int)); */ Talloc(xid, sizeof(int));
if(! (i %1000)) {
printf("A%d", i/1000);fflush(NULL);
}
}
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int tmp = i + offset;
/* Page * p = loadPage(rids[i].page);
writeRecord(1, p, 0, rids[i], &tmp);
releasePage(p); */
Tset(xid, rids[i], &tmp);
if(! (i %1000)) {
printf("W%d", i/1000); fflush(NULL);
}
}
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int j;
Tread(xid, rids[i], &j);
/* Page * p = loadPage(rids[i].page);
readRecord(1, p, rids[i], &j);
releasePage(p); */
Tread(xid, rids[i], &j);
assert(i + offset == j);
if(! (i %1000)) {
printf("R%d", i/1000);fflush(NULL);
}
}
Tcommit(xid);
/* Tcommit(xid);
xid = Tbegin();
@ -90,7 +102,7 @@ void * writingWorkerThread ( void * v ) {
int j;
Tread(xid, rids[i], &j);
assert(i + offset == j);
}
}*/
return NULL;
}
@ -230,9 +242,9 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */
tcase_add_test(tc, transactional_smokeTest);
tcase_add_test(tc, transactional_blobSmokeTest);
/* tcase_add_test(tc, transactional_blobSmokeTest); */
tcase_add_test(tc, transactional_nothreads_commit);
tcase_add_test(tc, transactional_threads_commit);
tcase_add_test(tc, transactional_threads_commit);
/** @todo still need to make blobs reentrant! */
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);