Rewrote / refactored page.h. It now uses a freelist to reclaim slot ids within a page, and is implemented as two separate modules. The first handles LSNs and will (soon) check the type of each page, then delegate work to a particular implementation of the second module. (A minimal sketch of the freelist idea follows the commit metadata below.)

This commit is contained in:
Sears Russell 2004-07-30 01:28:39 +00:00
parent c649ba7504
commit 961b63af15
19 changed files with 774 additions and 856 deletions
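The commit message above describes slot-id reuse via a per-page freelist: a freed slot's length field stores the index of the next free slot, so the page only needs a single freelist head. Below is a minimal standalone sketch of that idea; the toy_page struct, function names, and fixed-size arrays are illustrative and do not match LLADD's actual on-page layout (see src/lladd/page/slotted.c later in this diff for the real implementation).

#include <assert.h>
#include <stdio.h>

#define INVALID_SLOT (-1)
#define MAX_SLOTS    16

typedef struct {
    short offset[MAX_SLOTS];  /* INVALID_SLOT marks a freed slot              */
    short length[MAX_SLOTS];  /* for freed slots, holds the next free slot id */
    short num_slots;          /* number of slot entries ever created          */
    short freelist_head;      /* first reusable slot id, or INVALID_SLOT      */
} toy_page;

static void toy_init(toy_page *p) {
    p->num_slots = 0;
    p->freelist_head = INVALID_SLOT;
}

static int toy_alloc_slot(toy_page *p) {
    int slot;
    if (p->freelist_head != INVALID_SLOT) {     /* reuse a reclaimed slot id    */
        slot = p->freelist_head;
        p->freelist_head = p->length[slot];
    } else {                                    /* otherwise grow the slot array */
        slot = p->num_slots++;
    }
    p->offset[slot] = 0;  /* a real implementation records offset/length here  */
    p->length[slot] = 0;
    return slot;
}

static void toy_free_slot(toy_page *p, int slot) {
    p->offset[slot] = INVALID_SLOT;             /* mark the slot invalid        */
    p->length[slot] = p->freelist_head;         /* chain through the length field */
    p->freelist_head = (short)slot;
}

int main(void) {
    toy_page p;
    toy_init(&p);

    int a = toy_alloc_slot(&p);
    int b = toy_alloc_slot(&p);
    toy_free_slot(&p, a);

    int c = toy_alloc_slot(&p);                 /* gets a's id back, not a new one */
    assert(c == a);
    printf("slots in use: %d and %d; %d ids handed out in total\n",
           c, b, (int)p.num_slots);
    return 0;
}

Because the length field of an invalid slot carries no other meaning, chaining the freelist through it costs no extra space on the page.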

View file

@ -45,18 +45,11 @@ terms specified in this license.
pageManager - Provides cached page handling, delegates to blob pageManager - Provides cached page handling, delegates to blob
manager when necessary. Doesn't implement an eviction policy. manager when necessary. Doesn't implement an eviction policy.
That is left to a cacheManager. (Multiple cacheManagers can be That is left to a cacheManager. (Multiple cacheManagers could be
used with a single page manager.) used with a single page manager.)
@todo Allow error checking! @todo Allow error checking!
@todo Make linux provide a better version of malloc(). We need to
directly DMA pages into and out of userland, or setup mmap() so
that it takes a flag that makes it page mmapped() pages to swap
instead of back to disk. (munmap() and msync() would still hit the
on-disk copy)
@todo Refactoring for lock manager @todo Refactoring for lock manager
Possible interface for lockManager: Possible interface for lockManager:
@ -84,12 +77,13 @@ terms specified in this license.
* $Id$ * $Id$
*/ */
#include <lladd/constants.h>
#include <lladd/transactional.h>
#ifndef __BUFFERMANAGER_H__ #ifndef __BUFFERMANAGER_H__
#define __BUFFERMANAGER_H__ #define __BUFFERMANAGER_H__
BEGIN_C_DECLS
#include <lladd/constants.h>
#include <lladd/transactional.h>
/** /**
Page is defined in bufferManager.h as an incomplete type to enforce Page is defined in bufferManager.h as an incomplete type to enforce
@ -114,7 +108,6 @@ Page * loadPage(int pageid);
*/ */
void releasePage(Page * p); void releasePage(Page * p);
/** /**
* initialize buffer manager * initialize buffer manager
* @return 0 on success * @return 0 on success
@ -122,39 +115,17 @@ void releasePage(Page * p);
*/ */
int bufInit(); int bufInit();
/** /**
* @param pageid ID of page you want to read * will write out any dirty pages, assumes that there are no running
* @return LSN found on disk * transactions
*/ */
/*long readLSN(int pageid); */ void bufDeinit();
/**
* @param xid transaction id @param lsn the lsn that the updated
* record will reflect. This is needed by recovery, and undo. (The
* lsn of a page must always increase. Undos are handled by passing
* in the LSN of the CLR that records the undo.)
*
* @param rid recordid where you want to write @param dat data you
* wish to write
*/
void writeRecord(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat);
/**
* @param xid transaction ID
* @param rid
* @param dat buffer for data
*/
void readRecord(int xid, Page * page, recordid rid, void *dat);
/** /**
* all actions necessary when committing a transaction. Can assume that the log * all actions necessary when committing a transaction. Can assume that the log
* has been written as well as any other actions that do not depend on the * has been written as well as any other updates that do not depend on the
* buffer manager * buffer manager
* *
* Basicly, this call is here because we used to do copy on write, and
* it might be useful when locking is implemented.
*
* @param xid transaction ID * @param xid transaction ID
* @param lsn the lsn at which the transaction aborted. (Currently * @param lsn the lsn at which the transaction aborted. (Currently
* unused, but may be useful for other implementations of the buffer * unused, but may be useful for other implementations of the buffer
@ -175,17 +146,10 @@ int bufTransCommit(int xid, lsn_t lsn);
* manager.) * manager.)
* *
* @return 0 on success * @return 0 on success
*
* @return error code on failure * @return error code on failure
*/ */
int bufTransAbort(int xid, lsn_t lsn); int bufTransAbort(int xid, lsn_t lsn);
/** END_C_DECLS
* will write out any dirty pages, assumes that there are no running
* transactions
*/
void bufDeinit();
/*void setSlotType(int pageid, int slot, int type); */
#endif #endif
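A minimal usage sketch of the pin/unpin lifecycle declared in this header, assuming a built LLADD tree. Only bufInit(), loadPage(), releasePage(), and bufDeinit() come from the header above; the page id, the absence of running transactions at shutdown, and the error handling are illustrative.

#include <assert.h>
#include <stdio.h>
#include <lladd/bufferManager.h>   /* the header shown above */

int main(void) {
    int rc = bufInit();            /* documented to return 0 on success       */
    assert(rc == 0);

    Page *p = loadPage(0);         /* pins page 0 in the buffer pool          */
    /* ... records on p would be read or written via the page interface ...   */
    releasePage(p);                /* every loadPage() needs a releasePage()  */

    bufDeinit();                   /* writes out dirty pages; assumes no
                                      transactions are still running          */
    printf("buffer manager shut down cleanly\n");
    return 0;
}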

View file

@ -61,21 +61,25 @@ terms specified in this license.
BEGIN_C_DECLS BEGIN_C_DECLS
/* @type Function /**
* function pointer that the operation will run * function pointer that the operation will run
*/ */
typedef int (*Function)(int xid, Page * p, lsn_t lsn, recordid r, const void *d); typedef int (*Function)(int xid, Page * p, lsn_t lsn, recordid r, const void *d);
/* @type Operation /**
* @param sizeofData size of the data that function accepts (as void*)
* @param undo index into operations table of undo function (takes same args)
* @param run what function to actually run
*/ */
/* @type Special cases /**
If the Operation struct's sizeofData is set to this value, then the
size field of the recordid is used to determine the size of the
argument passed into the operation.
*/ */
#define SIZEOF_RECORD -1 #define SIZEOF_RECORD -1
/** If the Operation struct's undo field is set to this value, then
physical logging is used in lieu of logical logging.
*/
#define NO_INVERSE -1 #define NO_INVERSE -1
typedef struct { typedef struct {
/** /**
@ -88,12 +92,6 @@ typedef struct {
that the operation affects will be used instead. that the operation affects will be used instead.
*/ */
long sizeofData; long sizeofData;
/**
Does this operation supply an undo operation?
--Unneeded; just set undo to the special value NO_INVERSE.
*/
/* int invertible; */
/** /**
Implementing operations that may span records is subtle. Implementing operations that may span records is subtle.
Recovery assumes that page writes (and therefore logical Recovery assumes that page writes (and therefore logical
@ -123,6 +121,9 @@ typedef struct {
We chose the second option for now. We chose the second option for now.
*/ */
/**
index into operations table of undo function
*/
int undo; int undo;
Function run; Function run;
} Operation; } Operation;
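A minimal sketch of filling in an Operation, modeled on the set operation elsewhere in this commit. Function, Operation, SIZEOF_RECORD, and NO_INVERSE come from the header above; the include paths, the operateSet/setOperation names, and the designated initializers are assumptions, and registration in the operations table is not shown.

#include <lladd/operations.h>   /* assumed path for Function / Operation       */
#include "page.h"               /* assumed path; declares writeRecord()        */

/* Redo function: physically write the logged argument into the record. */
static int operateSet(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
    writeRecord(xid, p, lsn, rid, dat);
    return 0;
}

static const Operation setOperation = {
    .sizeofData = SIZEOF_RECORD, /* take the argument size from rid.size       */
    .undo       = NO_INVERSE,    /* no logical undo; physical logging is used  */
    .run        = operateSet
};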

View file

@ -285,7 +285,7 @@ int _chtEval(DfaSet * dfaSet,
state_name init_xact_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { state_name init_xact_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
TwoPCMachineState * state = (TwoPCMachineState*) &(stateMachine->app_state); /* TwoPCMachineState * state = (TwoPCMachineState*) &(stateMachine->app_state);*/
TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup));
CHTAppState * app_state_cht = app_state_2pc->app_state; CHTAppState * app_state_cht = app_state_2pc->app_state;

View file

@ -140,14 +140,14 @@ void recover(DfaSet * dfaSet) {
StateMachine sm_stack; StateMachine sm_stack;
StateMachine * sm = &sm_stack; StateMachine * sm = &sm_stack;
StateMachine * this; StateMachine * this;
int ret = (jbHtFirst(dfaSet->smash->xid, dfaSet->smash->hash, sm) != -1); int ret = (jbHtFirst(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1);
while(ret) { while(ret) {
this = getSmash(dfaSet->smash, sm->machine_id); this = getSmash(dfaSet->smash, sm->machine_id);
printf("StateMachine %ld\n", sm->machine_id); printf("StateMachine %ld\n", sm->machine_id);
this->worker_thread = spawn_worker_thread(dfaSet, sm->machine_id); this->worker_thread = spawn_worker_thread(dfaSet, sm->machine_id);
ret = (jbHtNext(dfaSet->smash->xid, dfaSet->smash->hash, sm) != -1); ret = (jbHtNext(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1);
} }
} }

View file

@ -93,7 +93,7 @@ StateMachine * _insertSmash(smash_t * smash, state_machine_id id) {
new->current_state = START_STATE; new->current_state = START_STATE;
/* printf("Insert %ld\n", id); */ /* printf("Insert %ld\n", id); */
ret = (-1 != jbHtInsert(smash->xid, smash->hash, &id, sizeof(state_machine_id), new, sizeof(StateMachine))); ret = (-1 != jbHtInsert(smash->xid, smash->hash, (byte*)&id, sizeof(state_machine_id), (byte*)new, sizeof(StateMachine)));
pblHtInsert(smash->memHash, &id, sizeof(state_machine_id), new); pblHtInsert(smash->memHash, &id, sizeof(state_machine_id), new);
/* Tcommit(smash->xid); /* Tcommit(smash->xid);
smash->xid = Tbegin(); */ smash->xid = Tbegin(); */
@ -136,7 +136,7 @@ StateMachine * insertSmash(smash_t * smash, state_machine_id id) {
pthread_mutex_lock(smash->lock); pthread_mutex_lock(smash->lock);
if(jbHtLookup(smash->xid, smash->hash, &(smash->next_sm_id), sizeof(state_machine_id), &junk) != -1) { if(jbHtLookup(smash->xid, smash->hash, (byte*)&(smash->next_sm_id), sizeof(state_machine_id), (byte*)&junk) != -1) {
pthread_mutex_unlock(smash->lock); pthread_mutex_unlock(smash->lock);
return NULL; return NULL;
} }
@ -167,7 +167,7 @@ int freeSmash (smash_t * smash, state_machine_id id) {
free(old->sleepCond); free(old->sleepCond);
pblHtRemove(smash->memHash, &(id), sizeof(state_machine_id)); pblHtRemove(smash->memHash, &(id), sizeof(state_machine_id));
ret = jbHtRemove(smash->xid, smash->hash, &(id), sizeof(state_machine_id), NULL) != -1; ret = jbHtRemove(smash->xid, smash->hash, (byte*)&(id), sizeof(state_machine_id), NULL) != -1;
free(old); free(old);
@ -194,7 +194,7 @@ int _setSmash(smash_t * smash, state_machine_id id) {
StateMachine * machine; StateMachine * machine;
machine = _getSmash(smash, id); machine = _getSmash(smash, id);
return (-1 != jbHtInsert(smash->xid, smash->hash, &id, sizeof(state_machine_id), machine, sizeof(StateMachine))); return (-1 != jbHtInsert(smash->xid, smash->hash, (byte*)&id, sizeof(state_machine_id),(byte*) machine, sizeof(StateMachine)));
} }

View file

@ -3,6 +3,6 @@
lib_LIBRARIES=liblladd.a lib_LIBRARIES=liblladd.a
#liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a #liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a
# removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c # removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c
liblladd_a_SOURCES=common.c stats.c io.c bufferManager.c linkedlist.c operations.c pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/prepare.c operations/set.c operations/alloc.c #operations/lladdhash.c liblladd_a_SOURCES=common.c stats.c io.c bufferManager.c linkedlist.c operations.c pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/prepare.c operations/set.c operations/alloc.c page/slotted.c #operations/lladdhash.c
AM_CFLAGS= -g -Wall -pedantic -std=gnu99 AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -50,13 +50,13 @@ terms specified in this license.
#include <latches.h> #include <latches.h>
#include <assert.h> #include <assert.h>
#include "page.h"
#include <lladd/bufferManager.h> #include <lladd/bufferManager.h>
#include "page.h"
#include "blobManager.h" #include "blobManager.h"
#include <lladd/pageCache.h> #include <lladd/pageCache.h>
#include "pageFile.h" #include "pageFile.h"
#include <pbl/pbl.h> #include <pbl/pbl.h>
@ -138,44 +138,6 @@ void releasePage (Page * p) {
unlock(p->loadlatch); unlock(p->loadlatch);
} }
void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
/* Page *p; */
if(rid.size > BLOB_THRESHOLD_SIZE) {
/* DEBUG("Writing blob.\n"); */
writeBlob(xid, p, lsn, rid, dat);
} else {
/* DEBUG("Writing record.\n"); */
assert( (p->id == rid.page) && (p->memAddr != NULL) );
/** @todo This assert should be here, but the tests are broken, so it causes bogus failures. */
/*assert(pageReadLSN(*p) <= lsn);*/
pageWriteRecord(xid, p, rid, lsn, dat);
assert( (p->id == rid.page) && (p->memAddr != NULL) );
}
}
void readRecord(int xid, Page * p, recordid rid, void *buf) {
if(rid.size > BLOB_THRESHOLD_SIZE) {
/* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */
/* @todo should readblob take a page pointer? */
readBlob(xid, p, rid, buf);
} else {
assert(rid.page == p->id);
/* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */
pageReadRecord(xid, p, rid, buf);
assert(rid.page == p->id);
}
}
int bufTransCommit(int xid, lsn_t lsn) { int bufTransCommit(int xid, lsn_t lsn) {
commitBlobs(xid); commitBlobs(xid);
@ -316,6 +278,6 @@ Page * getPage(int pageid, int locktype) {
} }
Page *loadPage(int pageid) { Page *loadPage(int pageid) {
Page * ret = getPage(pageid, RW); Page * ret = getPage(pageid, RO);
return ret; return ret;
} }

View file

@ -45,7 +45,8 @@ terms specified in this license.
#include <lladd/logger/logger2.h> #include <lladd/logger/logger2.h>
#include "logWriter.h" #include "logWriter.h"
#include <lladd/bufferManager.h> #include "page.h"
/*#include <lladd/bufferManager.h>*/
#include <stdio.h> #include <stdio.h>
TransactionLog LogTransBegin(int xid) { TransactionLog LogTransBegin(int xid) {
TransactionLog tl; TransactionLog tl;
@ -101,6 +102,7 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
DEBUG("got preimage"); DEBUG("got preimage");
} }
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage); e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage);
writeLogEntry(e); writeLogEntry(e);

View file

@ -46,7 +46,8 @@ terms specified in this license.
*********************************************/ *********************************************/
#include <lladd/operations/decrement.h> #include <lladd/operations/decrement.h>
#include <lladd/bufferManager.h> /*#include <lladd/bufferManager.h>*/
#include "../page.h"
static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) { static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
int i; int i;

View file

@ -46,7 +46,8 @@ terms specified in this license.
**********************************************/ **********************************************/
#include <lladd/operations/increment.h> #include <lladd/operations/increment.h>
#include <lladd/bufferManager.h> /*#include <lladd/bufferManager.h>*/
#include "../page.h"
static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) { static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
int i; int i;

View file

@ -46,7 +46,9 @@ terms specified in this license.
**********************************************/ **********************************************/
#include <lladd/operations/set.h> #include <lladd/operations/set.h>
#include <lladd/bufferManager.h> /*#include <lladd/bufferManager.h>*/
#include "../page.h"
static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) { static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
writeRecord(xid, p, lsn, rid, dat); writeRecord(xid, p, lsn, rid, dat);
return 0; return 0;

View file

@ -40,52 +40,6 @@ permission to use and distribute the software in accordance with the
terms specified in this license. terms specified in this license.
---*/ ---*/
/************************************************************************
* implementation of pages
STRUCTURE OF A PAGE
+-------------------------------------------+-----------------------+--+
| DATA SECTION +--------->| RID: (PAGE, 0) | |
| +-----------------+ | +-----------------------+ |
| +-->| RID: (PAGE, 1) | | |
| | +-----------------+ | |
| | | |
| +-----------------+ | +----------------------------+
| | | +--->| RID: (PAGE, n) |
| | | | +----------------------------+
|======================================================================|
|^ FREE SPACE | | | |
|+-----------------------|-------|---|--------------------+ |
| | | | | |
| +-------------|-------|---+ | |
| | | | | |
| +---|---+-----+---|---+---|---+--------------+-----|------+-----+
| | slotn | ... | slot1 | slot0 | num of slots | free space | LSN |
+------+-------+-----+-------+-------+--------------+------------+-----+
NOTE:
- slots are zero indexed.
- slots are of implemented as (offset, length)
Latching summary:
Each page has an associated read/write lock. This lock only
protects the internal layout of the page, and the members of the
page struct. Here is how it is held in various circumstances:
Record allocation: Write lock
Record read: Read lock
Read LSN Read lock
Record write *READ LOCK*
Write LSN Write lock
Any circumstance where these locks are held during an I/O operation
is a bug.
$Id$
************************************************************************/
/* _XOPEN_SOURCE is needed for posix_memalign */ /* _XOPEN_SOURCE is needed for posix_memalign */
#define _XOPEN_SOURCE 600 #define _XOPEN_SOURCE 600
#include <stdlib.h> #include <stdlib.h>
@ -103,28 +57,11 @@ terms specified in this license.
#include "blobManager.h" #include "blobManager.h"
#include "pageFile.h" #include "pageFile.h"
#include "page/slotted.h"
/* TODO: Combine with buffer size... */ /* TODO: Combine with buffer size... */
static int nextPage = 0; static int nextPage = 0;
static const byte *slotMemAddr(const byte *memAddr, int slotNum) ;
/** @todo: Why does only one of the get/set First/Second HalfOfWord take an unsigned int? */
static int getFirstHalfOfWord(unsigned int *memAddr);
static int getSecondHalfOfWord(int *memAddr);
static void setFirstHalfOfWord(int *memAddr, int value);
static void setSecondHalfOfWord(int *memAddr, int value);
static int readFreeSpace(byte *memAddr);
static void writeFreeSpace(byte *memAddr, int newOffset);
static int readNumSlots(byte *memAddr);
static void writeNumSlots(byte *memAddr, int numSlots);
static int getSlotOffset(byte *memAddr, int slot) ;
static int getSlotLength(byte *memAddr, int slot) ;
static void setSlotOffset(byte *memAddr, int slot, int offset) ;
static void setSlotLength(byte *memAddr, int slot, int length) ;
/** /**
Invariant: This lock should be held while updating lastFreepage, or Invariant: This lock should be held while updating lastFreepage, or
while performing any operation that may decrease the amount of while performing any operation that may decrease the amount of
@ -140,82 +77,12 @@ static unsigned int lastFreepage = 0;
/** @todo replace static ints in page.c with #defines. */
/* ------ */
static int SLOT_OFFSET_SIZE;
static int SLOT_LENGTH_SIZE;
static int SLOT_SIZE;
static int LSN_SIZE;
static int FREE_SPACE_SIZE;
static int NUMSLOTS_SIZE;
static int START_OF_LSN;
static int START_OF_FREE_SPACE;
static int START_OF_NUMSLOTS;
static int MASK_0000FFFF;
static int MASK_FFFF0000;
/* ------ */ /* ------ */
static pthread_mutex_t pageAllocMutex; static pthread_mutex_t pageAllocMutex;
/** We need one dummy page for locking purposes, so this array has one extra page in it. */ /** We need one dummy page for locking purposes, so this array has one extra page in it. */
Page pool[MAX_BUFFER_SIZE+1]; Page pool[MAX_BUFFER_SIZE+1];
/* ------------------ STATIC FUNCTIONS. NONE OF THESE ACQUIRE LOCKS
ON THE MEMORY THAT IS PASSED INTO THEM -------------*/
static int isValidSlot(byte *memAddr, int slot);
static void invalidateSlot(byte *memAddr, int slot);
/**
The caller of this function must already have a writelock on the
page.
*/
static void pageCompact(Page * page);
static int getFirstHalfOfWord(unsigned int *memAddr) {
unsigned int word = *memAddr;
word = (word >> (2*BITS_PER_BYTE)); /* & MASK_0000FFFF; */
return word;
}
static int getSecondHalfOfWord(int *memAddr) {
int word = *memAddr;
word = word & MASK_0000FFFF;
return word;
}
static void setFirstHalfOfWord(int *memAddr, int value){
int word = *memAddr;
word = word & MASK_0000FFFF;
word = word | (value << (2*BITS_PER_BYTE));
*memAddr = word;
}
static void setSecondHalfOfWord(int *memAddr, int value) {
int word = *memAddr;;
word = word & MASK_FFFF0000;
word = word | (value & MASK_0000FFFF);
*memAddr = word;
}
/**
* slotMemAddr() calculates the memory address of the given slot. It does this
* by going to the end of the page, then walking backwards, past the LSN field
* (LSN_SIZE), past the 'free space' and 'num of slots' fields (NUMSLOTS_SIZE),
* and then past a slotNum slots (slotNum * SLOT_SIZE).
*/
static const byte *slotMemAddr(const byte *memAddr, int slotNum) {
return (memAddr + PAGE_SIZE) - (LSN_SIZE + FREE_SPACE_SIZE + NUMSLOTS_SIZE + ((slotNum+1) * SLOT_SIZE));
}
/** /**
* pageWriteLSN() assumes that the page is already loaded in memory. It takes * pageWriteLSN() assumes that the page is already loaded in memory. It takes
* as a parameter a Page. The Page struct contains the new LSN and the page * as a parameter a Page. The Page struct contains the new LSN and the page
@ -223,159 +90,30 @@ static const byte *slotMemAddr(const byte *memAddr, int slotNum) {
* *
* @param page You must have a writelock on page before calling this function. * @param page You must have a writelock on page before calling this function.
*/ */
static void pageWriteLSN(Page * page) { void pageWriteLSN(Page * page) {
/* unlocked since we're only called by a function that holds the writelock. */ /* unlocked since we're only called by a function that holds the writelock. */
*(long *)(page->memAddr + START_OF_LSN) = page->LSN; /* *(long *)(page->memAddr + START_OF_LSN) = page->LSN; */
*lsn_ptr(page) = page->LSN;
}
static int unlocked_freespace(Page * page);
/**
Just like freespace(), but doesn't obtain a lock. (So that other methods in this file can use it.)
*/
static int unlocked_freespace(Page * page) {
int space;
space= (slotMemAddr(page->memAddr, readNumSlots(page->memAddr)) - (page->memAddr + readFreeSpace(page->memAddr)));
return (space < 0) ? 0 : space;
} }
/** /**
* readFreeSpace() assumes that the page is already loaded in memory. It takes * pageReadLSN() assumes that the page is already loaded in memory. It takes
* as a parameter the memory address of the loaded page in memory and returns * as a parameter a Page and returns the LSN that is currently written on that
* the offset at which the free space section of this page begins. * page in memory.
*/ */
static int readFreeSpace(byte *memAddr) { lsn_t pageReadLSN(const Page * page) {
return getSecondHalfOfWord((int*)(memAddr + START_OF_NUMSLOTS)); lsn_t ret;
}
/** readlock(page->rwlatch, 259);
* writeFreeSpace() assumes that the page is already loaded in memory. It takes /* ret = *(long *)(page->memAddr + START_OF_LSN); */
* as parameters the memory address of the loaded page in memory and a new offset ret = *lsn_ptr(page);
* in the page that will denote the point at which free space begins. readunlock(page->rwlatch);
*/
static void writeFreeSpace(byte *memAddr, int newOffset) {
setSecondHalfOfWord((int*)(memAddr + START_OF_NUMSLOTS), newOffset);
}
/** return ret;
* readNumSlots() assumes that the page is already loaded in memory. It takes
* as a parameter the memory address of the loaded page in memory, and returns
* the memory address at which the free space section of this page begins.
*/
static int readNumSlots(byte *memAddr) {
return getFirstHalfOfWord((unsigned int*)(memAddr + START_OF_NUMSLOTS));
}
/**
* writeNumSlots() assumes that the page is already loaded in memory. It takes
* as parameters the memory address of the loaded page in memory and an int
* to which the value of the numSlots field in the page will be set to.
*/
static void writeNumSlots(byte *memAddr, int numSlots) {
setFirstHalfOfWord((int*)(unsigned int*)(memAddr + START_OF_NUMSLOTS), numSlots);
}
static int isValidSlot(byte *memAddr, int slot) {
return (getSlotOffset(memAddr, slot) != INVALID_SLOT) ? 1 : 0;
}
static void invalidateSlot(byte *memAddr, int slot) {
setSlotOffset(memAddr, slot, INVALID_SLOT);
} }
/**
Move all of the records to the beginning of the page in order to
increase the available free space.
@todo If we were supporting multithreaded operation, this routine
would need to pin the pages that it works on.
*/
static void pageCompact(Page * page) {
int i;
byte buffer[PAGE_SIZE];
int freeSpace = 0;
int numSlots;
int meta_size;
int slot_length;
int last_used_slot = -1;
numSlots = readNumSlots(page->memAddr);
/* DEBUG("Compact: numSlots=%d\n", numSlots); */
meta_size = LSN_SIZE + FREE_SPACE_SIZE + NUMSLOTS_SIZE + (SLOT_SIZE*numSlots);
/* Can't compact in place, slot numbers can come in different orders than
the physical space allocated to them. */
memcpy(buffer + PAGE_SIZE - meta_size, page->memAddr + PAGE_SIZE - meta_size, meta_size);
for (i = 0; i < numSlots; i++) {
/* DEBUG("i = %d\n", i); */
if (isValidSlot(page->memAddr, i)) {
/* DEBUG("Buffer offset: %d\n", freeSpace); */
slot_length = getSlotLength(page->memAddr, i);
memcpy(buffer + freeSpace, page->memAddr + getSlotOffset(page->memAddr, i), slot_length);
setSlotOffset(buffer, i, freeSpace);
freeSpace += slot_length;
last_used_slot = i;
}
}
/* if (last_used_slot < numSlots) { */
writeNumSlots(buffer, last_used_slot + 1);
/*} */
/* DEBUG("freeSpace = %d, num slots = %d\n", freeSpace, last_used_slot + 1); */
writeFreeSpace(buffer, freeSpace);
memcpy(page->memAddr, buffer, PAGE_SIZE);
}
/**
* getSlotOffset() assumes that the page is already loaded in memory. It takes
* as parameters the memory address of the page loaded in memory, and a slot
* number. It returns the offset corresponding to that slot.
*/
static int getSlotOffset(byte *memAddr, int slot) {
return getFirstHalfOfWord((unsigned int*)slotMemAddr(memAddr, slot));
}
/**
* getSlotLength() assumes that the page is already loaded in memory. It takes
* as parameters the memory address of the page loaded in memory, and a slot
* number. It returns the length corresponding to that slot.
*/
static int getSlotLength(byte *memAddr, int slot) {
return getSecondHalfOfWord((int*)(unsigned int*)slotMemAddr(memAddr, slot));
}
/**
* setSlotOffset() assumes that the page is already loaded in memory. It takes
* as parameters the memory address of the page loaded in memory, a slot number,
* and an offset. It sets the offset of the given slot to the offset passed in
* as a parameter.
*/
static void setSlotOffset(byte *memAddr, int slot, int offset) {
setFirstHalfOfWord((int*)slotMemAddr(memAddr, slot), offset);
}
/**
* setSlotLength() assumes that the page is already loaded in memory. It takes
* as parameters the memory address of the page loaded in memory, a slot number,
* and a length. It sets the length of the given slot to the length passed in
* as a parameter.
*/
static void setSlotLength(byte *memAddr, int slot, int length) {
setSecondHalfOfWord((int*)(unsigned int*)slotMemAddr(memAddr, slot), length);
}
static void pageReallocNoLock(Page *p, int id) { static void pageReallocNoLock(Page *p, int id) {
p->id = id; p->id = id;
p->LSN = 0; p->LSN = 0;
@ -409,7 +147,7 @@ void pageInit() {
* and the greatest offset at which a record could possibly * and the greatest offset at which a record could possibly
* start is at the end of the page * start is at the end of the page
*/ */
SLOT_LENGTH_SIZE = SLOT_OFFSET_SIZE = 2; /* in bytes */ /* SLOT_LENGTH_SIZE = SLOT_OFFSET_SIZE = 2; / * in bytes * /
SLOT_SIZE = SLOT_OFFSET_SIZE + SLOT_LENGTH_SIZE; SLOT_SIZE = SLOT_OFFSET_SIZE + SLOT_LENGTH_SIZE;
LSN_SIZE = sizeof(long); LSN_SIZE = sizeof(long);
@ -422,7 +160,7 @@ void pageInit() {
MASK_0000FFFF = (1 << (2*BITS_PER_BYTE)) - 1; MASK_0000FFFF = (1 << (2*BITS_PER_BYTE)) - 1;
MASK_FFFF0000 = ~MASK_0000FFFF; MASK_FFFF0000 = ~MASK_0000FFFF;
*/
pthread_mutex_init(&pageAllocMutex, NULL); pthread_mutex_init(&pageAllocMutex, NULL);
for(int i = 0; i < MAX_BUFFER_SIZE+1; i++) { for(int i = 0; i < MAX_BUFFER_SIZE+1; i++) {
@ -446,111 +184,13 @@ void pageDeInit() {
} }
} }
typedef struct {
int page;
int slot;
/** If pageptr is not null, then it is used by the iterator methods.
Otherwise, they re-load the pages and obtain short latches for
each call. */
Page * pageptr;
} page_iterator_t;
void pageIteratorInit(recordid rid, page_iterator_t * pit, Page * p) {
pit->page = rid.page;
pit->slot = rid.slot;
pit->pageptr = p;
assert((!p) || (p->id == pit->page));
}
int nextSlot(page_iterator_t * pit, recordid * rid) {
Page * p;
int numSlots;
int done = 0;
int ret;
if(pit->pageptr) {
p = pit->pageptr;
} else {
p = loadPage(pit->page);
}
numSlots = readNumSlots(p->memAddr);
while(pit->slot < numSlots && !done) {
if(isValidSlot(p->memAddr, pit->slot)) {
done = 1;
} else {
pit->slot ++;
}
}
if(!done) {
ret = 0;
} else {
ret = 1;
rid->page = pit->page;
rid->slot = pit->slot;
rid->size = getSlotLength(p->memAddr, rid->slot);
if(rid->size >= PAGE_SIZE) {
if(rid->size == BLOB_SLOT) {
blob_record_t br;
pageReadRecord(-1, p, *rid, (byte*)&br);
rid->size = br.size;
}
}
}
if(!pit->pageptr) {
releasePage(p);
}
return ret;
}
void pageCommit(int xid) { void pageCommit(int xid) {
/* rmTouch(xid); */
} }
void pageAbort(int xid) { void pageAbort(int xid) {
/* rmTouch(xid); */
} }
/**
* pageReadLSN() assumes that the page is already loaded in memory. It takes
* as a parameter a Page and returns the LSN that is currently written on that
* page in memory.
*/
lsn_t pageReadLSN(const Page * page) {
lsn_t ret;
readlock(page->rwlatch, 259);
ret = *(long *)(page->memAddr + START_OF_LSN);
readunlock(page->rwlatch);
return ret;
}
/**
* freeSpace() assumes that the page is already loaded in memory. It takes
* as a parameter a Page, and returns an estimate of the amount of free space
* available to a new slot on this page. (This is the amount of unused space
* in the page, minus the size of a new slot entry.) This is either exact,
* or an underestimate.
*
* @todo is it ever safe to call freespace without a lock on the page?
*
*/
int freespace(Page * page) {
int ret;
readlock(page->rwlatch, 292);
ret = unlocked_freespace(page);
readunlock(page->rwlatch);
return ret;
}
/** @todo ralloc ignores its xid parameter; change the interface? */ /** @todo ralloc ignores its xid parameter; change the interface? */
recordid ralloc(int xid, long size) { recordid ralloc(int xid, long size) {
@ -581,161 +221,8 @@ recordid ralloc(int xid, long size) {
recordid pageRalloc(Page * page, int size) {
int freeSpace;
int numSlots;
int i;
writelock(page->rwlatch, 342);
if(unlocked_freespace(page) < size) {
pageCompact(page);
/* Make sure there's enough free space... */
/*#ifdef DEBUGGING*/
assert (unlocked_freespace(page) >= (int)size); /*Expensive, so skip it when debugging is off. */
/*#endif */
}
freeSpace = readFreeSpace(page->memAddr);
numSlots = readNumSlots(page->memAddr);
recordid rid;
rid.page = page->id;
rid.slot = numSlots;
rid.size = size;
/*
Reuse an old (invalid) slot entry. Why was this here?
@todo is slot reuse in page.c a performance bottleneck?
*/
for (i = 0; i < numSlots; i++) {
if (!isValidSlot(page->memAddr, i)) {
rid.slot = i;
break;
}
}
if (rid.slot == numSlots) {
writeNumSlots(page->memAddr, numSlots+1);
}
setSlotOffset(page->memAddr, rid.slot, freeSpace);
setSlotLength(page->memAddr, rid.slot, rid.size);
writeFreeSpace(page->memAddr, freeSpace + rid.size);
writeunlock(page->rwlatch);
/* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */
return rid;
}
/** Only used for recovery, to make sure that consistent RID's are created
* on log playback. */
recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid) {
int freeSpace;
int numSlots;
writelock(page->rwlatch, 376);
freeSpace = readFreeSpace(page->memAddr);
numSlots= readNumSlots(page->memAddr);
/* printf("!"); fflush(NULL); */
/* if(rid.size > BLOB_THRESHOLD_SIZE) {
return blobSlotAlloc(page, lsn_t lsn, recordid rid);
}*/
/* assert(rid.slot >= numSlots); */
/** @todo for recovery, pageSlotRalloc assumes no other thread added a slot
between when ralloc and it were called. (This may be a
safe assumption..) */
if(getSlotLength(page->memAddr, rid.slot) == 0) {
/* if(rid.slot >= numSlots) { */
if (unlocked_freespace(page) < rid.size) { /*freeSpace < rid.size) { */
pageCompact(page);
freeSpace = readFreeSpace(page->memAddr);
assert (freeSpace < rid.size);
}
setSlotOffset(page->memAddr, rid.slot, freeSpace);
setSlotLength(page->memAddr, rid.slot, rid.size);
writeFreeSpace(page->memAddr, freeSpace + rid.size);
/* printf("?"); fflush(NULL);*/
} else {
assert((rid.size == getSlotLength(page->memAddr, rid.slot)) ||
(getSlotLength(page->memAddr, rid.slot) >= PAGE_SIZE)); /* Fails. Why? */
}
writeunlock(page->rwlatch);
return rid;
}
void pageDeRalloc(Page * page, recordid rid) {
readlock(page->rwlatch, 443);
invalidateSlot(page->memAddr, rid.slot);
unlock(page->rwlatch);
}
/*
This should trust the rid (since the caller needs to
override the size in special circumstances)
@todo If the rid size has been overridden, we should check to make
sure that this really is a special record.
*/
void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) {
byte *recAddress;
int slot_length;
readlock(page->rwlatch, 519);
assert(page->id == rid.page);
recAddress = page->memAddr + getSlotOffset(page->memAddr, rid.slot);
slot_length = getSlotLength(page->memAddr, rid.slot);
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
memcpy(buff, recAddress, rid.size);
unlock(page->rwlatch);
}
void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data) {
byte *rec;
int len;
readlock(page->rwlatch, 529);
assert(rid.size < PAGE_SIZE);
rec = page->memAddr + getSlotOffset(page->memAddr, rid.slot);
len = getSlotLength(page->memAddr, rid.slot);
assert(rid.size == len || len >= PAGE_SIZE);
if(memcpy(rec, data, rid.size) == NULL ) {
printf("ERROR: MEM_WRITE_ERROR on %s line %d", __FILE__, __LINE__);
exit(MEM_WRITE_ERROR);
}
page->LSN = lsn;
pageWriteLSN(page);
unlock(page->rwlatch);
}
/** @todo Does pageRealloc really need to obtain a lock? */
void pageRealloc(Page *p, int id) { void pageRealloc(Page *p, int id) {
writelock(p->rwlatch, 10); writelock(p->rwlatch, 10);
pageReallocNoLock(p,id); pageReallocNoLock(p,id);
@ -756,93 +243,54 @@ Page *pageAlloc(int id) {
page = &(pool[nextPage]); page = &(pool[nextPage]);
/* We have an implicit lock on rwlatch, since we allocated it, but
haven't returned yet. */
/* page->rwlatch = initlock();
page->loadlatch = initlock();
page->memAddr = malloc(PAGE_SIZE); */
nextPage++; nextPage++;
assert(nextPage <= MAX_BUFFER_SIZE + 1); /* There's a dummy page that we need to keep around, thus the +1 */ /* There's a dummy page that we need to keep around, thus the +1 */
assert(nextPage <= MAX_BUFFER_SIZE + 1);
pthread_mutex_unlock(&pageAllocMutex); pthread_mutex_unlock(&pageAllocMutex);
return page; return page;
} }
void printPage(byte *memAddr) { void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
int i = 0;
for (i = 0; i < PAGE_SIZE; i++) { /* writelock(p->rwlatch, 225); *//* Need a writelock so that we can update the lsn. */
if((*(char *)(memAddr+i)) == 0) {
printf("#"); if(rid.size > BLOB_THRESHOLD_SIZE) {
/* DEBUG("Writing blob.\n"); */
writeBlob(xid, p, lsn, rid, dat);
} else { } else {
printf("%c", *(char *)(memAddr+i)); /* DEBUG("Writing record.\n"); */
assert( (p->id == rid.page) && (p->memAddr != NULL) );
/** @todo This assert should be here, but the tests are broken, so it causes bogus failures. */
/*assert(pageReadLSN(*p) <= lsn);*/
pageWriteRecord(xid, p, lsn, rid, dat);
assert( (p->id == rid.page) && (p->memAddr != NULL) );
} }
if((i+1)%4 == 0)
printf(" "); /* p->LSN = lsn;
pageWriteLSN(p);
unlock(p->rwlatch); */
}
void readRecord(int xid, Page * p, recordid rid, void *buf) {
if(rid.size > BLOB_THRESHOLD_SIZE) {
/* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */
/* @todo should readblob take a page pointer? */
readBlob(xid, p, rid, buf);
} else {
assert(rid.page == p->id);
/* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */
pageReadRecord(xid, p, rid, buf);
assert(rid.page == p->id);
} }
} }
#define num 20
int pageTest() {
Page * page = malloc(sizeof(Page));
recordid rid[num];
char *str[num] = {"one",
"two",
"three",
"four",
"five",
"six",
"seven",
"eight",
"nine",
"ten",
"eleven",
"twelve",
"thirteen",
"fourteen",
"fifteen",
"sixteen",
"seventeen",
"eighteen",
"nineteen",
"twenty"};
int i;
page->memAddr = (byte *)malloc(PAGE_SIZE);
memset(page->memAddr, 0, PAGE_SIZE);
for (i = 0; i < num; i++) {
rid[i] = pageRalloc(page, strlen(str[i]) + 1);
pageWriteRecord(0, page, rid[i], 1, (byte*)str[i]);
}
printPage(page->memAddr);
for (i = 0; i < num; i+= 2)
pageDeRalloc(page, rid[i]);
pageCompact(page);
printf("\n\n\n");
printPage(page->memAddr);
return 0;
}
/** @todo: Should the caller need to obtain the writelock when calling pageSetSlotType? */
void pageSetSlotType(Page * p, int slot, int type) {
assert(type > PAGE_SIZE);
writelock(p->rwlatch, 686);
setSlotLength(p->memAddr, slot, type);
unlock(p->rwlatch);
}
int pageGetSlotType(Page * p, int slot, int type) {
int ret;
readlock(p->rwlatch, 693);
ret = getSlotLength(p->memAddr, slot);
unlock(p->rwlatch);
/* getSlotType does the locking for us. */
return ret > PAGE_SIZE ? ret : NORMAL_SLOT;
}

View file

@ -45,10 +45,39 @@ terms specified in this license.
* *
* interface for dealing with slotted pages * interface for dealing with slotted pages
* *
* This file provides a re-entrant interface for pages that contain
* variable-size records.
*
* @ingroup LLADD_CORE * @ingroup LLADD_CORE
* $Id$ * $Id$
* *
* @todo update docs in this file. * @todo The slotted pages implementation, and the rest of the page
* structure should be separated, and each page should have a 'type'
* slot so that we can implement multiple page types on top of LLADD.
Slotted page layout:
END:
lsn (4 bytes)
type (2 bytes)
free space (2 bytes)
num of slots (2 bytes)
freelist head(2 bytes)
slot 0 (2 bytes)
slot 1 (2 bytes)
...
slot n (2 bytes)
...
unused
...
record n (x bytes)
...
record 0 (y bytes)
record 1 (z bytes)
START
**/ **/
#ifndef __PAGE_H__ #ifndef __PAGE_H__
@ -57,16 +86,31 @@ terms specified in this license.
#include <config.h> #include <config.h>
#include <lladd/common.h> #include <lladd/common.h>
#include "latches.h" #include "latches.h"
/** @todo page.h includes things that it shouldn't! (Or, page.h shouldn't be an installed header.) */ /** @todo page.h includes things that it shouldn't, and page.h should eventually be an installed header. */
#include <lladd/transactional.h> #include <lladd/transactional.h>
/*#ifdef __BUFFERMANAGER_H__
#error bufferManager.h must be included after page.h
#endif*/
#include <lladd/bufferManager.h> #include <lladd/bufferManager.h>
BEGIN_C_DECLS BEGIN_C_DECLS
/*
#define LSN_SIZE sizeof(lsn_t)
#define START_OF_LSN (PAGE_SIZE - LSN_SIZE)
#define PAGE_TYPE_SIZE 0
#define START_OF_PAGE_TYPE (START_OF_LSN - PAGE_TYPE_SIZE)
#define USABLE_SPACE_SIZE (START_OF_PAGE_TYPE)*/
#define lsn_ptr(page) (((lsn_t *)(&((page)->memAddr[PAGE_SIZE])))-1)
#define page_type_ptr(page) (((int*)lsn_ptr((page)))-1)
#define end_of_usable_space_ptr(page) page_type_ptr((page))
#define shorts_from_end(page, count) (((short*)end_of_usable_space_ptr((page)))-(count))
#define bytes_from_start(page, count) (((byte*)((page)->memAddr))+(count))
/*#define invalidateSlot(page, n) (*slot_ptr((page), (n)) = INVALID_SLOT)*/
/** /**
The page type contains in-memory information about pages. This The page type contains in-memory information about pages. This
@ -76,9 +120,7 @@ BEGIN_C_DECLS
In particular, our current page replacement policy requires two doubly In particular, our current page replacement policy requires two doubly
linked lists, linked lists,
@todo In general, we pass around page structs (as opposed to page @todo The Page struct should be tuned for better memory utilization.
pointers). This is starting to become cumbersome, as the page
struct is becoming more complex...)
*/ */
struct Page_s { struct Page_s {
/** @todo Shouldn't Page.id be a long? */ /** @todo Shouldn't Page.id be a long? */
@ -149,41 +191,8 @@ struct Page_s {
@see rwlatch, getPage(), pageRalloc(), pageRead() @see rwlatch, getPage(), pageRalloc(), pageRead()
*/ */
rwl * loadlatch; rwl * loadlatch;
/** This mutex protects the pending field. We don't use rwlatch for
this, since we also need to use a condition variable to update
this properly, and there are no read-only functions for the
pending field. */
/* pthread_cond_t noMorePending; */ /* pthread_cond_t */
/* int waiting; */
/**
In the multi-threaded case, before we steal a page, we need to
know that all pending actions have been completed. Here, we
track that on a per-resident page basis, by incrementing the
pending field each time we generate a log entry that will result
in a write to the corresponding page.
(For a concrete example of why this is needed, imagine two
threads write to different records on the same page, and get
LSN's 1 and 2. If 2 happens to write first, then the page is
stolen, and then we crash, recovery will not know that the page
does not reflect LSN 1.)
"Pending events" are calls to functions that take lsn's.
Currently, those functions are writeRecord and pageSlotRalloc.
@todo work out what happens with kickPage() and loadPage() more
carefully.
*/
/* int pending; */
}; };
/** /**
@ -191,7 +200,6 @@ struct Page_s {
* functions dealing with pages. * functions dealing with pages.
*/ */
void pageInit(); void pageInit();
void pageDeInit(); void pageDeInit();
/** /**
@ -209,11 +217,22 @@ void pageDeInit();
lsn_t pageReadLSN(const Page * page); lsn_t pageReadLSN(const Page * page);
/** /**
* assumes that the page is already loaded in memory. It takes as a * @param xid transaction id @param lsn the lsn that the updated
* parameter a Page, and returns an estimate of the amount of free space on this * record will reflect. This is needed by recovery, and undo. (The
* page. This is either exact, or an underestimate. * lsn of a page must always increase. Undos are handled by passing
* in the LSN of the CLR that records the undo.)
*
* @param rid recordid where you want to write @param dat data you
* wish to write
*/ */
int freespace(Page * page); void writeRecord(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat);
/**
* @param xid transaction ID
* @param rid
* @param dat buffer for data
*/
void readRecord(int xid, Page * page, recordid rid, void *dat);
/** /**
* allocate a record. This must be done in two phases. The first * allocate a record. This must be done in two phases. The first
@ -231,7 +250,6 @@ int freespace(Page * page);
recordid ralloc(int xid, long size); recordid ralloc(int xid, long size);
/** /**
* assumes that the page is already loaded in memory. It takes as * assumes that the page is already loaded in memory. It takes as
* parameters a Page and the size in bytes of the new record. pageRalloc() * parameters a Page and the size in bytes of the new record. pageRalloc()
@ -252,34 +270,18 @@ recordid ralloc(int xid, long size);
* @todo Makes no attempt to reuse old recordid's. * @todo Makes no attempt to reuse old recordid's.
*/ */
recordid pageRalloc(Page * page, int size); recordid pageRalloc(Page * page, int size);
void pageDeRalloc(Page * page, recordid rid);
void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data);
void pageReadRecord(int xid, Page * page, recordid rid, byte *buff);
void pageCommit(int xid);
void pageAbort(int xid);
/*void pageReallocNoLock(Page * p, int id); */
/** @todo Do we need a locking version of pageRealloc? */
void pageRealloc(Page * p, int id);
Page* pageAlloc(int id);
recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid); recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid);
void pageDeRalloc(Page * page, recordid rid); void pageDeRalloc(Page * page, recordid rid);
/*int pageTest(); */ void pageCommit(int xid);
void pageAbort(int xid);
Page* pageAlloc(int id);
void pageRealloc(Page * p, int id);
int pageGetSlotType(Page * p, int slot, int type); int pageGetSlotType(Page * p, int slot, int type);
void pageSetSlotType(Page * p, int slot, int type); void pageSetSlotType(Page * p, int slot, int type);
Page * loadPage(int page);
Page * getPage(int page, int mode);
END_C_DECLS END_C_DECLS
#endif #endif
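A minimal sketch of a record round trip through the interface declared above, following the pattern of the check_page test in this commit. The xid and lsn values are placeholders, no log entries are written, and error handling is omitted.

#include <assert.h>
#include "page.h"                   /* assumed path; pulls in bufferManager.h */

static void record_roundtrip(void) {
    int in = 42, out = 0;
    lsn_t lsn = 1;                          /* placeholder LSN                 */

    Page *p = loadPage(0);                  /* pin page 0                      */
    recordid rid = pageRalloc(p, sizeof(int));  /* allocate a slot for an int  */

    writeRecord(1, p, lsn, rid, &in);       /* xid 1 is a placeholder          */
    readRecord(1, p, rid, &out);
    assert(out == in);

    pageDeRalloc(p, rid);                   /* the slot id joins the freelist  */
    releasePage(p);
}

int main(void) {
    if (bufInit() != 0) return 1;  /* from bufferManager.h, included by page.h */
    record_roundtrip();
    bufDeinit();
    return 0;
}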

src/lladd/page/slotted.c (new file, 412 lines)
View file

@ -0,0 +1,412 @@
/************************************************************************
* implementation of pages
STRUCTURE OF A PAGE
+-------------------------------------------+-----------------------+--+
| DATA SECTION +--------->| RID: (PAGE, 0) | |
| +-----------------+ | +-----------------------+ |
| +-->| RID: (PAGE, 1) | | |
| | +-----------------+ | |
| | | |
| +-----------------+ | +----------------------------+
| | | +--->| RID: (PAGE, n) |
| | | | +----------------------------+
|======================================================================|
|^ FREE SPACE | | | |
|+-----------------------|-------|---|--------------------+ |
| | | | | |
| +-------------|-------|---+ | |
| | | | | |
| +---|---+-----+---|---+---|---+--------------+-----|------+-----+
| | slotn | ... | slot1 | slot0 | num of slots | free space | LSN |
+------+-------+-----+-------+-------+--------------+------------+-----+
NOTE:
- slots are zero indexed.
- slots are of implemented as (offset, length)
Latching summary:
Each page has an associated read/write lock. This lock only
protects the internal layout of the page, and the members of the
page struct. Here is how it is held in various circumstances:
Record allocation: Write lock
Record read: Read lock
Read LSN Read lock
Record write *READ LOCK*
Write LSN Write lock
Any circumstance where these locks are held during an I/O operation
is a bug.
$Id$
************************************************************************/
#include "../page.h"
#include "../blobManager.h"
#include "slotted.h"
#include <assert.h>
/* ------------------ STATIC FUNCTIONS. NONE OF THESE ACQUIRE LOCKS
ON THE MEMORY THAT IS PASSED INTO THEM -------------*/
static void __really_do_ralloc(Page * page, recordid rid) ;
/**
Move all of the records to the beginning of the page in order to
increase the available free space.
The caller of this function must have a writelock on the page.
*/
static void pageCompact(Page * page) {
int i;
Page bufPage;
byte buffer[PAGE_SIZE];
int numSlots;
int meta_size;
bufPage.id = -1;
bufPage.memAddr = buffer;
/* Can't compact in place, slot numbers can come in different orders than
the physical space allocated to them. */
memset(buffer, -1, PAGE_SIZE);
meta_size = (((int)page->memAddr) + PAGE_SIZE ) - (int)end_of_usable_space_ptr(page);
/* *slot_length_ptr(page, (*numslots_ptr(page))-1);*/
memcpy(buffer + PAGE_SIZE - meta_size, page->memAddr + PAGE_SIZE - meta_size, meta_size);
pageInitialize(&bufPage);
numSlots = *numslots_ptr(page);
for (i = 0; i < numSlots; i++) {
/* printf("i = %d\n", i); */
if (isValidSlot(page, i)) {
/* printf("copying %d\n", i);
fflush(NULL); */
/* DEBUG("Buffer offset: %d\n", freeSpace); */
recordid rid;
rid.page = -1;
rid.slot = i;
rid.size = *slot_length_ptr(page, i);
__really_do_ralloc(&bufPage, rid);
memcpy(record_ptr(&bufPage, rid.slot), record_ptr(page, rid.slot), rid.size);
} else {
*slot_ptr(&bufPage, i) = INVALID_SLOT;
*slot_length_ptr(&bufPage, i) = *freelist_ptr(page);
*freelist_ptr(page) = i;
}
}
/* Rebuild the freelist. */
/* *freelist_ptr(&bufPage) = 0;
for (i = 0; i < numSlots; i++) {
if (!isValidSlot(&bufPage, i)) {
*slot_length_ptr(&bufPage, i) = *freelist_ptr(&bufPage);
*freelist_ptr(&bufPage) = i;
break;
}
}
*/
memcpy(page->memAddr, buffer, PAGE_SIZE);
}
void pageInitialize(Page * page) {
/* printf("Initializing page %d\n", page->id);
fflush(NULL); */
memset(page->memAddr, 0, PAGE_SIZE);
*freespace_ptr(page) = 0;
*numslots_ptr(page) = 0;
*freelist_ptr(page) = INVALID_SLOT;
}
int unlocked_freespace(Page * page) {
return (int)slot_length_ptr(page, *numslots_ptr(page)) - (int)(page->memAddr + *freespace_ptr(page));
}
/**
* freeSpace() assumes that the page is already loaded in memory. It takes
* as a parameter a Page, and returns an estimate of the amount of free space
* available to a new slot on this page. (This is the amount of unused space
* in the page, minus the size of a new slot entry.) This is either exact,
* or an underestimate.
*
* @todo is it ever safe to call freespace without a lock on the page?
*
*/
int freespace(Page * page) {
int ret;
readlock(page->rwlatch, 292);
ret = unlocked_freespace(page);
readunlock(page->rwlatch);
return ret;
}
recordid pageRalloc(Page * page, int size) {
writelock(page->rwlatch, 342);
recordid rid;
rid.page = page->id;
rid.slot = *numslots_ptr(page);
rid.size = size;
/*
Reuse an old (invalid) slot entry.
@todo This is terribly slow, but seems to be necessary, or
we will leak slot ids. Is there a better (non n^2) way?
Perhaps we could use the empty slots to construct a linked
list of free slots. (The slot length could be the offset
of the next slot on the list, and we could use the standard
INVALID_SLOT value to distinguish between the types.)
*/
/* Old way */
/* int i;
for (i = 0; i < numSlots; i++) {
if (!isValidSlot(page, i)) {
rid.slot = i;
break;
}
} */
/* new way @todo leaks slot zero (until pageCompact is called)*/
if(*freelist_ptr(page) != INVALID_SLOT) {
rid.slot = *freelist_ptr(page);
/* printf("Reusing old slot %d\n", rid.slot); */
*freelist_ptr(page) = *slot_length_ptr(page, rid.slot);
*slot_length_ptr(page, rid.slot) = 0;
} else {
/* printf("Allocating new slot\n"); */
}
fflush(NULL);
__really_do_ralloc(page, rid);
/* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */
writeunlock(page->rwlatch);
return rid;
}
static void __really_do_ralloc(Page * page, recordid rid) {
int freeSpace;
assert(rid.size > 0);
if(unlocked_freespace(page) < rid.size) {
pageCompact(page);
/* Make sure there's enough free space... */
assert (unlocked_freespace(page) >= rid.size);
}
freeSpace = *freespace_ptr(page);
if(*numslots_ptr(page) <= rid.slot) {
/* printf("Incrementing numSlots."); */
*numslots_ptr(page) = rid.slot + 1;
}
DEBUG("Num slots %d\trid.slot %d\n", *numslots_ptr(page), rid.slot);
*freespace_ptr(page) = freeSpace + rid.size;
*slot_ptr(page, rid.slot) = freeSpace;
/* assert(!*slot_length_ptr(page, rid.slot) || (-1 == *slot_length_ptr(page, rid.slot)));*/
*slot_length_ptr(page, rid.slot) = rid.size;
}
/** Only used for recovery, to make sure that consistent RID's are created
* on log playback. */
recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid) {
writelock(page->rwlatch, 376);
if(*slot_length_ptr(page, rid.slot) == 0 /*|| *slot_length_ptr(page, rid.slot) == -1*/) {
__really_do_ralloc(page, rid);
} else {
assert((rid.size == *slot_length_ptr(page, rid.slot)) ||
(*slot_length_ptr(page, rid.slot) >= PAGE_SIZE));
}
writeunlock(page->rwlatch);
return rid;
}
void pageDeRalloc(Page * page, recordid rid) {
readlock(page->rwlatch, 443);
*slot_ptr(page, rid.slot) = INVALID_SLOT;
*slot_length_ptr(page, rid.slot) = *freelist_ptr(page);
*freelist_ptr(page) = rid.slot;
unlock(page->rwlatch);
}
/*
This should trust the rid (since the caller needs to
override the size in special circumstances)
@todo If the rid size has been overridden, we should check to make
sure that this really is a special record.
*/
void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) {
int slot_length;
readlock(page->rwlatch, 519);
assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) {
perror("memcpy");
abort();
}
unlock(page->rwlatch);
}
void pageWriteRecord(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) {
int slot_length;
writelock(page->rwlatch, 529);
assert(rid.size < PAGE_SIZE);
assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
if(!memcpy(record_ptr(page, rid.slot), data, rid.size)) {
perror("memcpy");
abort();
}
page->LSN = lsn;
/* *lsn_ptr(page) = lsn */
pageWriteLSN(page);
unlock(page->rwlatch);
}
/** @todo: Should the caller need to obtain the writelock when calling pageSetSlotType? */
void pageSetSlotType(Page * p, int slot, int type) {
assert(type > PAGE_SIZE);
writelock(p->rwlatch, 686);
*slot_length_ptr(p, slot) = type;
unlock(p->rwlatch);
}
int pageGetSlotType(Page * p, int slot, int type) {
int ret;
readlock(p->rwlatch, 693);
ret = *slot_length_ptr(p, slot);
unlock(p->rwlatch);
/* getSlotType does the locking for us. */
return ret > PAGE_SIZE ? ret : NORMAL_SLOT;
}
/*
typedef struct {
int page;
int slot;
/ ** If pageptr is not null, then it is used by the iterator methods.
Otherwise, they re-load the pages and obtain short latches for
each call. * /
Page * pageptr;
} page_iterator_t;
void pageIteratorInit(recordid rid, page_iterator_t * pit, Page * p) {
pit->page = rid.page;
pit->slot = rid.slot;
pit->pageptr = p;
assert((!p) || (p->id == pit->page));
}
int nextSlot(page_iterator_t * pit, recordid * rid) {
Page * p;
int numSlots;
int done = 0;
int ret;
if(pit->pageptr) {
p = pit->pageptr;
} else {
p = loadPage(pit->page);
}
numSlots = readNumSlots(p->memAddr);
while(pit->slot < numSlots && !done) {
if(isValidSlot(p->memAddr, pit->slot)) {
done = 1;
} else {
pit->slot ++;
}
}
if(!done) {
ret = 0;
} else {
ret = 1;
rid->page = pit->page;
rid->slot = pit->slot;
rid->size = getSlotLength(p->memAddr, rid->slot);
if(rid->size >= PAGE_SIZE) {
if(rid->size == BLOB_SLOT) {
blob_record_t br;
pageReadRecord(-1, p, *rid, (byte*)&br);
rid->size = br.size;
}
}
}
if(!pit->pageptr) {
releasePage(p);
}
return ret;
}
*/

src/lladd/page/slotted.h (new file, 21 lines)
View file

@ -0,0 +1,21 @@
void pageWriteRecord(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data);
void pageReadRecord(int xid, Page * page, recordid rid, byte *buff);
/**
* assumes that the page is already loaded in memory. It takes as a
* parameter a Page, and returns an estimate of the amount of free space on this
* page. This is either exact, or an underestimate.
* @todo how should this be handled? */
int freespace(Page * p);
void pageInitialize(Page * p);
#define freespace_ptr(page) shorts_from_end((page), 1)
#define numslots_ptr(page) shorts_from_end((page), 2)
#define freelist_ptr(page) shorts_from_end((page), 3)
#define slot_ptr(page, n) shorts_from_end((page), (2*(n))+4)
#define slot_length_ptr(page, n) shorts_from_end((page), (2*(n))+5)
#define record_ptr(page, n) bytes_from_start((page), *slot_ptr((page), (n)))
#define isValidSlot(page, n) ((*slot_ptr((page), (n)) == INVALID_SLOT) ? 0 : 1)
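A worked example of where the end-of-page fields land under these macros, assuming PAGE_SIZE == 4096, sizeof(lsn_t) == 8, sizeof(int) == 4, and sizeof(short) == 2 (all assumptions about the build, not guarantees made by the headers).

#include <assert.h>
#include <stdio.h>

int main(void) {
    const int page_size     = 4096;
    const int lsn_off       = page_size - 8;     /* lsn_ptr(page)              */
    const int type_off      = lsn_off - 4;       /* page_type_ptr(page)        */
    const int freespace_off = type_off - 1 * 2;  /* shorts_from_end(page, 1)   */
    const int numslots_off  = type_off - 2 * 2;  /* shorts_from_end(page, 2)   */
    const int freelist_off  = type_off - 3 * 2;  /* shorts_from_end(page, 3)   */
    const int slot0_off     = type_off - 4 * 2;  /* slot_ptr(page, 0)          */
    const int slot0_len_off = type_off - 5 * 2;  /* slot_length_ptr(page, 0)   */

    assert(slot0_len_off < slot0_off);           /* the slot array grows back
                                                    toward the start of page   */
    printf("lsn@%d type@%d freespace@%d numslots@%d freelist@%d slot0 %d/%d\n",
           lsn_off, type_off, freespace_off, numslots_off, freelist_off,
           slot0_off, slot0_len_off);
    return 0;
}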

View file

@ -4,6 +4,7 @@
*/ */
#include "page.h" #include "page.h"
#include "page/slotted.h"
#include <lladd/bufferManager.h> #include <lladd/bufferManager.h>
@ -20,48 +21,19 @@
#include <unistd.h> #include <unistd.h>
static int stable = -1; static int stable = -1;
/** Defined in bufferManager.c */
extern pthread_mutex_t add_pending_mutex;
static pthread_mutex_t stable_mutex; static pthread_mutex_t stable_mutex;
static long myLseek(int f, long offset, int whence); static long myLseek(int f, long offset, int whence);
static long myLseekNoLock(int f, long offset, int whence); static long myLseekNoLock(int f, long offset, int whence);
void pageRead(Page *ret) { void pageRead(Page *ret) {
/* long fileSize; */
long pageoffset; long pageoffset;
long offset; long offset;
/** @todo pageRead() is using fseek to calculate the file size on each read, which is inefficient. */ /** @todo pageRead() is using fseek to calculate the file size on each read, which is inefficient. */
pageoffset = ret->id * PAGE_SIZE; pageoffset = ret->id * PAGE_SIZE;
/* flockfile(stable); */
pthread_mutex_lock(&stable_mutex); pthread_mutex_lock(&stable_mutex);
/* fileSize = myLseekNoLock(stable, 0, SEEK_END); */
/* DEBUG("Reading page %d\n", ret->id); */
/* if(!ret->memAddr) {
ret->memAddr = malloc(PAGE_SIZE);
}
if(!ret->memAddr) {
perror("pageFile.c");
fflush(NULL);
}
assert(ret->memAddr); */
/** @todo was manual extension of the storefile really necessary? */
/* if ((ret->id)*PAGE_SIZE >= fileSize) {
myLseekNoLock(stable, (ret->id - 1) * PAGE_SIZE -1, SEEK_SET);
if(1 != fwrite("", 1, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof extending storefile!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error extending storefile! %d", ferror(stable)); fflush(NULL); abort(); }
}
}*/
offset = myLseekNoLock(stable, pageoffset, SEEK_SET); offset = myLseekNoLock(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset); assert(offset == pageoffset);
@ -73,7 +45,7 @@ void pageRead(Page *ret) {
offset = myLseekNoLock(stable, pageoffset, SEEK_SET); offset = myLseekNoLock(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset); assert(offset == pageoffset);
if(fileSize <= pageoffset) { if(fileSize <= pageoffset) {
memset(ret->memAddr, 0, PAGE_SIZE); pageInitialize(ret);
write(stable, ret->memAddr, PAGE_SIZE); write(stable, ret->memAddr, PAGE_SIZE);
} }
} else if(read_size == -1) { } else if(read_size == -1) {
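
The @todo earlier in pageRead() points out that the store file's length is recomputed with a seek on every read. A minimal sketch of one way around that, caching the length from a single fstat() and updating it when the file is extended, follows; this is an assumption about a possible fix, not what pageFile.c does, and storeFileLength_sketch/cached_file_length are hypothetical names. The cache would need to be protected by the existing stable_mutex.

#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>    /* perror */
#include <stdlib.h>   /* abort  */

static long cached_file_length = -1;  /* -1 means "not known yet" */

/* Return the store file's length, calling fstat() only the first time.
   Callers that append pages past the end of file would bump the cache
   themselves instead of re-seeking to SEEK_END on every pageRead(). */
static long storeFileLength_sketch(int fd) {
  if(cached_file_length == -1) {
    struct stat s;
    if(fstat(fd, &s) == -1) { perror("fstat"); abort(); }
    cached_file_length = (long)s.st_size;
  }
  return cached_file_length;
}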
@ -129,20 +101,6 @@ void pageWrite(Page * ret) {
void openPageFile() { void openPageFile() {
DEBUG("Opening storefile.\n"); DEBUG("Opening storefile.\n");
/* if( ! (stable = fopen(STORE_FILE, "r+"))) { / * file may not exist * /
byte* zero = calloc(1, PAGE_SIZE);
if(!(stable = fopen(STORE_FILE, "w+"))) { perror("Couldn't open or create store file"); abort(); }
/ * Write out one page worth of zeros to get started. * /
/ * if(1 != fwrite(zero, PAGE_SIZE, 1, stable)) { assert (0); } * /
free(zero);
}
DEBUG("storefile opened.\n");
*/
stable = open (STORE_FILE, O_CREAT | O_RDWR | O_DIRECT, S_IRWXU | S_IRWXG | S_IRWXO); stable = open (STORE_FILE, O_CREAT | O_RDWR | O_DIRECT, S_IRWXU | S_IRWXG | S_IRWXO);
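
openPageFile() now opens the store with O_DIRECT. On Linux, O_DIRECT transfers generally require the user-space buffer (and often the file offset and transfer size) to be aligned to the device or filesystem block size, so page buffers cannot safely come from a plain malloc(). The helper below is a hedged sketch of how such a buffer could be obtained; allocAlignedPageBuffer_sketch is a hypothetical name, and this is not a claim about how bufferManager.c currently allocates pages.

#include <stdio.h>             /* perror */
#include <stdlib.h>            /* posix_memalign, abort */
#include <lladd/constants.h>   /* PAGE_SIZE */

/* Allocate one page-sized buffer aligned to PAGE_SIZE, which satisfies the
   (typically 512-byte or 4KB) alignment that O_DIRECT reads and writes expect. */
static void * allocAlignedPageBuffer_sketch(void) {
  void * buf = NULL;
  if(posix_memalign(&buf, PAGE_SIZE, PAGE_SIZE) != 0) {
    perror("posix_memalign");
    abort();
  }
  return buf;
}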

View file

@ -6,6 +6,7 @@
#include <lladd/recovery.h> #include <lladd/recovery.h>
#include "logger/logWriter.h" #include "logger/logWriter.h"
#include <lladd/bufferManager.h> #include <lladd/bufferManager.h>
#include "page.h"
#include <lladd/logger/logger2.h> #include <lladd/logger/logger2.h>
#include <stdio.h> #include <stdio.h>

View file

@ -45,6 +45,7 @@ terms specified in this license.
#include <check.h> #include <check.h>
#include "../../src/lladd/page.h" #include "../../src/lladd/page.h"
#include "../../src/lladd/page/slotted.h"
#include <lladd/bufferManager.h> #include <lladd/bufferManager.h>
#include <lladd/transactional.h> #include <lladd/transactional.h>
@ -54,7 +55,6 @@ terms specified in this license.
#include "../check_includes.h" #include "../check_includes.h"
#define LOG_NAME "check_page.log" #define LOG_NAME "check_page.log"
#define RECORD_SIZE sizeof(int) #define RECORD_SIZE sizeof(int)
@ -65,9 +65,58 @@ terms specified in this license.
Page * loadPage(int pageid); Page * loadPage(int pageid);
pthread_mutex_t random_mutex; pthread_mutex_t random_mutex;
static lsn_t lsn; static lsn_t lsn;
static pthread_mutex_t lsn_mutex; static pthread_mutex_t lsn_mutex;
static void * multiple_simultaneous_pages ( void * arg_ptr) {
Page * p = (Page*)arg_ptr;
int i;
lsn_t this_lsn;
int j;
int first = 1;
int k;
recordid rid[100];
for(i = 0; i < 10000; i++) {
pthread_mutex_lock(&lsn_mutex);
this_lsn = lsn;
lsn++;
pthread_mutex_unlock(&lsn_mutex);
if(! first ) {
/* addPendingEvent(p); */
/*pageReadRecord(1, p, rid, (byte*)&j);*/
for(k = 0; k < 100; k++) {
readRecord(1, p, rid[k], (byte*)&j);
assert((j + 1) == i + k);
pageDeRalloc(p, rid[k]);
sched_yield();
}
}
first = 0;
for(k = 0; k < 100; k++) {
rid[k] = pageRalloc(p, sizeof(short));
i +=k;
/* printf("Slot = %d\n", rid[k].slot); */
writeRecord(1, p, lsn, rid[k], (byte*)&i);
i -=k;
sched_yield();
}
assert(pageReadLSN(p) <= lsn);
}
return NULL;
}
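
The loop above allocates 100 records per pass with pageRalloc() and frees them all with pageDeRalloc() on the next pass, so slot ids must be reclaimed through the freelist declared in slotted.h for the slot table to stay bounded. The sketch below illustrates one way such reclamation could work; it is an assumption, not the logic in page/slotted.c, and the _sketch helpers are hypothetical. It assumes freed slots are chained through their slot_length_ptr() entries, with slot_ptr() set to INVALID_SLOT so isValidSlot() reports them as invalid and INVALID_SLOT terminating the chain.

#include "../../src/lladd/page.h"
#include "../../src/lladd/page/slotted.h"

/* Push a freed slot id onto the page's reclaimed-slot list. */
static void freeSlotId_sketch(Page * p, int slot) {
  *slot_ptr(p, slot)        = INVALID_SLOT;      /* slot is no longer valid   */
  *slot_length_ptr(p, slot) = *freelist_ptr(p);  /* remember the old head     */
  *freelist_ptr(p)          = slot;              /* this slot is the new head */
}

/* Reuse a reclaimed slot id if one exists; otherwise mint a fresh one.
   (A real pageRalloc() would also have to check that the page has room
   for the record and, for a fresh id, for the new slot-table entry.) */
static short allocateSlotId_sketch(Page * p) {
  short slot = *freelist_ptr(p);
  if(slot != INVALID_SLOT) {
    *freelist_ptr(p) = *slot_length_ptr(p, slot);  /* pop from the freelist */
  } else {
    slot = (*numslots_ptr(p))++;                   /* grow the slot table   */
  }
  return slot;
}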
static void* worker_thread(void * arg_ptr) { static void* worker_thread(void * arg_ptr) {
Page * p = (Page*)arg_ptr; Page * p = (Page*)arg_ptr;
int i; int i;
@ -83,7 +132,8 @@ static void* worker_thread(void * arg_ptr) {
if(! first ) { if(! first ) {
/* addPendingEvent(p); */ /* addPendingEvent(p); */
pageReadRecord(1, p, rid, (byte*)&j); /*pageReadRecord(1, p, rid, (byte*)&j);*/
readRecord(1, p, rid, (byte*)&j);
assert((j + 1) == i); assert((j + 1) == i);
pageDeRalloc(p, rid); pageDeRalloc(p, rid);
sched_yield(); sched_yield();
@ -92,8 +142,7 @@ static void* worker_thread(void * arg_ptr) {
first = 0; first = 0;
rid = pageRalloc(p, sizeof(int)); rid = pageRalloc(p, sizeof(int));
/* addPendingEvent(p); */ writeRecord(1, p, lsn, rid, (byte*)&i);
pageWriteRecord(1, p, rid, lsn, (byte*)&i);
sched_yield(); sched_yield();
assert(pageReadLSN(p) <= lsn); assert(pageReadLSN(p) <= lsn);
@ -136,6 +185,95 @@ START_TEST(pageNoThreadTest)
} }
END_TEST END_TEST
/**
@test Checks the page layout accessor macros against a hand-initialized page buffer.
*/
START_TEST(pageCheckMacros) {
Page p;
byte buffer[PAGE_SIZE];
memset(buffer, -1, PAGE_SIZE);
p.memAddr = buffer;
lsn_t lsn = 5;
*lsn_ptr(&p) = lsn;
*page_type_ptr(&p) = 10;
*freespace_ptr(&p) = 15;
*numslots_ptr(&p) = 20;
*slot_ptr(&p, 0) = 30;
*slot_ptr(&p, 1) = 35;
*slot_ptr(&p, 40) = 40;
*slot_length_ptr(&p, 0) = 31;
*slot_length_ptr(&p, 1) = 36;
*slot_length_ptr(&p, 40) = 41;
*bytes_from_start(&p, 0) = 50;
*bytes_from_start(&p, 1) = 51;
*bytes_from_start(&p, 2) = 52;
*bytes_from_start(&p, 3) = 53;
*bytes_from_start(&p, 4) = 54;
assert(*lsn_ptr(&p) == lsn);
assert(*page_type_ptr(&p) == 10);
assert(*end_of_usable_space_ptr(&p) == 10);
assert(*freespace_ptr(&p) == 15);
assert(*numslots_ptr(&p) == 20);
assert(*slot_ptr(&p, 0) == 30);
assert(*slot_ptr(&p, 1) == 35);
assert(*slot_ptr(&p, 40) == 40);
assert(*slot_length_ptr(&p, 0) == 31);
assert(*slot_length_ptr(&p, 1) == 36);
assert(*slot_length_ptr(&p, 40) == 41);
assert(*bytes_from_start(&p, 0) == 50);
assert(*bytes_from_start(&p, 1) == 51);
assert(*bytes_from_start(&p, 2) == 52);
assert(*bytes_from_start(&p, 3) == 53);
assert(*bytes_from_start(&p, 4) == 54);
assert(isValidSlot(&p, 0));
assert(isValidSlot(&p, 1));
assert(isValidSlot(&p, 40));
/* invalidateSlot(&p, 0);
invalidateSlot(&p, 1);
invalidateSlot(&p, 40);
assert(!isValidSlot(&p, 0));
assert(!isValidSlot(&p, 1));
assert(!isValidSlot(&p, 40));*/
} END_TEST
/**
@test
Page test that allocates multiple records
*/
START_TEST(pageNoThreadMultPageTest)
{
Page * p;
/* p->id = 0;*/
pthread_mutex_init(&lsn_mutex, NULL);
Tinit();
p = loadPage(1);
multiple_simultaneous_pages(p);
unlock(p->loadlatch);
Tdeinit();
}
END_TEST
/** /**
Check the page implementation in the multi-threaded case. Check the page implementation in the multi-threaded case.
*/ */
@ -153,7 +291,7 @@ START_TEST(pageThreadTest) {
Tinit(); Tinit();
fail_unless(1, NULL); fail_unless(1, NULL);
Page * p = loadPage(1); Page * p = loadPage(2);
fail_unless(1, NULL); fail_unless(1, NULL);
for(i = 0; i < THREAD_COUNT; i++) { for(i = 0; i < THREAD_COUNT; i++) {
@ -179,6 +317,11 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
tcase_add_test(tc, pageCheckMacros);
tcase_add_test(tc, pageNoThreadMultPageTest);
tcase_add_test(tc, pageNoThreadTest); tcase_add_test(tc, pageNoThreadTest);
tcase_add_test(tc, pageThreadTest); tcase_add_test(tc, pageThreadTest);

View file

@ -47,8 +47,8 @@ terms specified in this license.
#include <lladd/transactional.h> #include <lladd/transactional.h>
#include "../check_includes.h" #include "../check_includes.h"
#define LOG_NAME "check_transactional2.log" #define LOG_NAME "check_transactional2.log"
#define THREAD_COUNT 5 #define THREAD_COUNT 25
#define RECORDS_PER_THREAD 5000 #define RECORDS_PER_THREAD 1000
void arraySet(int * array, int val) { void arraySet(int * array, int val) {
int i; int i;
@ -444,11 +444,11 @@ Suite * check_suite(void) {
TCase *tc = tcase_create("transactional_smokeTest"); TCase *tc = tcase_create("transactional_smokeTest");
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
/* tcase_add_test(tc, transactional_smokeTest); tcase_add_test(tc, transactional_smokeTest);
tcase_add_test(tc, transactional_blobSmokeTest); */ tcase_add_test(tc, transactional_blobSmokeTest);
/* tcase_add_test(tc, transactional_nothreads_commit); tcase_add_test(tc, transactional_nothreads_commit);
tcase_add_test(tc, transactional_threads_commit); */ tcase_add_test(tc, transactional_threads_commit);
/* tcase_add_test(tc, transactional_nothreads_abort); */ tcase_add_test(tc, transactional_nothreads_abort);
tcase_add_test(tc, transactional_threads_abort); tcase_add_test(tc, transactional_threads_abort);
tcase_add_test(tc, transactional_blobs_nothreads_abort); tcase_add_test(tc, transactional_blobs_nothreads_abort);
/* tcase_add_test(tc, transactional_blobs_threads_abort); */ /* tcase_add_test(tc, transactional_blobs_threads_abort); */