diff --git a/lladd/bufferManager.h b/lladd/bufferManager.h index 341a831..990287e 100644 --- a/lladd/bufferManager.h +++ b/lladd/bufferManager.h @@ -45,18 +45,11 @@ terms specified in this license. pageManager - Provides cached page handling, delegates to blob manager when necessary. Doesn't implement an eviction policy. - That is left to a cacheManager. (Multiple cacheManagers can be + That is left to a cacheManager. (Multiple cacheManagers could be used with a single page manager.) - @todo Allow error checking! - @todo Make linux provide a better version of malloc(). We need to - directly DMA pages into and out of userland, or setup mmap() so - that it takes a flag that makes it page mmapped() pages to swap - instead of back to disk. (munmap() and msync() would still hit the - on-disk copy) - @todo Refactoring for lock manager Possible interface for lockManager: @@ -84,12 +77,13 @@ terms specified in this license. * $Id$ */ +#include +#include + #ifndef __BUFFERMANAGER_H__ #define __BUFFERMANAGER_H__ - -#include -#include +BEGIN_C_DECLS /** Page is defined in bufferManager.h as an incomplete type to enforce @@ -114,7 +108,6 @@ Page * loadPage(int pageid); */ void releasePage(Page * p); - /** * initialize buffer manager * @return 0 on success @@ -122,39 +115,17 @@ void releasePage(Page * p); */ int bufInit(); - /** - * @param pageid ID of page you want to read - * @return LSN found on disk + * will write out any dirty pages, assumes that there are no running + * transactions */ -/*long readLSN(int pageid); */ - -/** - * @param xid transaction id @param lsn the lsn that the updated - * record will reflect. This is needed by recovery, and undo. (The - * lsn of a page must always increase. Undos are handled by passing - * in the LSN of the CLR that records the undo.) - * - * @param rid recordid where you want to write @param dat data you - * wish to write - */ -void writeRecord(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat); - -/** - * @param xid transaction ID - * @param rid - * @param dat buffer for data - */ -void readRecord(int xid, Page * page, recordid rid, void *dat); +void bufDeinit(); /** * all actions necessary when committing a transaction. Can assume that the log - * has been written as well as any other actions that do not depend on the + * has been written as well as any other udpates that do not depend on the * buffer manager * - * Basicly, this call is here because we used to do copy on write, and - * it might be useful when locking is implemented. - * * @param xid transaction ID * @param lsn the lsn at which the transaction aborted. (Currently * unused, but may be useful for other implementations of the buffer @@ -175,17 +146,10 @@ int bufTransCommit(int xid, lsn_t lsn); * manager.) * * @return 0 on success - * * @return error code on failure */ int bufTransAbort(int xid, lsn_t lsn); -/** - * will write out any dirty pages, assumes that there are no running - * transactions - */ -void bufDeinit(); - -/*void setSlotType(int pageid, int slot, int type); */ +END_C_DECLS #endif diff --git a/lladd/operations.h b/lladd/operations.h index d9b117a..f62f39c 100644 --- a/lladd/operations.h +++ b/lladd/operations.h @@ -61,21 +61,25 @@ terms specified in this license. BEGIN_C_DECLS -/* @type Function +/** * function pointer that the operation will run */ typedef int (*Function)(int xid, Page * p, lsn_t lsn, recordid r, const void *d); -/* @type Operation +/** - * @param sizeofData size of the data that function accepts (as void*) - * @param undo index into operations table of undo function (takes same args) - * @param run what function to actually run - */ +*/ -/* @type Special cases +/** + If the Operation struct's sizeofData is set to this value, then the + size field of the recordid is used to determine the size of the + argument passed into the operation. */ #define SIZEOF_RECORD -1 +/** If the Operation struct's undo field is set to this value, then + physical logging is used in lieu of logical logging. + */ + #define NO_INVERSE -1 typedef struct { /** @@ -88,12 +92,6 @@ typedef struct { that the operation affects will be used instead. */ long sizeofData; - /** - Does this operation supply an undo operation? - - --Unneeded; just set undo to the special value NO_INVERSE. - */ - /* int invertible; */ /** Implementing operations that may span records is subtle. Recovery assumes that page writes (and therefore logical @@ -123,8 +121,11 @@ typedef struct { We chose the second option for now. */ - int undo; - Function run; + /** + index into operations table of undo function + */ + int undo; + Function run; } Operation; /* These need to be installed, since they are required by applications that use LLADD. */ diff --git a/src/apps/cht/cht.c b/src/apps/cht/cht.c index 49faf5b..cb9e142 100644 --- a/src/apps/cht/cht.c +++ b/src/apps/cht/cht.c @@ -285,7 +285,7 @@ int _chtEval(DfaSet * dfaSet, state_name init_xact_cht(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) { - TwoPCMachineState * state = (TwoPCMachineState*) &(stateMachine->app_state); + /* TwoPCMachineState * state = (TwoPCMachineState*) &(stateMachine->app_state);*/ TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); CHTAppState * app_state_cht = app_state_2pc->app_state; diff --git a/src/libdfa/libdfa.c b/src/libdfa/libdfa.c index e09210e..526a105 100644 --- a/src/libdfa/libdfa.c +++ b/src/libdfa/libdfa.c @@ -140,14 +140,14 @@ void recover(DfaSet * dfaSet) { StateMachine sm_stack; StateMachine * sm = &sm_stack; StateMachine * this; - int ret = (jbHtFirst(dfaSet->smash->xid, dfaSet->smash->hash, sm) != -1); + int ret = (jbHtFirst(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1); while(ret) { this = getSmash(dfaSet->smash, sm->machine_id); printf("StateMachine %ld\n", sm->machine_id); this->worker_thread = spawn_worker_thread(dfaSet, sm->machine_id); - ret = (jbHtNext(dfaSet->smash->xid, dfaSet->smash->hash, sm) != -1); + ret = (jbHtNext(dfaSet->smash->xid, dfaSet->smash->hash, (byte*)sm) != -1); } } diff --git a/src/libdfa/smash.c b/src/libdfa/smash.c index 125188e..3c6cc98 100644 --- a/src/libdfa/smash.c +++ b/src/libdfa/smash.c @@ -93,7 +93,7 @@ StateMachine * _insertSmash(smash_t * smash, state_machine_id id) { new->current_state = START_STATE; /* printf("Insert %ld\n", id); */ - ret = (-1 != jbHtInsert(smash->xid, smash->hash, &id, sizeof(state_machine_id), new, sizeof(StateMachine))); + ret = (-1 != jbHtInsert(smash->xid, smash->hash, (byte*)&id, sizeof(state_machine_id), (byte*)new, sizeof(StateMachine))); pblHtInsert(smash->memHash, &id, sizeof(state_machine_id), new); /* Tcommit(smash->xid); smash->xid = Tbegin(); */ @@ -136,7 +136,7 @@ StateMachine * insertSmash(smash_t * smash, state_machine_id id) { pthread_mutex_lock(smash->lock); - if(jbHtLookup(smash->xid, smash->hash, &(smash->next_sm_id), sizeof(state_machine_id), &junk) != -1) { + if(jbHtLookup(smash->xid, smash->hash, (byte*)&(smash->next_sm_id), sizeof(state_machine_id), (byte*)&junk) != -1) { pthread_mutex_unlock(smash->lock); return NULL; } @@ -167,7 +167,7 @@ int freeSmash (smash_t * smash, state_machine_id id) { free(old->sleepCond); pblHtRemove(smash->memHash, &(id), sizeof(state_machine_id)); - ret = jbHtRemove(smash->xid, smash->hash, &(id), sizeof(state_machine_id), NULL) != -1; + ret = jbHtRemove(smash->xid, smash->hash, (byte*)&(id), sizeof(state_machine_id), NULL) != -1; free(old); @@ -194,7 +194,7 @@ int _setSmash(smash_t * smash, state_machine_id id) { StateMachine * machine; machine = _getSmash(smash, id); - return (-1 != jbHtInsert(smash->xid, smash->hash, &id, sizeof(state_machine_id), machine, sizeof(StateMachine))); + return (-1 != jbHtInsert(smash->xid, smash->hash, (byte*)&id, sizeof(state_machine_id),(byte*) machine, sizeof(StateMachine))); } diff --git a/src/lladd/Makefile.am b/src/lladd/Makefile.am index 6b03333..439eec0 100644 --- a/src/lladd/Makefile.am +++ b/src/lladd/Makefile.am @@ -3,6 +3,6 @@ lib_LIBRARIES=liblladd.a #liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a # removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c -liblladd_a_SOURCES=common.c stats.c io.c bufferManager.c linkedlist.c operations.c pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/prepare.c operations/set.c operations/alloc.c #operations/lladdhash.c +liblladd_a_SOURCES=common.c stats.c io.c bufferManager.c linkedlist.c operations.c pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/prepare.c operations/set.c operations/alloc.c page/slotted.c #operations/lladdhash.c AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c index 98661db..f32e1be 100644 --- a/src/lladd/bufferManager.c +++ b/src/lladd/bufferManager.c @@ -50,13 +50,13 @@ terms specified in this license. #include #include -#include "page.h" - #include + +#include "page.h" #include "blobManager.h" #include - #include "pageFile.h" + #include @@ -138,44 +138,6 @@ void releasePage (Page * p) { unlock(p->loadlatch); } -void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { - - /* Page *p; */ - - if(rid.size > BLOB_THRESHOLD_SIZE) { - /* DEBUG("Writing blob.\n"); */ - writeBlob(xid, p, lsn, rid, dat); - - } else { - /* DEBUG("Writing record.\n"); */ - - assert( (p->id == rid.page) && (p->memAddr != NULL) ); - - /** @todo This assert should be here, but the tests are broken, so it causes bogus failures. */ - /*assert(pageReadLSN(*p) <= lsn);*/ - - pageWriteRecord(xid, p, rid, lsn, dat); - - assert( (p->id == rid.page) && (p->memAddr != NULL) ); - - } -} - -void readRecord(int xid, Page * p, recordid rid, void *buf) { - if(rid.size > BLOB_THRESHOLD_SIZE) { - /* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n", - xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */ - /* @todo should readblob take a page pointer? */ - readBlob(xid, p, rid, buf); - } else { - assert(rid.page == p->id); - /* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n", - xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */ - pageReadRecord(xid, p, rid, buf); - assert(rid.page == p->id); - } -} - int bufTransCommit(int xid, lsn_t lsn) { commitBlobs(xid); @@ -316,6 +278,6 @@ Page * getPage(int pageid, int locktype) { } Page *loadPage(int pageid) { - Page * ret = getPage(pageid, RW); + Page * ret = getPage(pageid, RO); return ret; } diff --git a/src/lladd/logger/logger2.c b/src/lladd/logger/logger2.c index d5c6c74..f57240d 100644 --- a/src/lladd/logger/logger2.c +++ b/src/lladd/logger/logger2.c @@ -45,7 +45,8 @@ terms specified in this license. #include #include "logWriter.h" -#include +#include "page.h" +/*#include */ #include TransactionLog LogTransBegin(int xid) { TransactionLog tl; @@ -100,6 +101,7 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, readRecord(l->xid, p, rid, preImage); DEBUG("got preimage"); } + e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage); diff --git a/src/lladd/operations/decrement.c b/src/lladd/operations/decrement.c index 79a33bb..7f3ed73 100644 --- a/src/lladd/operations/decrement.c +++ b/src/lladd/operations/decrement.c @@ -46,7 +46,8 @@ terms specified in this license. *********************************************/ #include -#include +/*#include */ +#include "../page.h" static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) { int i; diff --git a/src/lladd/operations/increment.c b/src/lladd/operations/increment.c index 83fc378..780ecb0 100644 --- a/src/lladd/operations/increment.c +++ b/src/lladd/operations/increment.c @@ -46,7 +46,8 @@ terms specified in this license. **********************************************/ #include -#include +/*#include */ +#include "../page.h" static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) { int i; diff --git a/src/lladd/operations/set.c b/src/lladd/operations/set.c index e19be38..e8a79b3 100644 --- a/src/lladd/operations/set.c +++ b/src/lladd/operations/set.c @@ -46,7 +46,9 @@ terms specified in this license. **********************************************/ #include -#include +/*#include */ +#include "../page.h" + static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) { writeRecord(xid, p, lsn, rid, dat); return 0; diff --git a/src/lladd/page.c b/src/lladd/page.c index 422501e..f711a93 100644 --- a/src/lladd/page.c +++ b/src/lladd/page.c @@ -40,52 +40,6 @@ permission to use and distribute the software in accordance with the terms specified in this license. ---*/ -/************************************************************************ - * implementation of pages - - STRUCTURE OF A PAGE - - +-------------------------------------------+-----------------------+--+ - | DATA SECTION +--------->| RID: (PAGE, 0) | | - | +-----------------+ | +-----------------------+ | - | +-->| RID: (PAGE, 1) | | | - | | +-----------------+ | | - | | | | - | +-----------------+ | +----------------------------+ - | | | +--->| RID: (PAGE, n) | - | | | | +----------------------------+ - |======================================================================| - |^ FREE SPACE | | | | - |+-----------------------|-------|---|--------------------+ | - | | | | | | - | +-------------|-------|---+ | | - | | | | | | - | +---|---+-----+---|---+---|---+--------------+-----|------+-----+ - | | slotn | ... | slot1 | slot0 | num of slots | free space | LSN | - +------+-------+-----+-------+-------+--------------+------------+-----+ - - NOTE: - - slots are zero indexed. - - slots are of implemented as (offset, length) - - Latching summary: - - Each page has an associated read/write lock. This lock only - protects the internal layout of the page, and the members of the - page struct. Here is how it is held in various circumstances: - - Record allocation: Write lock - Record read: Read lock - Read LSN Read lock - Record write *READ LOCK* - Write LSN Write lock - - Any circumstance where these locks are held during an I/O operation - is a bug. - - $Id$ - -************************************************************************/ /* _XOPEN_SOURCE is needed for posix_memalign */ #define _XOPEN_SOURCE 600 #include @@ -103,28 +57,11 @@ terms specified in this license. #include "blobManager.h" #include "pageFile.h" +#include "page/slotted.h" + /* TODO: Combine with buffer size... */ static int nextPage = 0; -static const byte *slotMemAddr(const byte *memAddr, int slotNum) ; - -/** @todo: Why does only one of the get/set First/Second HalfOfWord take an unsigned int? */ -static int getFirstHalfOfWord(unsigned int *memAddr); -static int getSecondHalfOfWord(int *memAddr); -static void setFirstHalfOfWord(int *memAddr, int value); -static void setSecondHalfOfWord(int *memAddr, int value); - -static int readFreeSpace(byte *memAddr); -static void writeFreeSpace(byte *memAddr, int newOffset); -static int readNumSlots(byte *memAddr); -static void writeNumSlots(byte *memAddr, int numSlots); - -static int getSlotOffset(byte *memAddr, int slot) ; -static int getSlotLength(byte *memAddr, int slot) ; -static void setSlotOffset(byte *memAddr, int slot, int offset) ; -static void setSlotLength(byte *memAddr, int slot, int length) ; - - /** Invariant: This lock should be held while updating lastFreepage, or while performing any operation that may decrease the amount of @@ -140,82 +77,12 @@ static unsigned int lastFreepage = 0; -/** @todo replace static ints in page.c with #defines. */ - -/* ------ */ - -static int SLOT_OFFSET_SIZE; -static int SLOT_LENGTH_SIZE; -static int SLOT_SIZE; - -static int LSN_SIZE; -static int FREE_SPACE_SIZE; -static int NUMSLOTS_SIZE; - -static int START_OF_LSN; -static int START_OF_FREE_SPACE; -static int START_OF_NUMSLOTS; - -static int MASK_0000FFFF; -static int MASK_FFFF0000; - /* ------ */ static pthread_mutex_t pageAllocMutex; /** We need one dummy page for locking purposes, so this array has one extra page in it. */ Page pool[MAX_BUFFER_SIZE+1]; - -/* ------------------ STATIC FUNCTIONS. NONE OF THESE ACQUIRE LOCKS - ON THE MEMORY THAT IS PASSED INTO THEM -------------*/ - -static int isValidSlot(byte *memAddr, int slot); -static void invalidateSlot(byte *memAddr, int slot); - -/** - The caller of this function must already have a writelock on the - page. -*/ -static void pageCompact(Page * page); - -static int getFirstHalfOfWord(unsigned int *memAddr) { - unsigned int word = *memAddr; - word = (word >> (2*BITS_PER_BYTE)); /* & MASK_0000FFFF; */ - return word; -} - - -static int getSecondHalfOfWord(int *memAddr) { - int word = *memAddr; - word = word & MASK_0000FFFF; - return word; -} - -static void setFirstHalfOfWord(int *memAddr, int value){ - int word = *memAddr; - word = word & MASK_0000FFFF; - word = word | (value << (2*BITS_PER_BYTE)); - *memAddr = word; -} - - -static void setSecondHalfOfWord(int *memAddr, int value) { - int word = *memAddr;; - word = word & MASK_FFFF0000; - word = word | (value & MASK_0000FFFF); - *memAddr = word; -} - -/** - * slotMemAddr() calculates the memory address of the given slot. It does this - * by going to the end of the page, then walking backwards, past the LSN field - * (LSN_SIZE), past the 'free space' and 'num of slots' fields (NUMSLOTS_SIZE), - * and then past a slotNum slots (slotNum * SLOT_SIZE). - */ -static const byte *slotMemAddr(const byte *memAddr, int slotNum) { - return (memAddr + PAGE_SIZE) - (LSN_SIZE + FREE_SPACE_SIZE + NUMSLOTS_SIZE + ((slotNum+1) * SLOT_SIZE)); -} - /** * pageWriteLSN() assumes that the page is already loaded in memory. It takes * as a parameter a Page. The Page struct contains the new LSN and the page @@ -223,159 +90,30 @@ static const byte *slotMemAddr(const byte *memAddr, int slotNum) { * * @param page You must have a writelock on page before calling this function. */ -static void pageWriteLSN(Page * page) { +void pageWriteLSN(Page * page) { /* unlocked since we're only called by a function that holds the writelock. */ - *(long *)(page->memAddr + START_OF_LSN) = page->LSN; - -} - -static int unlocked_freespace(Page * page); - -/** - Just like freespace(), but doesn't obtain a lock. (So that other methods in this file can use it.) -*/ -static int unlocked_freespace(Page * page) { - int space; - space= (slotMemAddr(page->memAddr, readNumSlots(page->memAddr)) - (page->memAddr + readFreeSpace(page->memAddr))); - return (space < 0) ? 0 : space; + /* *(long *)(page->memAddr + START_OF_LSN) = page->LSN; */ + *lsn_ptr(page) = page->LSN; } /** - * readFreeSpace() assumes that the page is already loaded in memory. It takes - * as a parameter the memory address of the loaded page in memory and returns - * the offset at which the free space section of this page begins. + * pageReadLSN() assumes that the page is already loaded in memory. It takes + * as a parameter a Page and returns the LSN that is currently written on that + * page in memory. */ -static int readFreeSpace(byte *memAddr) { - return getSecondHalfOfWord((int*)(memAddr + START_OF_NUMSLOTS)); -} +lsn_t pageReadLSN(const Page * page) { + lsn_t ret; -/** - * writeFreeSpace() assumes that the page is already loaded in memory. It takes - * as parameters the memory address of the loaded page in memory and a new offset - * in the page that will denote the point at which free space begins. - */ -static void writeFreeSpace(byte *memAddr, int newOffset) { - setSecondHalfOfWord((int*)(memAddr + START_OF_NUMSLOTS), newOffset); -} + readlock(page->rwlatch, 259); + /* ret = *(long *)(page->memAddr + START_OF_LSN); */ + ret = *lsn_ptr(page); + readunlock(page->rwlatch); -/** - * readNumSlots() assumes that the page is already loaded in memory. It takes - * as a parameter the memory address of the loaded page in memory, and returns - * the memory address at which the free space section of this page begins. - */ -static int readNumSlots(byte *memAddr) { - return getFirstHalfOfWord((unsigned int*)(memAddr + START_OF_NUMSLOTS)); -} - -/** - * writeNumSlots() assumes that the page is already loaded in memory. It takes - * as parameters the memory address of the loaded page in memory and an int - * to which the value of the numSlots field in the page will be set to. - */ -static void writeNumSlots(byte *memAddr, int numSlots) { - setFirstHalfOfWord((int*)(unsigned int*)(memAddr + START_OF_NUMSLOTS), numSlots); -} - -static int isValidSlot(byte *memAddr, int slot) { - return (getSlotOffset(memAddr, slot) != INVALID_SLOT) ? 1 : 0; -} - -static void invalidateSlot(byte *memAddr, int slot) { - setSlotOffset(memAddr, slot, INVALID_SLOT); + return ret; } -/** - - Move all of the records to the beginning of the page in order to - increase the available free space. - - @todo If we were supporting multithreaded operation, this routine - would need to pin the pages that it works on. -*/ -static void pageCompact(Page * page) { - - int i; - byte buffer[PAGE_SIZE]; - int freeSpace = 0; - int numSlots; - int meta_size; - int slot_length; - int last_used_slot = -1; - - numSlots = readNumSlots(page->memAddr); - - /* DEBUG("Compact: numSlots=%d\n", numSlots); */ - meta_size = LSN_SIZE + FREE_SPACE_SIZE + NUMSLOTS_SIZE + (SLOT_SIZE*numSlots); - - /* Can't compact in place, slot numbers can come in different orders than - the physical space allocated to them. */ - memcpy(buffer + PAGE_SIZE - meta_size, page->memAddr + PAGE_SIZE - meta_size, meta_size); - - for (i = 0; i < numSlots; i++) { - /* DEBUG("i = %d\n", i); */ - if (isValidSlot(page->memAddr, i)) { - /* DEBUG("Buffer offset: %d\n", freeSpace); */ - slot_length = getSlotLength(page->memAddr, i); - memcpy(buffer + freeSpace, page->memAddr + getSlotOffset(page->memAddr, i), slot_length); - setSlotOffset(buffer, i, freeSpace); - freeSpace += slot_length; - last_used_slot = i; - } - } - - - /* if (last_used_slot < numSlots) { */ - writeNumSlots(buffer, last_used_slot + 1); - /*} */ - - /* DEBUG("freeSpace = %d, num slots = %d\n", freeSpace, last_used_slot + 1); */ - - writeFreeSpace(buffer, freeSpace); - - memcpy(page->memAddr, buffer, PAGE_SIZE); - -} - -/** - * getSlotOffset() assumes that the page is already loaded in memory. It takes - * as parameters the memory address of the page loaded in memory, and a slot - * number. It returns the offset corresponding to that slot. - */ -static int getSlotOffset(byte *memAddr, int slot) { - return getFirstHalfOfWord((unsigned int*)slotMemAddr(memAddr, slot)); -} - -/** - * getSlotLength() assumes that the page is already loaded in memory. It takes - * as parameters the memory address of the page loaded in memory, and a slot - * number. It returns the length corresponding to that slot. - */ -static int getSlotLength(byte *memAddr, int slot) { - return getSecondHalfOfWord((int*)(unsigned int*)slotMemAddr(memAddr, slot)); -} - -/** - * setSlotOffset() assumes that the page is already loaded in memory. It takes - * as parameters the memory address of the page loaded in memory, a slot number, - * and an offset. It sets the offset of the given slot to the offset passed in - * as a parameter. - */ -static void setSlotOffset(byte *memAddr, int slot, int offset) { - setFirstHalfOfWord((int*)slotMemAddr(memAddr, slot), offset); -} - -/** - * setSlotLength() assumes that the page is already loaded in memory. It takes - * as parameters the memory address of the page loaded in memory, a slot number, - * and a length. It sets the length of the given slot to the length passed in - * as a parameter. - */ -static void setSlotLength(byte *memAddr, int slot, int length) { - setSecondHalfOfWord((int*)(unsigned int*)slotMemAddr(memAddr, slot), length); -} - static void pageReallocNoLock(Page *p, int id) { p->id = id; p->LSN = 0; @@ -409,20 +147,20 @@ void pageInit() { * and the greatest offset at which a record could possibly * start is at the end of the page */ - SLOT_LENGTH_SIZE = SLOT_OFFSET_SIZE = 2; /* in bytes */ + /* SLOT_LENGTH_SIZE = SLOT_OFFSET_SIZE = 2; / * in bytes * / SLOT_SIZE = SLOT_OFFSET_SIZE + SLOT_LENGTH_SIZE; LSN_SIZE = sizeof(long); FREE_SPACE_SIZE = NUMSLOTS_SIZE = 2; - /* START_OF_LSN is the offset in the page to the lsn */ + / * START_OF_LSN is the offset in the page to the lsn * / START_OF_LSN = PAGE_SIZE - LSN_SIZE; START_OF_FREE_SPACE = START_OF_LSN - FREE_SPACE_SIZE; START_OF_NUMSLOTS = START_OF_FREE_SPACE - NUMSLOTS_SIZE; MASK_0000FFFF = (1 << (2*BITS_PER_BYTE)) - 1; MASK_FFFF0000 = ~MASK_0000FFFF; - +*/ pthread_mutex_init(&pageAllocMutex, NULL); for(int i = 0; i < MAX_BUFFER_SIZE+1; i++) { @@ -446,111 +184,13 @@ void pageDeInit() { } } -typedef struct { - int page; - int slot; - /** If pageptr is not null, then it is used by the iterator methods. - Otherwise, they re-load the pages and obtain short latches for - each call. */ - Page * pageptr; -} page_iterator_t; - -void pageIteratorInit(recordid rid, page_iterator_t * pit, Page * p) { - pit->page = rid.page; - pit->slot = rid.slot; - pit->pageptr = p; - assert((!p) || (p->id == pit->page)); -} - -int nextSlot(page_iterator_t * pit, recordid * rid) { - Page * p; - int numSlots; - int done = 0; - int ret; - if(pit->pageptr) { - p = pit->pageptr; - } else { - p = loadPage(pit->page); - } - - numSlots = readNumSlots(p->memAddr); - while(pit->slot < numSlots && !done) { - - if(isValidSlot(p->memAddr, pit->slot)) { - done = 1; - } else { - pit->slot ++; - } - - } - if(!done) { - ret = 0; - } else { - ret = 1; - rid->page = pit->page; - rid->slot = pit->slot; - rid->size = getSlotLength(p->memAddr, rid->slot); - if(rid->size >= PAGE_SIZE) { - - if(rid->size == BLOB_SLOT) { - blob_record_t br; - pageReadRecord(-1, p, *rid, (byte*)&br); - rid->size = br.size; - } - } - } - - if(!pit->pageptr) { - releasePage(p); - } - - return ret; - -} - - void pageCommit(int xid) { - /* rmTouch(xid); */ } void pageAbort(int xid) { - /* rmTouch(xid); */ } -/** - * pageReadLSN() assumes that the page is already loaded in memory. It takes - * as a parameter a Page and returns the LSN that is currently written on that - * page in memory. - */ -lsn_t pageReadLSN(const Page * page) { - lsn_t ret; - - readlock(page->rwlatch, 259); - ret = *(long *)(page->memAddr + START_OF_LSN); - readunlock(page->rwlatch); - - return ret; -} - -/** - * freeSpace() assumes that the page is already loaded in memory. It takes - * as a parameter a Page, and returns an estimate of the amount of free space - * available to a new slot on this page. (This is the amount of unused space - * in the page, minus the size of a new slot entry.) This is either exact, - * or an underestimate. - * - * @todo is it ever safe to call freespace without a lock on the page? - * - */ -int freespace(Page * page) { - int ret; - readlock(page->rwlatch, 292); - ret = unlocked_freespace(page); - readunlock(page->rwlatch); - return ret; -} - /** @todo ralloc ignores it's xid parameter; change the interface? */ recordid ralloc(int xid, long size) { @@ -581,161 +221,8 @@ recordid ralloc(int xid, long size) { -recordid pageRalloc(Page * page, int size) { - int freeSpace; - int numSlots; - int i; - - writelock(page->rwlatch, 342); - if(unlocked_freespace(page) < size) { - - pageCompact(page); - - /* Make sure there's enough free space... */ - - /*#ifdef DEBUGGING*/ - assert (unlocked_freespace(page) >= (int)size); /*Expensive, so skip it when debugging is off. */ - /*#endif */ - - } - freeSpace = readFreeSpace(page->memAddr); - numSlots = readNumSlots(page->memAddr); - recordid rid; - - - rid.page = page->id; - rid.slot = numSlots; - rid.size = size; - - - /* - Reuse an old (invalid) slot entry. Why was this here? - - @todo is slot reuse in page.c a performance bottleneck? - - */ - for (i = 0; i < numSlots; i++) { - if (!isValidSlot(page->memAddr, i)) { - rid.slot = i; - break; - } - } - - if (rid.slot == numSlots) { - writeNumSlots(page->memAddr, numSlots+1); - } - - setSlotOffset(page->memAddr, rid.slot, freeSpace); - setSlotLength(page->memAddr, rid.slot, rid.size); - writeFreeSpace(page->memAddr, freeSpace + rid.size); - - writeunlock(page->rwlatch); - - /* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */ - - return rid; -} - - -/** Only used for recovery, to make sure that consistent RID's are created - * on log playback. */ -recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid) { - int freeSpace; - int numSlots; - - writelock(page->rwlatch, 376); - - freeSpace = readFreeSpace(page->memAddr); - numSlots= readNumSlots(page->memAddr); - - /* printf("!"); fflush(NULL); */ - -/* if(rid.size > BLOB_THRESHOLD_SIZE) { - return blobSlotAlloc(page, lsn_t lsn, recordid rid); - }*/ - - /* assert(rid.slot >= numSlots); */ - - /** @todo for recovery, pageSlotRalloc assumes no other thread added a slot - between when ralloc and it were called. (This may be a - safe assumption..) */ - - if(getSlotLength(page->memAddr, rid.slot) == 0) { - - /* if(rid.slot >= numSlots) { */ - - if (unlocked_freespace(page) < rid.size) { /*freeSpace < rid.size) { */ - pageCompact(page); - freeSpace = readFreeSpace(page->memAddr); - assert (freeSpace < rid.size); - } - - setSlotOffset(page->memAddr, rid.slot, freeSpace); - setSlotLength(page->memAddr, rid.slot, rid.size); - writeFreeSpace(page->memAddr, freeSpace + rid.size); - /* printf("?"); fflush(NULL);*/ - } else { - assert((rid.size == getSlotLength(page->memAddr, rid.slot)) || - (getSlotLength(page->memAddr, rid.slot) >= PAGE_SIZE)); /* Fails. Why? */ - } - writeunlock(page->rwlatch); - - return rid; -} - - -void pageDeRalloc(Page * page, recordid rid) { - - readlock(page->rwlatch, 443); - invalidateSlot(page->memAddr, rid.slot); - unlock(page->rwlatch); -} - -/* - This should trust the rid (since the caller needs to - override the size in special circumstances) - - @todo If the rid size has been overridden, we should check to make - sure that this really is a special record. -*/ -void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) { - byte *recAddress; - int slot_length; - readlock(page->rwlatch, 519); - - assert(page->id == rid.page); - recAddress = page->memAddr + getSlotOffset(page->memAddr, rid.slot); - - slot_length = getSlotLength(page->memAddr, rid.slot); - - assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE)); - - memcpy(buff, recAddress, rid.size); - unlock(page->rwlatch); - -} - -void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data) { - - byte *rec; - int len; - readlock(page->rwlatch, 529); - assert(rid.size < PAGE_SIZE); - - rec = page->memAddr + getSlotOffset(page->memAddr, rid.slot); - len = getSlotLength(page->memAddr, rid.slot); - assert(rid.size == len || len >= PAGE_SIZE); - if(memcpy(rec, data, rid.size) == NULL ) { - printf("ERROR: MEM_WRITE_ERROR on %s line %d", __FILE__, __LINE__); - exit(MEM_WRITE_ERROR); - } - - page->LSN = lsn; - pageWriteLSN(page); - unlock(page->rwlatch); - -} +/** @todo Does pageRealloc really need to obtain a lock? */ void pageRealloc(Page *p, int id) { writelock(p->rwlatch, 10); pageReallocNoLock(p,id); @@ -756,93 +243,54 @@ Page *pageAlloc(int id) { page = &(pool[nextPage]); - /* We have an implicit lock on rwlatch, since we allocated it, but - haven't returned yet. */ - /* page->rwlatch = initlock(); - page->loadlatch = initlock(); - - page->memAddr = malloc(PAGE_SIZE); */ - nextPage++; - assert(nextPage <= MAX_BUFFER_SIZE + 1); /* There's a dummy page that we need to keep around, thus the +1 */ + /* There's a dummy page that we need to keep around, thus the +1 */ + assert(nextPage <= MAX_BUFFER_SIZE + 1); pthread_mutex_unlock(&pageAllocMutex); return page; } -void printPage(byte *memAddr) { - int i = 0; - for (i = 0; i < PAGE_SIZE; i++) { - if((*(char *)(memAddr+i)) == 0) { - printf("#"); - }else { - printf("%c", *(char *)(memAddr+i)); - } - if((i+1)%4 == 0) - printf(" "); - } +void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { + + /* writelock(p->rwlatch, 225); *//* Need a writelock so that we can update the lsn. */ + + if(rid.size > BLOB_THRESHOLD_SIZE) { + /* DEBUG("Writing blob.\n"); */ + writeBlob(xid, p, lsn, rid, dat); + + } else { + /* DEBUG("Writing record.\n"); */ + + assert( (p->id == rid.page) && (p->memAddr != NULL) ); + + /** @todo This assert should be here, but the tests are broken, so it causes bogus failures. */ + /*assert(pageReadLSN(*p) <= lsn);*/ + + pageWriteRecord(xid, p, lsn, rid, dat); + + assert( (p->id == rid.page) && (p->memAddr != NULL) ); + + } + + /* p->LSN = lsn; + pageWriteLSN(p); + unlock(p->rwlatch); */ } -#define num 20 -int pageTest() { - - Page * page = malloc(sizeof(Page)); - - recordid rid[num]; - char *str[num] = {"one", - "two", - "three", - "four", - "five", - "six", - "seven", - "eight", - "nine", - "ten", - "eleven", - "twelve", - "thirteen", - "fourteen", - "fifteen", - "sixteen", - "seventeen", - "eighteen", - "nineteen", - "twenty"}; - int i; - - page->memAddr = (byte *)malloc(PAGE_SIZE); - memset(page->memAddr, 0, PAGE_SIZE); - for (i = 0; i < num; i++) { - rid[i] = pageRalloc(page, strlen(str[i]) + 1); - pageWriteRecord(0, page, rid[i], 1, (byte*)str[i]); - } - printPage(page->memAddr); - - for (i = 0; i < num; i+= 2) - pageDeRalloc(page, rid[i]); - - pageCompact(page); - printf("\n\n\n"); - printPage(page->memAddr); - return 0; +void readRecord(int xid, Page * p, recordid rid, void *buf) { + if(rid.size > BLOB_THRESHOLD_SIZE) { + /* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n", + xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */ + /* @todo should readblob take a page pointer? */ + readBlob(xid, p, rid, buf); + } else { + assert(rid.page == p->id); + /* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n", + xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */ + pageReadRecord(xid, p, rid, buf); + assert(rid.page == p->id); + } } -/** @todo: Should the caller need to obtain the writelock when calling pageSetSlotType? */ -void pageSetSlotType(Page * p, int slot, int type) { - assert(type > PAGE_SIZE); - writelock(p->rwlatch, 686); - setSlotLength(p->memAddr, slot, type); - unlock(p->rwlatch); -} - -int pageGetSlotType(Page * p, int slot, int type) { - int ret; - readlock(p->rwlatch, 693); - ret = getSlotLength(p->memAddr, slot); - unlock(p->rwlatch); - - /* getSlotType does the locking for us. */ - return ret > PAGE_SIZE ? ret : NORMAL_SLOT; -} diff --git a/src/lladd/page.h b/src/lladd/page.h index bbd45ed..80a8f5b 100644 --- a/src/lladd/page.h +++ b/src/lladd/page.h @@ -45,10 +45,39 @@ terms specified in this license. * * interface for dealing with slotted pages * + * This file provides a re-entrant interface for pages that contain + * variable-size records. + * * @ingroup LLADD_CORE * $Id$ * - * @todo update docs in this file. + * @todo The slotted pages implementation, and the rest of the page + * structure should be seperated, and each page should have a 'type' + * slot so that we can implement multiple page types on top of LLADD. + +Slotted page layout: + + END: + lsn (4 bytes) + type (2 bytes) + free space (2 bytes) + num of slots (2 bytes) + freelist head(2 bytes) + slot 0 (2 bytes) + slot 1 (2 bytes) + ... + slot n (2 bytes) + ... + unused + ... + record n (x bytes) + ... + record 0 (y bytes) + record 1 (z bytes) + + START + + **/ #ifndef __PAGE_H__ @@ -57,16 +86,31 @@ terms specified in this license. #include #include #include "latches.h" -/** @todo page.h includes things that it shouldn't! (Or, page.h shouldn't be an installed header.) */ +/** @todo page.h includes things that it shouldn't, and page.h should eventually be an installed header. */ #include - -/*#ifdef __BUFFERMANAGER_H__ - #error bufferManager.h must be included after page.h -#endif*/ - #include + + + BEGIN_C_DECLS +/* +#define LSN_SIZE sizeof(lsn_t) +#define START_OF_LSN (PAGE_SIZE - LSN_SIZE) +#define PAGE_TYPE_SIZE 0 +#define START_OF_PAGE_TYPE (START_OF_LSN - PAGE_TYPE_SIZE) +#define USABLE_SPACE_SIZE (START_OF_PAGE_TYPE)*/ + +#define lsn_ptr(page) (((lsn_t *)(&((page)->memAddr[PAGE_SIZE])))-1) +#define page_type_ptr(page) (((int*)lsn_ptr((page)))-1) +#define end_of_usable_space_ptr(page) page_type_ptr((page)) + +#define shorts_from_end(page, count) (((short*)end_of_usable_space_ptr((page)))-(count)) +#define bytes_from_start(page, count) (((byte*)((page)->memAddr))+(count)) + + + +/*#define invalidateSlot(page, n) (*slot_ptr((page), (n)) = INVALID_SLOT)*/ /** The page type contains in-memory information about pages. This @@ -76,9 +120,7 @@ BEGIN_C_DECLS In particular, our current page replacement policy requires two doubly linked lists, - @todo In general, we pass around page structs (as opposed to page - pointers). This is starting to become cumbersome, as the page - struct is becoming more complex...) + @todo The Page struct should be tuned for better memory utilization. */ struct Page_s { /** @todo Shouldn't Page.id be a long? */ @@ -148,42 +190,9 @@ struct Page_s { @see rwlatch, getPage(), pageRalloc(), pageRead() - - */ rwl * loadlatch; - - /** This mutex protects the pending field. We don't use rwlatch for - this, since we also need to use a condition variable to update - this properly, and there are no read-only functions for the - pending field. */ - - /* pthread_cond_t noMorePending; */ /* pthread_cond_t */ - - /* int waiting; */ - - /** - In the multi-threaded case, before we steal a page, we need to - know that all pending actions have been completed. Here, we - track that on a per-resident page basis, by incrementing the - pending field each time we generate a log entry that will result - in a write to the corresponding page. - - (For a concrete example of why this is needed, imagine two - threads write to different records on the same page, and get - LSN's 1 and 2. If 2 happens to write first, then the page is - stolen, and then we crash, recovery will not know that the page - does not reflect LSN 1.) - - "Pending events" are calls to functions that take lsn's. - Currently, those functions are writeRecord and pageSlotRalloc. - - @todo work out what happens with kickPage() and loadPage() more - carefully. - - */ - /* int pending; */ }; /** @@ -191,7 +200,6 @@ struct Page_s { * functions dealing with pages. */ void pageInit(); - void pageDeInit(); /** @@ -209,11 +217,22 @@ void pageDeInit(); lsn_t pageReadLSN(const Page * page); /** - * assumes that the page is already loaded in memory. It takes as a - * parameter a Page, and returns an estimate of the amount of free space on this - * page. This is either exact, or an underestimate. + * @param xid transaction id @param lsn the lsn that the updated + * record will reflect. This is needed by recovery, and undo. (The + * lsn of a page must always increase. Undos are handled by passing + * in the LSN of the CLR that records the undo.) + * + * @param rid recordid where you want to write @param dat data you + * wish to write */ -int freespace(Page * page); +void writeRecord(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat); + +/** + * @param xid transaction ID + * @param rid + * @param dat buffer for data + */ +void readRecord(int xid, Page * page, recordid rid, void *dat); /** * allocate a record. This must be done in two phases. The first @@ -231,7 +250,6 @@ int freespace(Page * page); recordid ralloc(int xid, long size); - /** * assumes that the page is already loaded in memory. It takes as * parameters a Page and the size in bytes of the new record. pageRalloc() @@ -252,34 +270,18 @@ recordid ralloc(int xid, long size); * @todo Makes no attempt to reuse old recordid's. */ recordid pageRalloc(Page * page, int size); - -void pageDeRalloc(Page * page, recordid rid); - -void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data); - -void pageReadRecord(int xid, Page * page, recordid rid, byte *buff); - -void pageCommit(int xid); - -void pageAbort(int xid); - -/*void pageReallocNoLock(Page * p, int id); */ -/** @todo Do we need a locking version of pageRealloc? */ -void pageRealloc(Page * p, int id); - -Page* pageAlloc(int id); - recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid); void pageDeRalloc(Page * page, recordid rid); -/*int pageTest(); */ +void pageCommit(int xid); +void pageAbort(int xid); + +Page* pageAlloc(int id); +void pageRealloc(Page * p, int id); -int pageGetSlotType(Page * p, int slot, int type); +int pageGetSlotType(Page * p, int slot, int type); void pageSetSlotType(Page * p, int slot, int type); -Page * loadPage(int page); -Page * getPage(int page, int mode); - END_C_DECLS #endif diff --git a/src/lladd/page/slotted.c b/src/lladd/page/slotted.c new file mode 100644 index 0000000..93aa918 --- /dev/null +++ b/src/lladd/page/slotted.c @@ -0,0 +1,412 @@ +/************************************************************************ + * implementation of pages + + STRUCTURE OF A PAGE + + +-------------------------------------------+-----------------------+--+ + | DATA SECTION +--------->| RID: (PAGE, 0) | | + | +-----------------+ | +-----------------------+ | + | +-->| RID: (PAGE, 1) | | | + | | +-----------------+ | | + | | | | + | +-----------------+ | +----------------------------+ + | | | +--->| RID: (PAGE, n) | + | | | | +----------------------------+ + |======================================================================| + |^ FREE SPACE | | | | + |+-----------------------|-------|---|--------------------+ | + | | | | | | + | +-------------|-------|---+ | | + | | | | | | + | +---|---+-----+---|---+---|---+--------------+-----|------+-----+ + | | slotn | ... | slot1 | slot0 | num of slots | free space | LSN | + +------+-------+-----+-------+-------+--------------+------------+-----+ + + NOTE: + - slots are zero indexed. + - slots are of implemented as (offset, length) + + Latching summary: + + Each page has an associated read/write lock. This lock only + protects the internal layout of the page, and the members of the + page struct. Here is how it is held in various circumstances: + + Record allocation: Write lock + Record read: Read lock + Read LSN Read lock + Record write *READ LOCK* + Write LSN Write lock + + Any circumstance where these locks are held during an I/O operation + is a bug. + + $Id$ + +************************************************************************/ +#include "../page.h" +#include "../blobManager.h" +#include "slotted.h" +#include + +/* ------------------ STATIC FUNCTIONS. NONE OF THESE ACQUIRE LOCKS + ON THE MEMORY THAT IS PASSED INTO THEM -------------*/ + +static void __really_do_ralloc(Page * page, recordid rid) ; + +/** + +Move all of the records to the beginning of the page in order to +increase the available free space. + +The caller of this function must have a writelock on the page. +*/ +static void pageCompact(Page * page) { + + int i; + Page bufPage; + byte buffer[PAGE_SIZE]; + + int numSlots; + int meta_size; + + bufPage.id = -1; + bufPage.memAddr = buffer; + + /* Can't compact in place, slot numbers can come in different orders than + the physical space allocated to them. */ + + memset(buffer, -1, PAGE_SIZE); + + meta_size = (((int)page->memAddr) + PAGE_SIZE ) - (int)end_of_usable_space_ptr(page); + /* *slot_length_ptr(page, (*numslots_ptr(page))-1);*/ + + memcpy(buffer + PAGE_SIZE - meta_size, page->memAddr + PAGE_SIZE - meta_size, meta_size); + + pageInitialize(&bufPage); + + numSlots = *numslots_ptr(page); + for (i = 0; i < numSlots; i++) { + /* printf("i = %d\n", i); */ + if (isValidSlot(page, i)) { + /* printf("copying %d\n", i); + fflush(NULL); */ + /* DEBUG("Buffer offset: %d\n", freeSpace); */ + recordid rid; + + rid.page = -1; + rid.slot = i; + rid.size = *slot_length_ptr(page, i); + + __really_do_ralloc(&bufPage, rid); + + memcpy(record_ptr(&bufPage, rid.slot), record_ptr(page, rid.slot), rid.size); + + } else { + *slot_ptr(&bufPage, i) = INVALID_SLOT; + *slot_length_ptr(&bufPage, i) = *freelist_ptr(page); + *freelist_ptr(page) = i; + } + } + + /* Rebuild the freelist. */ + + /* *freelist_ptr(&bufPage) = 0; + for (i = 0; i < numSlots; i++) { + if (!isValidSlot(&bufPage, i)) { + *slot_length_ptr(&bufPage, i) = *freelist_ptr(&bufPage); + *freelist_ptr(&bufPage) = i; + break; + } + } + */ + + memcpy(page->memAddr, buffer, PAGE_SIZE); + +} + +void pageInitialize(Page * page) { + /* printf("Initializing page %d\n", page->id); + fflush(NULL); */ + memset(page->memAddr, 0, PAGE_SIZE); + *freespace_ptr(page) = 0; + *numslots_ptr(page) = 0; + *freelist_ptr(page) = INVALID_SLOT; +} + +int unlocked_freespace(Page * page) { + return (int)slot_length_ptr(page, *numslots_ptr(page)) - (int)(page->memAddr + *freespace_ptr(page)); +} + +/** + * freeSpace() assumes that the page is already loaded in memory. It takes + * as a parameter a Page, and returns an estimate of the amount of free space + * available to a new slot on this page. (This is the amount of unused space + * in the page, minus the size of a new slot entry.) This is either exact, + * or an underestimate. + * + * @todo is it ever safe to call freespace without a lock on the page? + * + */ +int freespace(Page * page) { + int ret; + readlock(page->rwlatch, 292); + ret = unlocked_freespace(page); + readunlock(page->rwlatch); + return ret; +} + + + +recordid pageRalloc(Page * page, int size) { + + writelock(page->rwlatch, 342); + + recordid rid; + + rid.page = page->id; + rid.slot = *numslots_ptr(page); + rid.size = size; + + /* + Reuse an old (invalid) slot entry. + + @todo This is terribly slow, but seems to be necessary, or + we will leak slot ids. Is there a better (non n^2) way? + + Perhaps we could use the empty slots to construct a linked + list of free pages. (The slot length could be the offset + of the next slot on the list, and we could use the standard + INVALID_SLOT value to distinguish between the types.) + + */ + /* Old way */ + + /* int i; + for (i = 0; i < numSlots; i++) { + if (!isValidSlot(page, i)) { + rid.slot = i; + break; + } + } */ + + + /* new way @todo leaks slot zero (until pageCompact is called)*/ + if(*freelist_ptr(page) != INVALID_SLOT) { + rid.slot = *freelist_ptr(page); + /* printf("Reusing old slot %d\n", rid.slot); */ + *freelist_ptr(page) = *slot_length_ptr(page, rid.slot); + *slot_length_ptr(page, rid.slot) = 0; + } else { + /* printf("Allocating new slot\n"); */ + } + fflush(NULL); + + __really_do_ralloc(page, rid); + + /* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */ + + writeunlock(page->rwlatch); + + return rid; +} + +static void __really_do_ralloc(Page * page, recordid rid) { + + int freeSpace; + + assert(rid.size > 0); + + if(unlocked_freespace(page) < rid.size) { + pageCompact(page); + + /* Make sure there's enough free space... */ + assert (unlocked_freespace(page) >= rid.size); + } + + freeSpace = *freespace_ptr(page); + + + if(*numslots_ptr(page) <= rid.slot) { + /* printf("Incrementing numSlots."); */ + *numslots_ptr(page) = rid.slot + 1; + } + + DEBUG("Num slots %d\trid.slot %d\n", *numslots_ptr(page), rid.slot); + + *freespace_ptr(page) = freeSpace + rid.size; + + *slot_ptr(page, rid.slot) = freeSpace; + /* assert(!*slot_length_ptr(page, rid.slot) || (-1 == *slot_length_ptr(page, rid.slot)));*/ + *slot_length_ptr(page, rid.slot) = rid.size; + +} + +/** Only used for recovery, to make sure that consistent RID's are created + * on log playback. */ +recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid) { + + writelock(page->rwlatch, 376); + + if(*slot_length_ptr(page, rid.slot) == 0 /*|| *slot_length_ptr(page, rid.slot) == -1*/) { + + __really_do_ralloc(page, rid); + + } else { + + assert((rid.size == *slot_length_ptr(page, rid.slot)) || + (*slot_length_ptr(page, rid.slot) >= PAGE_SIZE)); + + } + + writeunlock(page->rwlatch); + + return rid; +} + + +void pageDeRalloc(Page * page, recordid rid) { + + readlock(page->rwlatch, 443); + + *slot_ptr(page, rid.slot) = INVALID_SLOT; + *slot_length_ptr(page, rid.slot) = *freelist_ptr(page); + *freelist_ptr(page) = rid.slot; + + unlock(page->rwlatch); +} + +/* + This should trust the rid (since the caller needs to + override the size in special circumstances) + + @todo If the rid size has been overridden, we should check to make + sure that this really is a special record. +*/ +void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) { + + int slot_length; + readlock(page->rwlatch, 519); + + assert(page->id == rid.page); + slot_length = *slot_length_ptr(page, rid.slot); + assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE)); + + if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) { + perror("memcpy"); + abort(); + } + + unlock(page->rwlatch); + +} + +void pageWriteRecord(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) { + int slot_length; + + writelock(page->rwlatch, 529); + + assert(rid.size < PAGE_SIZE); + assert(page->id == rid.page); + + slot_length = *slot_length_ptr(page, rid.slot); + assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE)); + + if(!memcpy(record_ptr(page, rid.slot), data, rid.size)) { + perror("memcpy"); + abort(); + } + + page->LSN = lsn; + /* *lsn_ptr(page) = lsn */ + pageWriteLSN(page); + unlock(page->rwlatch); + +} + + +/** @todo: Should the caller need to obtain the writelock when calling pageSetSlotType? */ +void pageSetSlotType(Page * p, int slot, int type) { + assert(type > PAGE_SIZE); + writelock(p->rwlatch, 686); + *slot_length_ptr(p, slot) = type; + unlock(p->rwlatch); +} + +int pageGetSlotType(Page * p, int slot, int type) { + int ret; + readlock(p->rwlatch, 693); + ret = *slot_length_ptr(p, slot); + unlock(p->rwlatch); + + /* getSlotType does the locking for us. */ + return ret > PAGE_SIZE ? ret : NORMAL_SLOT; +} + + +/* +typedef struct { + int page; + int slot; + / ** If pageptr is not null, then it is used by the iterator methods. + Otherwise, they re-load the pages and obtain short latches for + each call. * / + Page * pageptr; +} page_iterator_t; + + + +void pageIteratorInit(recordid rid, page_iterator_t * pit, Page * p) { + pit->page = rid.page; + pit->slot = rid.slot; + pit->pageptr = p; + assert((!p) || (p->id == pit->page)); +} + +int nextSlot(page_iterator_t * pit, recordid * rid) { + Page * p; + int numSlots; + int done = 0; + int ret; + if(pit->pageptr) { + p = pit->pageptr; + } else { + p = loadPage(pit->page); + } + + numSlots = readNumSlots(p->memAddr); + while(pit->slot < numSlots && !done) { + + if(isValidSlot(p->memAddr, pit->slot)) { + done = 1; + } else { + pit->slot ++; + } + + } + if(!done) { + ret = 0; + } else { + ret = 1; + rid->page = pit->page; + rid->slot = pit->slot; + rid->size = getSlotLength(p->memAddr, rid->slot); + if(rid->size >= PAGE_SIZE) { + + if(rid->size == BLOB_SLOT) { + blob_record_t br; + pageReadRecord(-1, p, *rid, (byte*)&br); + rid->size = br.size; + } + } + } + + if(!pit->pageptr) { + releasePage(p); + } + + return ret; + +} +*/ + diff --git a/src/lladd/page/slotted.h b/src/lladd/page/slotted.h new file mode 100644 index 0000000..6b61774 --- /dev/null +++ b/src/lladd/page/slotted.h @@ -0,0 +1,21 @@ + +void pageWriteRecord(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data); +void pageReadRecord(int xid, Page * page, recordid rid, byte *buff); + +/** + * assumes that the page is already loaded in memory. It takes as a + * parameter a Page, and returns an estimate of the amount of free space on this + * page. This is either exact, or an underestimate. + * @todo how should this be handled? */ +int freespace(Page * p); + +void pageInitialize(Page * p); + +#define freespace_ptr(page) shorts_from_end((page), 1) +#define numslots_ptr(page) shorts_from_end((page), 2) +#define freelist_ptr(page) shorts_from_end((page), 3) +#define slot_ptr(page, n) shorts_from_end((page), (2*(n))+4) +#define slot_length_ptr(page, n) shorts_from_end((page), (2*(n))+5) +#define record_ptr(page, n) bytes_from_start((page), *slot_ptr((page), (n))) +#define isValidSlot(page, n) ((*slot_ptr((page), (n)) == INVALID_SLOT) ? 0 : 1) + diff --git a/src/lladd/pageFile.c b/src/lladd/pageFile.c index 989b7d3..89bf8fb 100644 --- a/src/lladd/pageFile.c +++ b/src/lladd/pageFile.c @@ -4,6 +4,7 @@ */ #include "page.h" +#include "page/slotted.h" #include @@ -20,48 +21,19 @@ #include static int stable = -1; -/** Defined in bufferManager.c */ -extern pthread_mutex_t add_pending_mutex; static pthread_mutex_t stable_mutex; static long myLseek(int f, long offset, int whence); static long myLseekNoLock(int f, long offset, int whence); - void pageRead(Page *ret) { - /* long fileSize; */ long pageoffset; long offset; - /** @todo pageRead() is using fseek to calculate the file size on each read, which is inefficient. */ pageoffset = ret->id * PAGE_SIZE; - /* flockfile(stable); */ pthread_mutex_lock(&stable_mutex); - /* fileSize = myLseekNoLock(stable, 0, SEEK_END); */ - - /* DEBUG("Reading page %d\n", ret->id); */ - - /* if(!ret->memAddr) { - ret->memAddr = malloc(PAGE_SIZE); - } - if(!ret->memAddr) { - perror("pageFile.c"); - fflush(NULL); - } - assert(ret->memAddr); */ - - /** @todo was manual extension of the storefile really necessary? */ - - /* if ((ret->id)*PAGE_SIZE >= fileSize) { - myLseekNoLock(stable, (ret->id - 1) * PAGE_SIZE -1, SEEK_SET); - if(1 != fwrite("", 1, 1, stable)) { - if(feof(stable)) { printf("Unexpected eof extending storefile!\n"); fflush(NULL); abort(); } - if(ferror(stable)) { printf("Error extending storefile! %d", ferror(stable)); fflush(NULL); abort(); } - } - }*/ - offset = myLseekNoLock(stable, pageoffset, SEEK_SET); assert(offset == pageoffset); @@ -73,7 +45,7 @@ void pageRead(Page *ret) { offset = myLseekNoLock(stable, pageoffset, SEEK_SET); assert(offset == pageoffset); if(fileSize <= pageoffset) { - memset(ret->memAddr, 0, PAGE_SIZE); + pageInitialize(ret); write(stable, ret->memAddr, PAGE_SIZE); } } else if(read_size == -1) { @@ -129,20 +101,6 @@ void pageWrite(Page * ret) { void openPageFile() { DEBUG("Opening storefile.\n"); - /* if( ! (stable = fopen(STORE_FILE, "r+"))) { / * file may not exist * / - byte* zero = calloc(1, PAGE_SIZE); - - if(!(stable = fopen(STORE_FILE, "w+"))) { perror("Couldn't open or create store file"); abort(); } - - / * Write out one page worth of zeros to get started. * / - - / * if(1 != fwrite(zero, PAGE_SIZE, 1, stable)) { assert (0); } * / - - free(zero); - } - - DEBUG("storefile opened.\n"); - */ stable = open (STORE_FILE, O_CREAT | O_RDWR | O_DIRECT, S_IRWXU | S_IRWXG | S_IRWXO); diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index 8dff2b4..3e50809 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -6,6 +6,7 @@ #include #include "logger/logWriter.h" #include +#include "page.h" #include #include diff --git a/test/lladd/check_page.c b/test/lladd/check_page.c index f7c6cdf..6bf5723 100644 --- a/test/lladd/check_page.c +++ b/test/lladd/check_page.c @@ -45,6 +45,7 @@ terms specified in this license. #include #include "../../src/lladd/page.h" +#include "../../src/lladd/page/slotted.h" #include #include @@ -54,7 +55,6 @@ terms specified in this license. #include "../check_includes.h" - #define LOG_NAME "check_page.log" #define RECORD_SIZE sizeof(int) @@ -65,9 +65,58 @@ terms specified in this license. Page * loadPage(int pageid); pthread_mutex_t random_mutex; - static lsn_t lsn; static pthread_mutex_t lsn_mutex; + +static void * multiple_simultaneous_pages ( void * arg_ptr) { + Page * p = (Page*)arg_ptr; + int i; + lsn_t this_lsn; + int j; + int first = 1; + int k; + recordid rid[100]; + + for(i = 0; i < 10000; i++) { + pthread_mutex_lock(&lsn_mutex); + this_lsn = lsn; + lsn++; + pthread_mutex_unlock(&lsn_mutex); + + if(! first ) { + /* addPendingEvent(p); */ + /*pageReadRecord(1, p, rid, (byte*)&j);*/ + for(k = 0; k < 100; k++) { + readRecord(1, p, rid[k], (byte*)&j); + + assert((j + 1) == i + k); + pageDeRalloc(p, rid[k]); + sched_yield(); + } + } + + first = 0; + + for(k = 0; k < 100; k++) { + + rid[k] = pageRalloc(p, sizeof(short)); + i +=k; + /* printf("Slot = %d\n", rid[k].slot); */ + writeRecord(1, p, lsn, rid[k], (byte*)&i); + i -=k; + sched_yield(); + + + + } + + assert(pageReadLSN(p) <= lsn); + } + + return NULL; +} + + static void* worker_thread(void * arg_ptr) { Page * p = (Page*)arg_ptr; int i; @@ -83,7 +132,8 @@ static void* worker_thread(void * arg_ptr) { if(! first ) { /* addPendingEvent(p); */ - pageReadRecord(1, p, rid, (byte*)&j); + /*pageReadRecord(1, p, rid, (byte*)&j);*/ + readRecord(1, p, rid, (byte*)&j); assert((j + 1) == i); pageDeRalloc(p, rid); sched_yield(); @@ -92,8 +142,7 @@ static void* worker_thread(void * arg_ptr) { first = 0; rid = pageRalloc(p, sizeof(int)); - /* addPendingEvent(p); */ - pageWriteRecord(1, p, rid, lsn, (byte*)&i); + writeRecord(1, p, lsn, rid, (byte*)&i); sched_yield(); assert(pageReadLSN(p) <= lsn); @@ -122,7 +171,7 @@ START_TEST(pageNoThreadTest) pthread_mutex_init(&lsn_mutex, NULL); - + Tinit(); p = loadPage(0); @@ -136,6 +185,95 @@ START_TEST(pageNoThreadTest) } END_TEST +/** + @test +*/ + +START_TEST(pageCheckMacros) { + Page p; + byte buffer[PAGE_SIZE]; + memset(buffer, -1, PAGE_SIZE); + + p.memAddr = buffer; + + lsn_t lsn = 5; + + *lsn_ptr(&p) = lsn; + *page_type_ptr(&p) = 10; + *freespace_ptr(&p) = 15; + *numslots_ptr(&p) = 20; + *slot_ptr(&p, 0) = 30; + *slot_ptr(&p, 1) = 35; + *slot_ptr(&p, 40) = 40; + *slot_length_ptr(&p, 0) = 31; + *slot_length_ptr(&p, 1) = 36; + *slot_length_ptr(&p, 40) = 41; + + *bytes_from_start(&p, 0) = 50; + *bytes_from_start(&p, 1) = 51; + *bytes_from_start(&p, 2) = 52; + *bytes_from_start(&p, 3) = 53; + *bytes_from_start(&p, 4) = 54; + + assert(*lsn_ptr(&p) == lsn); + assert(*page_type_ptr(&p) == 10); + assert(*end_of_usable_space_ptr(&p) == 10); + assert(*freespace_ptr(&p) == 15); + assert(*numslots_ptr(&p) == 20); + assert(*slot_ptr(&p, 0) == 30); + assert(*slot_ptr(&p, 1) == 35); + assert(*slot_ptr(&p, 40) == 40); + assert(*slot_length_ptr(&p, 0) == 31); + assert(*slot_length_ptr(&p, 1) == 36); + assert(*slot_length_ptr(&p, 40) == 41); + + assert(*bytes_from_start(&p, 0) == 50); + assert(*bytes_from_start(&p, 1) == 51); + assert(*bytes_from_start(&p, 2) == 52); + assert(*bytes_from_start(&p, 3) == 53); + assert(*bytes_from_start(&p, 4) == 54); + + assert(isValidSlot(&p, 0)); + assert(isValidSlot(&p, 1)); + assert(isValidSlot(&p, 40)); + + /* invalidateSlot(&p, 0); + invalidateSlot(&p, 1); + invalidateSlot(&p, 40); + + assert(!isValidSlot(&p, 0)); + assert(!isValidSlot(&p, 1)); + assert(!isValidSlot(&p, 40));*/ + + +} END_TEST +/** + @test + + Page test that allocates multiple records + +*/ +START_TEST(pageNoThreadMultPageTest) +{ + Page * p; + /* p->id = 0;*/ + + + pthread_mutex_init(&lsn_mutex, NULL); + + Tinit(); + + p = loadPage(1); + + multiple_simultaneous_pages(p); + + unlock(p->loadlatch); + + Tdeinit(); + +} +END_TEST + /** Check the page implementation in the multi-threaded case. */ @@ -153,7 +291,7 @@ START_TEST(pageThreadTest) { Tinit(); fail_unless(1, NULL); - Page * p = loadPage(1); + Page * p = loadPage(2); fail_unless(1, NULL); for(i = 0; i < THREAD_COUNT; i++) { @@ -179,8 +317,13 @@ Suite * check_suite(void) { /* Sub tests are added, one per line, here */ + tcase_add_test(tc, pageCheckMacros); + + + + tcase_add_test(tc, pageNoThreadMultPageTest); tcase_add_test(tc, pageNoThreadTest); - tcase_add_test(tc, pageThreadTest); + tcase_add_test(tc, pageThreadTest); /* --------------------------------------------- */ diff --git a/test/lladd/check_transactional2.c b/test/lladd/check_transactional2.c index 299fc6c..3e6be64 100644 --- a/test/lladd/check_transactional2.c +++ b/test/lladd/check_transactional2.c @@ -47,8 +47,8 @@ terms specified in this license. #include #include "../check_includes.h" #define LOG_NAME "check_transactional2.log" -#define THREAD_COUNT 5 -#define RECORDS_PER_THREAD 5000 +#define THREAD_COUNT 25 +#define RECORDS_PER_THREAD 1000 void arraySet(int * array, int val) { int i; @@ -444,11 +444,11 @@ Suite * check_suite(void) { TCase *tc = tcase_create("transactional_smokeTest"); /* Sub tests are added, one per line, here */ - /* tcase_add_test(tc, transactional_smokeTest); - tcase_add_test(tc, transactional_blobSmokeTest); */ - /* tcase_add_test(tc, transactional_nothreads_commit); - tcase_add_test(tc, transactional_threads_commit); */ - /* tcase_add_test(tc, transactional_nothreads_abort); */ + tcase_add_test(tc, transactional_smokeTest); + tcase_add_test(tc, transactional_blobSmokeTest); + tcase_add_test(tc, transactional_nothreads_commit); + tcase_add_test(tc, transactional_threads_commit); + tcase_add_test(tc, transactional_nothreads_abort); tcase_add_test(tc, transactional_threads_abort); tcase_add_test(tc, transactional_blobs_nothreads_abort); /* tcase_add_test(tc, transactional_blobs_threads_abort); */