Cleaning up bufferManager / page for locking. Want to limit access to the Page struct.

This commit is contained in:
Sears Russell 2004-07-14 20:49:18 +00:00
parent 475b8141d3
commit 2b42451280
24 changed files with 334 additions and 206 deletions

View file

@ -47,7 +47,7 @@ terms specified in this license.
#include <time.h>
#include <lladd/transactional.h>
#include <libdfa/messages.h>
#include "../lladd/lladd/page.h"
#include <sys/types.h>
#include <pthread.h>

View file

@ -87,9 +87,11 @@ terms specified in this license.
#ifndef __BUFFERMANAGER_H__
#define __BUFFERMANAGER_H__
#include <lladd/page.h>
#include <lladd/constants.h>
/*#include <lladd/page.h>*/
#include <lladd/constants.h>
#include <lladd/transactional.h>
/**
* initialize buffer manager
* @return 0 on success
@ -97,13 +99,6 @@ terms specified in this license.
*/
int bufInit();
/**
* @param pageid ID of the page you want to load
* @return fully formed Page type
* @return page with -1 ID if page not found
*/
Page loadPage(int pageid);
/**
* allocate a record
* @param xid The active transaction.
@ -112,6 +107,12 @@ Page loadPage(int pageid);
*/
recordid ralloc(int xid, long size);
/**
* allocate a record at a given slot. (Useful for recovery.)
*/
void slotRalloc(int pageid, lsn_t lsn, recordid rid);
/**
* Find a page with some free space.
*
@ -174,7 +175,7 @@ void readRecord(int xid, recordid rid, void *dat);
*
* @param dat The page to be flushed to disk.
*/
void pageWrite(const Page * dat);
/* void pageWrite(Page * dat); */
/**
@ -183,7 +184,7 @@ void pageWrite(const Page * dat);
@param ret A page struct, with id set correctly. The rest of this
struct will be overwritten by pageMap.
*/
void pageRead(Page * ret);
/* void pageRead(Page * ret); */
/* int flushPage(Page page); */
@ -237,4 +238,8 @@ int bufTransAbort(int xid, lsn_t lsn);
*/
void bufDeinit();
void addPendingEvent(int pageid);
void removePendingEvent(int pageid);
void setSlotType(int pageid, int slot, int type);
#endif

View file

@ -44,7 +44,8 @@ terms specified in this license.
#define __LLADD_LOGGING_LOGENTRY_H
#include <lladd/common.h>
#include <lladd/page.h>
/*#include <lladd/page.h>*/
/*#include <lladd/transactional.h> */
BEGIN_C_DECLS

View file

@ -54,8 +54,8 @@ terms specified in this license.
#define __OPERATIONS_H__
#include <lladd/constants.h>
/*#include <lladd/transactional.h>*/
#include <lladd/logger/logEntry.h>
#include <lladd/transactional.h>
#include <lladd/logger/logEntry.h>
BEGIN_C_DECLS
@ -127,10 +127,6 @@ typedef struct {
} Operation;
/* These need to be installed, since they are required by applications that use LLADD. */
/*#include "constants.h"*/
/*#include <lladd/bufferManager.h>*/
/*#include "logger/logEntry.h"*/
#include "operations/increment.h"
#include "operations/decrement.h"

View file

@ -75,6 +75,7 @@ terms specified in this license.
#define __PREPARE_H__
#include <lladd/operations.h>
#include <lladd/logger/logEntry.h>
extern recordid prepare_bogus_rec;
/**

View file

@ -56,16 +56,13 @@ terms specified in this license.
#include "common.h"
BEGIN_C_DECLS
/** @todo page.h includes things that it shouldn't! (Or, page.h shouldn't be an installed header.) */
/**
* represents how to look up a record on a page
*/
typedef struct {
int page;
int slot;
long size;
} recordid;
#include <lladd/transactional.h>
#include "../config.h"
#include "../src/lladd/latches.h"
BEGIN_C_DECLS
/**
The page type contains in-memory information about pages. This
@ -114,6 +111,17 @@ typedef struct Page_s {
void * rwlatch;
/** This mutex protects the pending field. We don't use rwlatch for
this, since we also need to use a condition variable to update
this properly, and there are no read-only functions for the
pending field. */
pthread_mutex_t pending_mutex; /* pthread_mutex_t */
pthread_cond_t noMorePending; /* pthread_cond_t */
int waiting;
/**
In the multi-threaded case, before we steal a page, we need to
know that all pending actions have been completed. Here, we
@ -134,18 +142,9 @@ typedef struct Page_s {
carefully.
*/
int * pending;
int pending;
} Page;
void addPendingEvent(Page p);
/**
This function blocks until there are no events pending for this page.
@todo implement addPendingEvent and unload for real!
*/
void acquireUnloadLock(Page p);
/**
* initializes all the important variables needed in all the
* functions dealing with pages.
@ -164,20 +163,24 @@ void pageInit();
* as a parameter a Page and returns the LSN that is currently written on that
* page in memory.
*/
lsn_t pageReadLSN(const Page page);
lsn_t pageReadLSN(const Page * page);
/**
* assumes that the page is already loaded in memory. It takes as a
* parameter a Page, and returns an estimate of the amount of free space on this
* page. This is either exact, or an underestimate.
*/
int freespace(Page page);
int freespace(Page * page);
/**
* assumes that the page is already loaded in memory. It takes as
* parameters a Page and the size in bytes of the new record. pageRalloc()
* returns a recordid representing the newly allocated record.
*
* If you call this function, you probably need to be holding lastFreepage_mutex.
*
* @see lastFreepage_mutex
*
* NOTE: might want to pad records to be multiple of words in length, or, simply
* make sure all records start word aligned, but not necessarily having
* a length that is a multiple of words. (Since Tread(), Twrite() ultimately
@ -188,13 +191,13 @@ int freespace(Page page);
*
* @todo Makes no attempt to reuse old recordid's.
*/
recordid pageRalloc(Page page, int size);
recordid pageRalloc(Page * page, int size);
void pageDeRalloc(Page page, recordid rid);
void pageDeRalloc(Page * page, recordid rid);
void pageWriteRecord(int xid, Page page, recordid rid, lsn_t lsn, const byte *data);
void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data);
void pageReadRecord(int xid, Page page, recordid rid, byte *buff);
void pageReadRecord(int xid, Page * page, recordid rid, byte *buff);
void pageCommit(int xid);
@ -204,12 +207,13 @@ void pageRealloc(Page * p, int id);
Page* pageAlloc(int id);
recordid pageSlotRalloc(Page page, lsn_t lsn, recordid rid);
recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid);
int pageTest();
/*int pageTest(); */
int pageGetSlotType(Page * p, int slot, int type);
void pageSetSlotType(Page * p, int slot, int type);
int getSlotType(Page p, int slot, int type);
void setSlotType(Page p, int slot, int type);
END_C_DECLS

View file

@ -7,6 +7,8 @@
#include <sys/time.h>
#include <time.h>
#ifndef __STATS_H
#define __STATS_H
typedef struct {
long sum_spin;
@ -23,3 +25,5 @@ void acquired_lock(profile_tuple * tup, long spin_count);
void released_lock(profile_tuple * tup);
void print_profile_tuple(profile_tuple * tup);
void init_tuple(profile_tuple * tup);
#endif

View file

@ -86,7 +86,17 @@ terms specified in this license.
BEGIN_C_DECLS
#include "page.h"
/**
* represents how to look up a record on a page
*/
typedef struct {
int page;
int slot;
long size;
} recordid;
#include "operations.h"
/**
@ -107,6 +117,8 @@ typedef struct {
/* int status; */
} Transaction;
/**
* initialize the transactional system, including running recover (if
* necessary), building the operations_table, and opening the logs

View file

@ -5,6 +5,7 @@
#include <lladd/transactional.h>
#include <lladd/bufferManager.h>
#include <lladd/page.h>
#include <lladd/constants.h>
#include "blobManager.h"
@ -89,8 +90,7 @@ long myFseek(FILE * f, long offset, int whence) {
recordid preAllocBlob(int xid, long blobSize) {
long fileSize = myFseek(blobf1, 0, SEEK_END);
blob_record_t blob_rec;
Page p;
/* char zero = 0; */
/* Allocate space for the blob entry. */
DEBUG("Allocing blob (size %ld)\n", blobSize);
@ -99,11 +99,8 @@ recordid preAllocBlob(int xid, long blobSize) {
/* First in buffer manager. */
/** Needs to be in alloc.c */
recordid rid = Talloc(xid, sizeof(blob_record_t));
/** Finally, fix up the fields in the record that points to the blob.
The rest of this also should go into alloc.c
*/
@ -111,10 +108,8 @@ recordid preAllocBlob(int xid, long blobSize) {
blob_rec.fd = 0;
blob_rec.size = blobSize;
blob_rec.offset = fileSize;
p = loadPage(rid.page);
setSlotType(p, rid.slot, BLOB_SLOT);
setSlotType(rid.page, rid.slot, BLOB_SLOT);
rid.size = BLOB_SLOT;
/* Tset() needs to know to 'do the right thing' here, since we've
@ -132,9 +127,8 @@ void allocBlob(int xid, lsn_t lsn, recordid rid) {
long fileSize = myFseek(blobf1, 0, SEEK_END);
blob_record_t blob_rec;
Page p;
char zero = 0;
/* recordid rid = preAllocBlob(xid, blobSize); */
/* Allocate space for the blob entry. */
DEBUG("post Allocing blob (size %ld)\n", rid.size);
@ -143,8 +137,6 @@ void allocBlob(int xid, lsn_t lsn, recordid rid) {
/* First in buffer manager. */
p = loadPage(rid.page);
/* Read in record to get the correct offset, size for the blob*/
readRawRecord(xid, rid, &blob_rec, sizeof(blob_record_t));
@ -156,7 +148,7 @@ void allocBlob(int xid, lsn_t lsn, recordid rid) {
}
static lsn_t * tripleHashLookup(int xid, recordid rid) {
static lsn_t * tripleHashLookup(int xid, recordid rid) {
pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(xid));
if(xidHash == NULL) {
return NULL;

View file

@ -56,9 +56,40 @@ terms specified in this license.
#include <lladd/pageCache.h>
#include <lladd/logger/logWriter.h>
#include <lladd/page.h>
static FILE * stable = NULL;
/**
This function blocks until there are no events pending for this page.
*/
static void finalize(Page * page);
/**
Invariant: This lock should be held while updating lastFreepage, or
while performing any operation that may decrease the amount of
freespace in the page that lastFreepage refers to.
Since pageCompact and pageDeRalloc may only increase this value,
they do not need to hold this lock. Since bufferManager is the
only place where pageRalloc is called, pageRalloc does not obtain
this lock.
*/
static pthread_mutex_t lastFreepage_mutex;
static unsigned int lastFreepage = 0;
/**
* @param pageid ID of the page you want to load
* @return fully formed Page type
* @return page with -1 ID if page not found
*/
Page * loadPage(int pageid);
/* ** File I/O functions ** */
/* Defined in blobManager.c, but don't want to export this in any public .h files... */
long myFseek(FILE * f, long offset, int whence);
@ -95,7 +126,7 @@ void pageRead(Page *ret) {
}
void pageWrite(const Page * ret) {
void pageWrite(Page * ret) {
long pageoffset = ret->id * PAGE_SIZE;
long offset = myFseek(stable, pageoffset, SEEK_SET);
@ -104,11 +135,13 @@ void pageWrite(const Page * ret) {
DEBUG("Writing page %d\n", ret->id);
if(flushedLSN() < pageReadLSN(*ret)) {
if(flushedLSN() < pageReadLSN(ret)) {
DEBUG("pageWrite is calling syncLog()!\n");
syncLog();
}
finalize(ret);
if(1 != fwrite(ret->memAddr, PAGE_SIZE, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof writing!\n"); fflush(NULL); abort(); }
@ -135,6 +168,8 @@ static void openPageFile() {
}
lastFreepage = 0;
pthread_mutex_init(&lastFreepage_mutex , NULL);
DEBUG("storefile opened.\n");
}
@ -148,11 +183,11 @@ static void closePageFile() {
int bufInit() {
stable = NULL;
pageInit();
openPageFile();
pageCacheInit();
openBlobStore();
return 0;
}
@ -178,8 +213,8 @@ void simulateBufferManagerCrash() {
/* ** No file I/O below this line. ** */
Page loadPage (int pageid) {
return *loadPagePtr(pageid);
Page * loadPage (int pageid) {
return loadPagePtr(pageid);
}
Page * lastRallocPage = 0;
@ -188,21 +223,30 @@ Page * lastRallocPage = 0;
recordid ralloc(int xid, /*lsn_t lsn,*/ long size) {
recordid ret;
Page p;
Page * p;
DEBUG("Rallocing record of size %ld\n", (long int)size);
assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT);
pthread_mutex_lock(&lastFreepage_mutex);
while(freespace(p = loadPage(lastFreepage)) < size ) { lastFreepage++; }
ret = pageRalloc(p, size);
pthread_mutex_unlock(&lastFreepage_mutex);
DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size);
return ret;
}
void slotRalloc(int pageid, lsn_t lsn, recordid rid) {
Page * loadedPage = loadPage(rid.page);
pageSlotRalloc(loadedPage, lsn, rid);
}
long readLSN(int pageid) {
return pageReadLSN(loadPage(pageid));
@ -227,7 +271,7 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) {
/* pageWriteRecord(xid, *p, rid, dat);
p->LSN = lsn;
pageWriteLSN(*p); */
pageWriteRecord(xid, *p, rid, lsn, dat);
pageWriteRecord(xid, p, rid, lsn, dat);
}
}
@ -260,3 +304,57 @@ int bufTransAbort(int xid, lsn_t lsn) {
return 0;
}
void addPendingEvent(int pageid){
Page * p = loadPage(pageid);
pthread_mutex_lock(&(p->pending_mutex));
assert(!(p->waiting));
p->pending++;
pthread_mutex_unlock(&(p->pending_mutex));
}
/** @todo as implemented, loadPage() ... doOperation is not atomic!*/
void removePendingEvent(int pageid) {
Page * p = loadPage(pageid);
pthread_mutex_lock(&(p->pending_mutex));
p->pending--;
assert(p->pending >= 0);
if(p->waiting && !p->pending) {
assert(p->waiting == 1);
pthread_cond_signal(&(p->noMorePending));
}
pthread_mutex_unlock(&(p->pending_mutex));
}
static void finalize(Page * p) {
pthread_mutex_lock(&(p->pending_mutex));
p->waiting++;
while(p->pending) {
pthread_cond_wait(&(p->noMorePending), &(p->pending_mutex));
}
pthread_mutex_unlock(&(p->pending_mutex));
return;
}
void setSlotType(int pageid, int slot, int type) {
Page * p = loadPage(pageid);
pageSetSlotType(p, slot, type);
}

View file

@ -7,6 +7,9 @@
/*#include <pbl/pbl.h> -- Don't want everything that touches threading to include pbl... */
#include <lladd/stats.h>
#ifndef __LATCHES_H
#define __LATCHES_H
/**
A data structure for profiling latching behavior.
All time values recorded in this struct are in microseconds.
@ -72,3 +75,4 @@ void __profile_deletelock (rwl *lock);
#endif
#endif /* __LATCHES_H */

View file

@ -43,7 +43,7 @@ terms specified in this license.
#include <config.h>
#include <lladd/common.h>
#include <lladd/logger/logEntry.h>
#include <lladd/transactional.h>
#include <assert.h>
#include <lladd/operations.h>

View file

@ -42,6 +42,7 @@ terms specified in this license.
#include <config.h>
#include <lladd/common.h>
#include <lladd/transactional.h>
#include <lladd/logger/logWriter.h>
#include <lladd/logger/logHandle.h>
@ -49,6 +50,9 @@ terms specified in this license.
#include <assert.h>
#include <stdio.h>
#include <lladd/page.h> /* For addPendingEvent() */
#include <lladd/bufferManager.h>
/**
@todo Should the log file be global?
*/
@ -195,7 +199,7 @@ int openLogWriter() {
the highest LSN that we've seen so far. (If writeLogEntry has not
been called yet.)
The first time writeLogEntry is called, we seekfrom the highest
The first time writeLogEntry is called, we seek from the highest
LSN encountered so far to the end of the log.
*/
@ -203,6 +207,12 @@ int writeLogEntry(LogEntry * e) {
int nmemb;
const long size = sizeofLogEntry(e);
if(e->type == UPDATELOG) {
addPendingEvent(e->contents.update.rid.page);
}
if(e->type == CLRLOG) {
addPendingEvent(e->contents.clr.rid.page);
}
if(e->xid == -1) { /* Don't write log entries for recovery xacts. */
e->LSN = -1;

View file

@ -86,6 +86,7 @@ LogEntry * LogUpdate(TransactionLog * l, recordid rid, int operation, const byte
long argSize = 0;
LogEntry * e;
if(operationsTable[operation].sizeofData == SIZEOF_RECORD) {
argSize = rid.size;
} else {
@ -101,6 +102,7 @@ LogEntry * LogUpdate(TransactionLog * l, recordid rid, int operation, const byte
}
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage);
writeLogEntry(e);
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) argSize);
@ -117,6 +119,7 @@ lsn_t LogCLR(LogEntry * undone) {
lsn_t ret;
LogEntry * e = allocCLRLogEntry(-1, undone->xid, undone->LSN, undone->contents.update.rid, undone->prevLSN);
writeLogEntry(e);
DEBUG("Log CLR %d, LSN: %ld type: %ld (undoing: %ld, next to undo: %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)undone->LSN, (long int)undone->prevLSN);

View file

@ -43,6 +43,7 @@ terms specified in this license.
#include <lladd/logger/logWriter.h>
#include <lladd/bufferManager.h>
#include <lladd/page.h>
#include <assert.h>
#include <stdio.h>
@ -54,6 +55,9 @@ void doUpdate(const LogEntry * e) {
DEBUG("OPERATION update arg length %d, lsn = %ld\n", e->contents.update.argSize, e->LSN);
operationsTable[e->contents.update.funcID].run(e->xid, e->LSN, e->contents.update.rid, getUpdateArgs(e));
removePendingEvent(e->contents.update.rid.page);
}
void redoUpdate(const LogEntry * e) {
@ -67,6 +71,7 @@ void redoUpdate(const LogEntry * e) {
doUpdate(e);
} else {
DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %ld}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size);
removePendingEvent(e->contents.update.rid.page);
}
} else if(e->type == CLRLOG) {
LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN);
@ -81,6 +86,7 @@ void redoUpdate(const LogEntry * e) {
undoUpdate(f, e->LSN);
} else {
DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %ld}\n", f->LSN, rid.page, rid.slot, rid.size);
removePendingEvent(e->contents.update.rid.page);
}
} else {
assert(0);
@ -115,6 +121,9 @@ void undoUpdate(const LogEntry * e, lsn_t clr_lsn) {
} else {
DEBUG("OPERATION Skipping undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size);
}
removePendingEvent(e->contents.update.rid.page);
/* printf("Undo done."); fflush(NULL); */
}

View file

@ -28,9 +28,11 @@ static int operate(int xid, lsn_t lsn, recordid rid, const void * dat) {
if(rid.size >= BLOB_THRESHOLD_SIZE) {
allocBlob(xid, lsn, rid);
} else {
Page loadedPage = loadPage(rid.page);
/* Page * loadedPage = loadPage(rid.page); */
/* pageSlotRalloc(loadedPage, lsn, rid); */
/** Has no effect during normal operation. */
pageSlotRalloc(loadedPage, lsn, rid);
slotRalloc(rid.page, lsn, rid);
}
return 0;
@ -38,9 +40,10 @@ static int operate(int xid, lsn_t lsn, recordid rid, const void * dat) {
/** @todo Currently, we just leak store space on dealloc. */
static int deoperate(int xid, lsn_t lsn, recordid rid, const void * dat) {
Page loadedPage = loadPage(rid.page);
/* Page * loadedPage = loadPage(rid.page); */
/** Has no effect during normal operation, other than updating the LSN. */
pageSlotRalloc(loadedPage, lsn, rid);
/* pageSlotRalloc(loadedPage, lsn, rid); */
slotRalloc(rid.page, lsn, rid);
return 0;
}

View file

@ -94,7 +94,7 @@ terms specified in this license.
#include <assert.h>
#include <stdio.h>
#include "latches.h"
/*#include "latches.h" */
#include <lladd/constants.h>
/* TODO: Combine with buffer size... */
@ -145,13 +145,13 @@ Page pool[MAX_BUFFER_SIZE];
int isValidSlot(byte *memAddr, int slot);
void invalidateSlot(byte *memAddr, int slot);
void pageDeRalloc(Page page, recordid rid);
void pageDeRalloc(Page * page, recordid rid);
/**
The caller of this function must already have a writelock on the
page.
*/
static void pageCompact(Page page);
static void pageCompact(Page * page);
/**
* pageInit() initializes all the important variables needed in
@ -200,48 +200,6 @@ void pageAbort(int xid) {
}
/*void addPendingEvent(Page p){
pthread_mutex_lock(p.pending_mutex);
assert(!(p.waiting));
(*(p.pending))++;
pthread_mutex_unlock(p.pending_mutex);
}
void removePendingEvent(Page p) {
pthread_mutex_lock(p.pending_mutex);
(*(p.pending))--;
if(p.waiting && ! p.pending) {
pthread_cond_broadcast(p.removePendingCond);
}
pthread_mutex_unlock(p.pending_mutex);
}
*/
/** Assume single threaded for now.
*/
/*void unload(Page p) {
pthread_mutex_lock(p.pending_mutex);
p.waiting = 1;
while(*(p.pending)) {
pthread_cond_wait(p.removePendingCond);
}
pthread_mutex_unlock(p.pending_mutex);
return;
}*/
static int getFirstHalfOfWord(unsigned int *memAddr) {
unsigned int word = *memAddr;
word = (word >> (2*BITS_PER_BYTE)); /* & MASK_0000FFFF; */
@ -286,12 +244,12 @@ static const byte *slotMemAddr(const byte *memAddr, int slotNum) {
* as a parameter a Page and returns the LSN that is currently written on that
* page in memory.
*/
lsn_t pageReadLSN(Page page) {
lsn_t pageReadLSN(const Page * page) {
lsn_t ret;
readlock(page.rwlatch, 259);
ret = *(long *)(page.memAddr + START_OF_LSN);
readunlock(page.rwlatch);
readlock(page->rwlatch, 259);
ret = *(long *)(page->memAddr + START_OF_LSN);
readunlock(page->rwlatch);
return ret;
}
@ -303,12 +261,12 @@ lsn_t pageReadLSN(Page page) {
*
* @param page You must have a writelock on page before calling this function.
*/
static void pageWriteLSN(Page page) {
static void pageWriteLSN(Page * page) {
*(long *)(page.memAddr + START_OF_LSN) = page.LSN;
*(long *)(page->memAddr + START_OF_LSN) = page->LSN;
}
static int unlocked_freespace(Page page);
static int unlocked_freespace(Page * page);
/**
* freeSpace() assumes that the page is already loaded in memory. It takes
* as a parameter a Page, and returns an estimate of the amount of free space
@ -316,20 +274,20 @@ static int unlocked_freespace(Page page);
* in the page, minus the size of a new slot entry.) This is either exact,
* or an underestimate.
*/
int freespace(Page page) {
int freespace(Page * page) {
int ret;
readlock(page.rwlatch, 292);
readlock(page->rwlatch, 292);
ret = unlocked_freespace(page);
readunlock(page.rwlatch);
readunlock(page->rwlatch);
return ret;
}
/**
Just like freespace(), but doesn't obtain a lock. (So that other methods in this file can use it.)
*/
static int unlocked_freespace(Page page) {
static int unlocked_freespace(Page * page) {
int space;
space= (slotMemAddr(page.memAddr, readNumSlots(page.memAddr)) - (page.memAddr + readFreeSpace(page.memAddr)));
space= (slotMemAddr(page->memAddr, readNumSlots(page->memAddr)) - (page->memAddr + readFreeSpace(page->memAddr)));
return (space < 0) ? 0 : space;
}
@ -369,12 +327,12 @@ static void writeNumSlots(byte *memAddr, int numSlots) {
setFirstHalfOfWord((int*)(unsigned int*)(memAddr + START_OF_NUMSLOTS), numSlots);
}
recordid pageRalloc(Page page, int size) {
recordid pageRalloc(Page * page, int size) {
int freeSpace;
int numSlots;
int i;
writelock(page.rwlatch, 342);
writelock(page->rwlatch, 342);
if(unlocked_freespace(page) < size) {
pageCompact(page);
@ -386,12 +344,12 @@ recordid pageRalloc(Page page, int size) {
#endif
}
freeSpace = readFreeSpace(page.memAddr);
numSlots = readNumSlots(page.memAddr);
freeSpace = readFreeSpace(page->memAddr);
numSlots = readNumSlots(page->memAddr);
recordid rid;
rid.page = page.id;
rid.page = page->id;
rid.slot = numSlots;
rid.size = size;
@ -403,21 +361,21 @@ recordid pageRalloc(Page page, int size) {
*/
for (i = 0; i < numSlots; i++) {
if (!isValidSlot(page.memAddr, i)) {
if (!isValidSlot(page->memAddr, i)) {
rid.slot = i;
break;
}
}
if (rid.slot == numSlots) {
writeNumSlots(page.memAddr, numSlots+1);
writeNumSlots(page->memAddr, numSlots+1);
}
setSlotOffset(page.memAddr, rid.slot, freeSpace);
setSlotLength(page.memAddr, rid.slot, rid.size);
writeFreeSpace(page.memAddr, freeSpace + rid.size);
setSlotOffset(page->memAddr, rid.slot, freeSpace);
setSlotLength(page->memAddr, rid.slot, rid.size);
writeFreeSpace(page->memAddr, freeSpace + rid.size);
writeunlock(page.rwlatch);
writeunlock(page->rwlatch);
/* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */
@ -427,14 +385,14 @@ recordid pageRalloc(Page page, int size) {
/** Only used for recovery, to make sure that consistent RID's are created
* on log playback. */
recordid pageSlotRalloc(Page page, lsn_t lsn, recordid rid) {
recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid) {
int freeSpace;
int numSlots;
writelock(page.rwlatch, 376);
writelock(page->rwlatch, 376);
freeSpace = readFreeSpace(page.memAddr);
numSlots= readNumSlots(page.memAddr);
freeSpace = readFreeSpace(page->memAddr);
numSlots= readNumSlots(page->memAddr);
/* if(rid.size > BLOB_THRESHOLD_SIZE) {
return blobSlotAlloc(page, lsn_t lsn, recordid rid);
@ -445,22 +403,22 @@ recordid pageSlotRalloc(Page page, lsn_t lsn, recordid rid) {
if (freeSpace < rid.size) {
pageCompact(page);
freeSpace = readFreeSpace(page.memAddr);
freeSpace = readFreeSpace(page->memAddr);
assert (freeSpace < rid.size);
}
setSlotOffset(page.memAddr, rid.slot, freeSpace);
setSlotLength(page.memAddr, rid.slot, rid.size);
writeFreeSpace(page.memAddr, freeSpace + rid.size);
setSlotOffset(page->memAddr, rid.slot, freeSpace);
setSlotLength(page->memAddr, rid.slot, rid.size);
writeFreeSpace(page->memAddr, freeSpace + rid.size);
} else {
/* assert(rid.size == getSlotLength(page.memAddr, rid.slot)); */ /* Fails. Why? */
}
writeunlock(page.rwlatch);
writeunlock(page->rwlatch);
return rid;
}
int isValidSlot(byte *memAddr, int slot) {
return (getSlotOffset(memAddr, slot) != INVALID_SLOT) ? 1 : 0;
}
@ -470,10 +428,10 @@ void invalidateSlot(byte *memAddr, int slot) {
}
void pageDeRalloc(Page page, recordid rid) {
writelock(page.rwlatch, 416);
invalidateSlot(page.memAddr, rid.slot);
writeunlock(page.rwlatch);
void pageDeRalloc(Page * page, recordid rid) {
writelock(page->rwlatch, 416);
invalidateSlot(page->memAddr, rid.slot);
writeunlock(page->rwlatch);
}
/**
@ -484,7 +442,7 @@ void pageDeRalloc(Page page, recordid rid) {
@todo If we were supporting multithreaded operation, this routine
would need to pin the pages that it works on.
*/
static void pageCompact(Page page) {
static void pageCompact(Page * page) {
int i;
byte buffer[PAGE_SIZE];
@ -494,21 +452,21 @@ static void pageCompact(Page page) {
int slot_length;
int last_used_slot = -1;
numSlots = readNumSlots(page.memAddr);
numSlots = readNumSlots(page->memAddr);
/* DEBUG("Compact: numSlots=%d\n", numSlots); */
meta_size = LSN_SIZE + FREE_SPACE_SIZE + NUMSLOTS_SIZE + (SLOT_SIZE*numSlots);
/* Can't compact in place, slot numbers can come in different orders than
the physical space allocated to them. */
memcpy(buffer + PAGE_SIZE - meta_size, page.memAddr + PAGE_SIZE - meta_size, meta_size);
memcpy(buffer + PAGE_SIZE - meta_size, page->memAddr + PAGE_SIZE - meta_size, meta_size);
for (i = 0; i < numSlots; i++) {
/* DEBUG("i = %d\n", i); */
if (isValidSlot(page.memAddr, i)) {
if (isValidSlot(page->memAddr, i)) {
/* DEBUG("Buffer offset: %d\n", freeSpace); */
slot_length = getSlotLength(page.memAddr, i);
memcpy(buffer + freeSpace, page.memAddr + getSlotOffset(page.memAddr, i), slot_length);
slot_length = getSlotLength(page->memAddr, i);
memcpy(buffer + freeSpace, page->memAddr + getSlotOffset(page->memAddr, i), slot_length);
setSlotOffset(buffer, i, freeSpace);
freeSpace += slot_length;
last_used_slot = i;
@ -524,7 +482,7 @@ static void pageCompact(Page page) {
writeFreeSpace(buffer, freeSpace);
memcpy(page.memAddr, buffer, PAGE_SIZE);
memcpy(page->memAddr, buffer, PAGE_SIZE);
}
@ -577,32 +535,33 @@ int isBlobSlot(byte *pageMemAddr, int slot) {
@todo If the rid size has been overridden, we should check to make
sure that this really is a special record.
*/
void pageReadRecord(int xid, Page page, recordid rid, byte *buff) {
void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) {
byte *recAddress;
readlock(page.rwlatch, 519);
recAddress = page.memAddr + getSlotOffset(page.memAddr, rid.slot);
readlock(page->rwlatch, 519);
recAddress = page->memAddr + getSlotOffset(page->memAddr, rid.slot);
memcpy(buff, recAddress, rid.size);
readunlock(page.rwlatch);
readunlock(page->rwlatch);
}
void pageWriteRecord(int xid, Page page, recordid rid, lsn_t lsn, const byte *data) {
void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data) {
byte *rec;
readlock(page.rwlatch, 529);
readlock(page->rwlatch, 529);
assert(rid.size < PAGE_SIZE);
rec = page.memAddr + getSlotOffset(page.memAddr, rid.slot);
rec = page->memAddr + getSlotOffset(page->memAddr, rid.slot);
if(memcpy(rec, data, rid.size) == NULL ) {
printf("ERROR: MEM_WRITE_ERROR on %s line %d", __FILE__, __LINE__);
exit(MEM_WRITE_ERROR);
}
page.LSN = lsn;
page->LSN = lsn;
pageWriteLSN(page);
readunlock(page.rwlatch);
readunlock(page->rwlatch);
}
void pageRealloc(Page *p, int id) {
@ -610,7 +569,8 @@ void pageRealloc(Page *p, int id) {
p->id = id;
p->LSN = 0;
p->dirty = 0;
*(p->pending)=0;
p->pending = 0;
p->waiting = 0;
writeunlock(p->rwlatch);
}
@ -622,30 +582,33 @@ void pageRealloc(Page *p, int id) {
and should never be freed manually.
*/
Page *pageAlloc(int id) {
Page *p;
Page *page;
pthread_mutex_lock(&pageAllocMutex);
p = &(pool[nextPage]);
page = &(pool[nextPage]);
/* We have an implicit lock on rwlatch, since we allocated it, but
haven't returned yet. */
p->rwlatch = initlock();
page->rwlatch = initlock();
pthread_mutex_init(&page->pending_mutex, NULL);
pthread_cond_init(&page->noMorePending, NULL);
nextPage++;
assert(nextPage <= MAX_BUFFER_SIZE);
/* uggh. Really just want to pass pages by reference */
p->pending = malloc(sizeof(int));
/* page->pending = malloc(sizeof(int)); */
pthread_mutex_unlock(&pageAllocMutex);
/* pageRealloc does its own locking... */
pageRealloc(p, id);
pageRealloc(page, id);
return p;
return page;
}
void printPage(byte *memAddr) {
@ -664,7 +627,7 @@ void printPage(byte *memAddr) {
#define num 20
int pageTest() {
Page page;
Page * page = malloc(sizeof(Page));
recordid rid[num];
char *str[num] = {"one",
@ -689,33 +652,33 @@ int pageTest() {
"twenty"};
int i;
page.memAddr = (byte *)malloc(PAGE_SIZE);
memset(page.memAddr, 0, PAGE_SIZE);
page->memAddr = (byte *)malloc(PAGE_SIZE);
memset(page->memAddr, 0, PAGE_SIZE);
for (i = 0; i < num; i++) {
rid[i] = pageRalloc(page, strlen(str[i]) + 1);
pageWriteRecord(0, page, rid[i], 1, (byte*)str[i]);
}
printPage(page.memAddr);
printPage(page->memAddr);
for (i = 0; i < num; i+= 2)
pageDeRalloc(page, rid[i]);
pageCompact(page);
printf("\n\n\n");
printPage(page.memAddr);
printPage(page->memAddr);
return 0;
}
void setSlotType(Page p, int slot, int type) {
void pageSetSlotType(Page * p, int slot, int type) {
assert(type > PAGE_SIZE);
/* setSlotLength does the locking for us. */
setSlotLength(p.memAddr, slot, type);
setSlotLength(p->memAddr, slot, type);
}
int getSlotType(Page p, int slot, int type) {
int ret = getSlotLength(p.memAddr, slot);
int pageGetSlotType(Page * p, int slot, int type) {
int ret = getSlotLength(p->memAddr, slot);
/* getSlotType does the locking for us. */
return ret > PAGE_SIZE ? ret : NORMAL_SLOT;
}

View file

@ -142,7 +142,6 @@ Page *loadPagePtr(int pageid) {
Page *ret;
if(lastPage && lastPageId == pageid) {
DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) lastPage->memAddr);
return lastPage;
} else {
ret = pblHtLookup(activePages, &pageid, sizeof(int));
@ -169,7 +168,7 @@ Page *loadPagePtr(int pageid) {
lastPage = ret;
lastPageId = pageid;
DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr);
/* DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); */
return ret;
} else if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to kick */

View file

@ -10,7 +10,7 @@
#include <config.h>
#include <lladd/common.h>
#include <lladd/page.h>
#include <lladd/recovery.h>
#include <pbl/pbl.h>
@ -155,6 +155,11 @@ static void Redo() {
/* redoUpdate checks the page that contains e->rid, so we
don't need to check to see if the page is newer than this
log entry. */
if(e->type == UPDATELOG) {
addPendingEvent(e->contents.update.rid.page);
} else {
addPendingEvent(e->contents.clr.rid.page);
}
redoUpdate(e);
}
}

View file

@ -35,7 +35,7 @@ int Tinit() {
setupOperationsTable();
pageInit();
/* pageInit(); */
bufInit();
openLogWriter();

View file

@ -44,7 +44,6 @@ terms specified in this license.
#include <check.h>
#include <assert.h>
#include <lladd/logger/logEntry.h>
#include <lladd/transactional.h>
#include "../check_includes.h"

View file

@ -43,10 +43,10 @@ terms specified in this license.
#include <config.h>
#include <check.h>
#include <lladd/logger/logEntry.h>
#include <lladd/transactional.h>
/*#include <lladd/logger/logEntry.h> */
#include <lladd/logger/logHandle.h>
#include <lladd/logger/logWriter.h>
#include <lladd/transactional.h>
#include "../../src/lladd/latches.h"
#include <sched.h>
@ -61,6 +61,9 @@ static void setup_log() {
int i;
lsn_t prevLSN = -1;
int xid = 100;
Tinit();
deleteLogWriter();
openLogWriter();
@ -121,6 +124,8 @@ START_TEST(logWriterTest)
LogEntry * e;
LogHandle h;
int i = 0;
setup_log();
syncLog();
closeLogWriter();

View file

@ -48,6 +48,8 @@ terms specified in this license.
#include "../check_includes.h"
#define LOG_NAME "check_operations.log"
/**
Assuming that the Tset() operation is implemented correctly, checks
that doUpdate, redoUpdate and undoUpdate are working correctly, for
@ -74,11 +76,13 @@ START_TEST(operation_physical_do_undo) {
/* writeLSN(lsn, rid.page); */
DEBUG("B\n");
writeRecord(xid, lsn, rid, &buf);
setToTwo->LSN = 10;
DEBUG("C\n");
addPendingEvent(rid.page);
doUpdate(setToTwo); /* PAGE LSN= 10, value = 2. */
readRecord(xid, rid, &buf);
@ -93,6 +97,7 @@ START_TEST(operation_physical_do_undo) {
setToTwo->LSN = 5;
addPendingEvent(rid.page);
undoUpdate(setToTwo, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */
readRecord(xid, rid, &buf);
@ -100,6 +105,7 @@ START_TEST(operation_physical_do_undo) {
fail_unless(buf == 1, NULL);
DEBUG("E\n");
addPendingEvent(rid.page);
redoUpdate(setToTwo);
@ -140,6 +146,7 @@ START_TEST(operation_physical_do_undo) {
setToTwo->LSN = 10;
DEBUG("F\n");
addPendingEvent(rid.page);
redoUpdate(setToTwo);
/* writeLSN(setToTwo->LSN, rid.page); */
@ -148,6 +155,7 @@ START_TEST(operation_physical_do_undo) {
fail_unless(buf == 2, NULL);
DEBUG("G undo set to 2\n");
addPendingEvent(rid.page);
undoUpdate(setToTwo, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
readRecord(xid, rid, &buf);
@ -155,17 +163,18 @@ START_TEST(operation_physical_do_undo) {
fail_unless(buf == 1, NULL);
DEBUG("H don't redo set to 2\n");
addPendingEvent(rid.page);
redoUpdate(setToTwo); /* Fails */
readRecord(xid, rid, &buf);
fail_unless(buf == 1, NULL);
writeRecord(xid, 0, rid, &buf); /* reset the page's LSN. */
/* writeLSN(0,rid.page); */
DEBUG("I redo set to 2\n");
addPendingEvent(rid.page);
redoUpdate(setToTwo); /* Succeeds */
readRecord(xid, rid, &buf);

View file

@ -58,13 +58,18 @@ terms specified in this license.
#define LOG_NAME "check_page.log"
#define RECORD_SIZE sizeof(int)
/** @todo check_page needs to use loadPage, so it contains its own
delcaration of loadPage. (Otherwise, loadPage would clutter the
interface, which is especially bad, since we hide the Page struct
ffrom the user for locking purposes.)*/
Page * loadPage(int pageid);
pthread_mutex_t random_mutex;
static lsn_t lsn;
static pthread_mutex_t lsn_mutex;
static void* worker_thread(void * arg_ptr) {
Page p = *(Page*)arg_ptr;
Page * p = (Page*)arg_ptr;
int i;
lsn_t this_lsn;
int j;
@ -87,6 +92,7 @@ static void* worker_thread(void * arg_ptr) {
first = 0;
rid = pageRalloc(p, sizeof(int));
/* addPendingEvent(p); */
pageWriteRecord(1, p, rid, lsn, (byte*)&i);
sched_yield();
@ -111,8 +117,8 @@ static void* worker_thread(void * arg_ptr) {
*/
START_TEST(pageNoThreadTest)
{
Page p;
p.id = 0;
Page * p;
/* p->id = 0;*/
pthread_mutex_init(&lsn_mutex, NULL);
@ -121,7 +127,7 @@ START_TEST(pageNoThreadTest)
p = loadPage(0);
worker_thread(&p);
worker_thread(p);
Tdeinit();
@ -140,10 +146,10 @@ START_TEST(pageThreadTest) {
Tinit();
Page p = loadPage(1);
Page * p = loadPage(1);
for(i = 0; i < THREAD_COUNT; i++) {
pthread_create(&workers[i], NULL, worker_thread, &p);
pthread_create(&workers[i], NULL, worker_thread, p);
}
for(i = 0; i < THREAD_COUNT; i++) {
pthread_join(workers[i], NULL);