From 5064e3fac2ef59948f70785de900c1ed6448c059 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Wed, 6 Oct 2004 06:08:09 +0000 Subject: [PATCH] Implemented pages that store fixed sized records efficiently, and a java-style ArrayList data structure that efficiently supports (relatively) clustered, O(1) access time expandable arrays. (This will be used for the hash implementation...) --- ChangeLog | 23 ++- Makefile.am | 4 +- lladd/constants.h | 10 +- lladd/operations.h | 13 +- lladd/operations/arrayList.h | 29 +++ src/lladd/Makefile.am | 3 +- src/lladd/operations/alloc.c | 34 +-- src/lladd/operations/arrayList.c | 285 ++++++++++++++++++++++++++ src/lladd/operations/pageOperations.c | 18 -- src/lladd/page.c | 16 +- src/lladd/page.h | 3 +- src/lladd/page/fixed.c | 96 +++++++++ src/lladd/page/fixed.h | 20 ++ src/lladd/page/slotted.c | 42 ++-- src/lladd/page/slotted.h | 2 +- src/lladd/pageFile.c | 2 +- src/lladd/transactional2.c | 20 +- test/lladd/check_bufferManager.c | 17 +- test/lladd/check_operations.c | 86 +++++++- test/lladd/check_page.c | 58 +++++- 20 files changed, 705 insertions(+), 76 deletions(-) create mode 100644 lladd/operations/arrayList.h create mode 100644 src/lladd/operations/arrayList.c create mode 100644 src/lladd/page/fixed.c create mode 100644 src/lladd/page/fixed.h diff --git a/ChangeLog b/ChangeLog index 38536a0..ba55a86 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,25 @@ -2004-07-20 sears +2004-10-02 sears@euglenoid + + * alloc.h, instantSet.h, noop.h, prepare.h: + Added test cases for Tprepare(), implemented some redo-only operations, and started to clean up record allocation/deallocation. + + Also, numerous bugfixes. + +2004-08-21 sears@euglenoid + + * pageOperations.h: + Implemented a freespace manager that should safely allocate space, even in the face of crashes, and can reclaim unused space (unless an application opens more than one simultaneous transaction that performs allocations) + + Fixed some blob bugs (by adding extra fdatasync() calls). + + Began factoring out the page management code so that it is an extenstion, and a less integral part of lladd. + +2004-08-03 sears@euglenoid + + * lladdhash.h, pageOperations.h: + Added (untested) support for whole-page operations, lladdhash now works. + +rr 2004-07-20 sears * bufferManager.c, common.c, operations/alloc.c, page.c, page.h, pageCache.c, pageFile.c, transactional2.c: Continuing work on multi-threading. r/w access to buffer manager getting close, but still buggy. diff --git a/Makefile.am b/Makefile.am index 83e9517..2b41418 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2,7 +2,7 @@ EXTRA_DIST = reconf SUBDIRS = src test utilities AM_CFLAGS = -g -Wall -pedantic -docs: +docs: coverage doxygen doc/Doxyfile-api doxygen doc/Doxyfile-developers @@ -18,4 +18,4 @@ coverage: precoverage check genhtml -o doc/coverage tmp.info rm tmp.info -.PHONY: docs precoverage coverage \ No newline at end of file +.PHONY: docs precoverage coverage diff --git a/lladd/constants.h b/lladd/constants.h index 1bab788..d309a2b 100644 --- a/lladd/constants.h +++ b/lladd/constants.h @@ -67,7 +67,7 @@ terms specified in this license. /* @define error codes */ #define OUT_OF_MEM 1 -#define FILE_OPEN_ERROR 2 +#define FILE_OPRN_ERROR 2 #define FILE_READ_ERROR 3 #define FILE_WRITE_ERROR 4 #define FILE_WRITE_OPEN_ERROR 5 @@ -75,10 +75,11 @@ terms specified in this license. #define PAGE_SIZE 4096 -/*#define MAX_BUFFER_SIZE 100003 */ +/* #define MAX_BUFFER_SIZE 100003 */ /*#define MAX_BUFFER_SIZE 10007*/ /*#define MAX_BUFFER_SIZE 5003*/ -#define MAX_BUFFER_SIZE 71 +#define MAX_BUFFER_SIZE 2003 +/* #define MAX_BUFFER_SIZE 71 */ /*#define MAX_BUFFER_SIZE 7 */ /*#define BUFFER_ASOOCIATIVE 2 */ @@ -109,6 +110,9 @@ terms specified in this license. #define OPERATION_UNALLOC_FREED 17 #define OPERATION_NOOP 18 #define OPERATION_INSTANT_SET 19 +#define OPERATION_ARRAY_LIST_ALLOC 20 +#define OPERATION_INITIALIZE_FIXED_PAGE 21 +#define OPERATION_UNINITIALIZE_PAGE 22 /* number above should be less than number below */ #define MAX_OPERATIONS 40 diff --git a/lladd/operations.h b/lladd/operations.h index 1c32a8a..09296bf 100644 --- a/lladd/operations.h +++ b/lladd/operations.h @@ -116,9 +116,17 @@ typedef struct { The other option: - Get rid of operations that span records entirely by - splitting complex logical operations into simpler one. + splitting complex logical operations into simpler ones. - We chose the second option for now. + We chose the second option for now. This implies that the + entries must be written to the log in an order, that if + repeated, guarantees that the structure will be in a logically + consistent state after the REDO phase, regardless of what + prefix of the log actually makes it to disk. Note that + pinning pages before the log entry hits disk is inadequate, in + general, since other transactions could read dirty information + from the pinned pages, producsing nonsensical log entries that + preceed the current transaction's log entry. */ /** @@ -139,6 +147,7 @@ typedef struct { #include "operations/pageOperations.h" #include "operations/noop.h" #include "operations/instantSet.h" +#include "operations/arrayList.h" extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */ diff --git a/lladd/operations/arrayList.h b/lladd/operations/arrayList.h new file mode 100644 index 0000000..5bfbac3 --- /dev/null +++ b/lladd/operations/arrayList.h @@ -0,0 +1,29 @@ +#include + +#ifndef __ARRAY_LIST_H +#define __ARRAY_LIST_H + +/** + @file + + @ingroup OPERATIONS + + $Id$ +*/ + +recordid TarrayListAlloc(int xid, int count, int multiplier, int size); + +Operation getArrayListAlloc(); +Operation getInitFixed(); +Operation getUnInitPage(); + +/** Initialized a fixed page with the maximum possible number of slots + allocated. The rid.size field is used to determine the size of + record that the slots can hold. */ +#define TinitFixed(xid, rid) Tupdate(xid, rid, NULL, OPERATION_INITIALIZE_FIXED_PAGE) +/** Un-initializes a page. */ +#define TunInitPage(xid, rid) Tupdate(xid, rid, NULL, OPERATION_UNINITIALIZE_PAGE) + +recordid dereferenceArrayListRid(Page * p, int offset); +int TarrayListExtend(int xid, recordid rid, int slots); +#endif diff --git a/src/lladd/Makefile.am b/src/lladd/Makefile.am index 05de0d5..a9bbb7b 100644 --- a/src/lladd/Makefile.am +++ b/src/lladd/Makefile.am @@ -9,6 +9,7 @@ liblladd_a_SOURCES=common.c stats.c io.c bufferManager.c linkedlist.c operations operations/pageOperations.c page/indirect.c operations/decrement.c \ operations/increment.c operations/prepare.c operations/set.c \ operations/alloc.c operations/noop.c operations/instantSet.c \ - page/slotted.c operations/lladdhash.c page/header.c + page/slotted.c operations/lladdhash.c page/header.c page/fixed.c \ + operations/arrayList.c AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/src/lladd/operations/alloc.c b/src/lladd/operations/alloc.c index 8c005b0..130d1c5 100644 --- a/src/lladd/operations/alloc.c +++ b/src/lladd/operations/alloc.c @@ -50,34 +50,30 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) return 0; } -/** @todo Currently, we just leak store space on dealloc. */ +/** @todo Currently, we leak empty pages on dealloc. */ static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { - /* Page * loadedPage = loadPage(rid.page); */ - /** Has no effect during normal operation, other than updating the LSN. */ - /* slottedPostRalloc(p, lsn, rid); */ - - /* Page * loadedPage = loadPage(rid.page); */ assert(rid.page == p->id); slottedDeRalloc(p, lsn, rid); - /* releasePage(loadedPage); */ - return 0; } static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) { - /* operate(xid, p, lsn, rid, dat); */ if(rid.size >= BLOB_THRESHOLD_SIZE) { rid.size = BLOB_REC_SIZE; /* Don't reuse blob space yet... */ } slottedPostRalloc(p, lsn, rid); + /** @todo dat should be the pointer to the space in the blob store. */ writeRecord(xid, p, lsn, rid, dat); return 0; } +static pthread_mutex_t talloc_mutex; + Operation getAlloc() { + pthread_mutex_init(&talloc_mutex, NULL); Operation o = { OPERATION_ALLOC, /* ID */ 0, @@ -111,14 +107,27 @@ Operation getRealloc() { recordid Talloc(int xid, long size) { recordid rid; - + Page * p = NULL; if(size >= BLOB_THRESHOLD_SIZE) { rid = preAllocBlob(xid, size); } else { - rid = slottedPreRalloc(xid, size); + pthread_mutex_lock(&talloc_mutex); + rid = slottedPreRalloc(xid, size, &p); + assert(p != NULL); } Tupdate(xid,rid, NULL, OPERATION_ALLOC); + + if(p != NULL) { + /* release the page that preAllocBlob pinned for us. */ + + /* @todo alloc.c pins multiple pages -> Will deadlock with small buffer sizes.. */ + releasePage(p); + pthread_mutex_unlock(&talloc_mutex); + + /*pthread_mutex_unlock(&talloc_mutex); */ + + } return rid; @@ -128,7 +137,8 @@ void Tdealloc(int xid, recordid rid) { void * preimage = malloc(rid.size); Page * p = loadPage(rid.page); readRecord(xid, p, rid, preimage); - releasePage(p); /** @todo race in Tdealloc; do we care? */ + /** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with? */ Tupdate(xid, rid, preimage, OPERATION_DEALLOC); + releasePage(p); free(preimage); } diff --git a/src/lladd/operations/arrayList.c b/src/lladd/operations/arrayList.c new file mode 100644 index 0000000..30c225c --- /dev/null +++ b/src/lladd/operations/arrayList.c @@ -0,0 +1,285 @@ +#include +#include + +#include "../page/fixed.h" +#include +#include +#include +#include + +#include +#include + +/** + Implement resizable arrays, just like java's ArrayList class. + + Essentially, the base page contains a fixed size array of rids + pointing at contiguous blocks of pages. Each block is twice as + big as the previous block. + + The base block is of type FIXED_PAGE, of int's. The first few slots are reserved: + +*/ + +typedef struct { + int firstPage; + int initialSize; + int multiplier; + int size; + int maxOffset; +} TarrayListParameters; + + +static TarrayListParameters pageToTLP(Page * p); +static int getBlockContainingOffset(TarrayListParameters tlp, int offset, int * firstSlotInBlock); + +/*----------------------------------------------------------------------------*/ + +recordid TarrayListAlloc(int xid, int count, int multiplier, int size) { + + int firstPage = TpageAllocMany(xid, count+1); + + TarrayListParameters tlp; + + tlp.firstPage = firstPage; + tlp.initialSize = count; + tlp.multiplier = multiplier; + tlp.size = size; + tlp.maxOffset = 0; + + recordid rid; + + rid.page = firstPage; + rid.size = 0; + rid.slot = 0; + + Tupdate(xid, rid, &tlp, OPERATION_ARRAY_LIST_ALLOC); + + return rid; +} + + +static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { + + const TarrayListParameters * tlp = dat; + + int firstPage = tlp->firstPage; + int count = tlp->initialSize; + int multiplier = tlp->multiplier; + int size = tlp->size; + + /* Page * p = loadPage(firstPage); */ + + fixedPageInitialize(p, sizeof(int), recordsPerPage(sizeof(int))); + + /* recordid countRid = fixedRawRalloc(p); + recordid multiplierRid = fixedRawRalloc(p); + recordid slotSizeRid = fixedRawRalloc(p); */ +#define MAX_OFFSET_POSITION 3 + /* recordid maxOffset = fixedRawRalloc(p); */ +#define FIRST_DATA_PAGE_OFFSET 4 + /* recordid firstDataPageRid = fixedRawRalloc(p); */ + + recordid countRid, multiplierRid, slotSizeRid, maxOffset, firstDataPageRid; + countRid.page = multiplierRid.page = slotSizeRid.page = maxOffset.page = firstDataPageRid.page = p->id; + countRid.size = multiplierRid.size = slotSizeRid.size = maxOffset.size = firstDataPageRid.size = sizeof(int); + + countRid.slot = 0; + multiplierRid.slot = 1; + slotSizeRid.slot = 2; + maxOffset.slot = 3; + firstDataPageRid.slot = 4; + + int firstDataPage = firstPage + 1; + /* Allocing this page -> implicit lock. */ + fixedWriteUnlocked(p, countRid, (byte*)&count); + fixedWriteUnlocked(p, multiplierRid, (byte*)&multiplier); + fixedWriteUnlocked(p, firstDataPageRid, (byte*)&firstDataPage); + fixedWriteUnlocked(p, slotSizeRid, (byte*)&size); + int minusOne = -1; + fixedWriteUnlocked(p, maxOffset, (byte*)&minusOne); + + + /* Write lsn... */ + + *page_type_ptr(p) = ARRAY_LIST_PAGE; + + pageWriteLSN(p, lsn); + + recordid ret; + ret.page = firstPage; + ret.slot = 0; /* slot = # of slots in array... */ + ret.size = size; + + return 0; +} + +Operation getArrayListAlloc() { + Operation o = { + OPERATION_ARRAY_LIST_ALLOC, /* ID */ + sizeof(TarrayListParameters), + OPERATION_NOOP, /* Since TpageAllocMany will be undone, the page we touch will be nuked anyway, so set this to NO-OP. */ + &operateAlloc + }; + return o; +} +/*----------------------------------------------------------------------------*/ + +int TarrayListExtend(int xid, recordid rid, int slots) { + Page * p = loadPage(rid.page); + TarrayListParameters tlp = pageToTLP(p); + releasePage(p); + + int lastCurrentBlock; + if(tlp.maxOffset == -1) { + lastCurrentBlock = -1; + } else{ + lastCurrentBlock = getBlockContainingOffset(tlp, tlp.maxOffset, NULL); + } + int lastNewBlock = getBlockContainingOffset(tlp, tlp.maxOffset+slots, NULL); + + DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock); + + recordid tmp; /* recordid of slot in base page that holds new block. */ + tmp.page = rid.page; + tmp.size = sizeof(int); + + recordid tmp2; /* recordid of newly created pages. */ + tmp2.slot = 0; + tmp2.size = tlp.size; + for(int i = lastCurrentBlock+1; i <= lastNewBlock; i++) { + /* Alloc block i */ + int blockSize = tlp.initialSize * powl(tlp.multiplier, i); + int newFirstPage = TpageAllocMany(xid, blockSize); + DEBUG("block %d\n", i); + for(int j = 0; j < blockSize; j++) { + DEBUG("page %d (%d)\n", j, j + newFirstPage); + tmp2.page = j + newFirstPage; + Tupdate(xid, tmp2, NULL, OPERATION_INITIALIZE_FIXED_PAGE); + } + + tmp.slot = i + FIRST_DATA_PAGE_OFFSET; + /** @todo what does this do to recovery?? */ + /** @todo locking for arrayList... */ + *page_type_ptr(p) = FIXED_PAGE; + Tset(xid, tmp, &newFirstPage); + *page_type_ptr(p) = ARRAY_LIST_PAGE; + + DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage); + } + + tmp.slot = MAX_OFFSET_POSITION; + + int newMaxOffset = tlp.maxOffset+slots; + *page_type_ptr(p) = FIXED_PAGE; + Tset(xid, tmp, &newMaxOffset); + *page_type_ptr(p) = ARRAY_LIST_PAGE; + + return 0; + +} + + +static int operateInitFixed(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { + + fixedPageInitialize(p, rid.size, recordsPerPage(rid.size)); + + pageWriteLSN(p, lsn); + return 0; +} + +static int operateUnInitPage(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { + *page_type_ptr(p) = UNINITIALIZED_PAGE; + pageWriteLSN(p, lsn); + return 0; +} + +Operation getInitFixed() { + Operation o = { + OPERATION_INITIALIZE_FIXED_PAGE, + 0, /* The necessary parameters are hidden in the rid */ + OPERATION_UNINITIALIZE_PAGE, + &operateInitFixed + }; + return o; +} +Operation getUnInitPage() { + Operation o = { + OPERATION_UNINITIALIZE_PAGE, + PAGE_SIZE, + NO_INVERSE_WHOLE_PAGE, + &operateUnInitPage + }; + return o; +} + +/*----------------------------------------------------------------------------*/ + + +recordid dereferenceArrayListRid(Page * p, int offset) { + + TarrayListParameters tlp = pageToTLP(p); + + int rec_per_page = recordsPerPage((size_t)tlp.size); + int lastHigh = 0; + int pageRidSlot = 0; /* The slot on the root arrayList page that contains the first page of the block of interest */ + + assert(tlp.maxOffset >= offset); + + pageRidSlot = getBlockContainingOffset(tlp, offset, &lastHigh); + + int dataSlot = offset - lastHigh; /* The offset in the block of interest of the slot we want. */ + int blockPage = dataSlot / rec_per_page; /* The page in the block of interest that contains the slot we want */ + int blockSlot = dataSlot - blockPage * rec_per_page; + + /* recordid tmp; + tmp.page = tlp.firstPage; + tmp.size = sizeof(int); + tmp.slot = pageRidSlot + FIRST_DATA_PAGE_OFFSET; */ + + int thePage; + + assert(pageRidSlot + FIRST_DATA_PAGE_OFFSET < fixedPageCount(p)); + /* fixedReadUnlocked(p, tmp, (byte*)&thePage); *//* reading immutable record.. */ + thePage = *(int*)fixed_record_ptr(p, pageRidSlot + FIRST_DATA_PAGE_OFFSET); + + recordid rid; + rid.page = thePage + blockPage; + rid.slot = blockSlot; + rid.size = tlp.size; + + return rid; + +} +static int getBlockContainingOffset(TarrayListParameters tlp, int offset, int * firstSlotInBlock) { + int rec_per_page = recordsPerPage((size_t)tlp.size); + long thisHigh = rec_per_page * tlp.initialSize; + int lastHigh = 0; + int pageRidSlot = 0; + int currentPageLength = tlp.initialSize; + + while(((long)offset) >= thisHigh) { + pageRidSlot ++; + lastHigh = thisHigh; + currentPageLength *= tlp.multiplier; + thisHigh += rec_per_page * currentPageLength; + } + if(firstSlotInBlock) { + *firstSlotInBlock = lastHigh; + } + return pageRidSlot; +} + +static TarrayListParameters pageToTLP(Page * p) { + + TarrayListParameters tlp; + tlp.firstPage = p->id; + tlp.initialSize = *(int*)fixed_record_ptr(p, 0); + tlp.multiplier = *(int*)fixed_record_ptr(p, 1); + tlp.size = *(int*)fixed_record_ptr(p, 2); + tlp.maxOffset = *(int*)fixed_record_ptr(p, 3); + + return tlp; +} + + diff --git a/src/lladd/operations/pageOperations.c b/src/lladd/operations/pageOperations.c index 0da6993..99e9187 100644 --- a/src/lladd/operations/pageOperations.c +++ b/src/lladd/operations/pageOperations.c @@ -8,10 +8,6 @@ #include "../page/header.h" #include "../pageFile.h" - - - - static int freelist; static int freepage; static pthread_mutex_t pageAllocMutex; @@ -293,11 +289,6 @@ int TpageAlloc(int xid /*, int type */) { return newpage; } -/** Allocs an extent of pages. @todo CONCURRENCY BUG TpageAllocMany - can not be concurrent until ralloc uses TpageAlloc to allocate new - records. (And. concurrency for TpageAllocMany hasn't been - implemented yet... -*/ int TpageAllocMany(int xid, int count /*, int type*/) { /* int firstPage = -1; int lastPage = -1; */ @@ -320,15 +311,6 @@ int TpageAllocMany(int xid, int count /*, int type*/) { rid.page = newpage; - /* for(int i = 0 ; i < count; i++) { - int thisPage = TpageAlloc(xid, type); - if(lastPage == -1) { - firstPage = lastPage = thisPage; - } else { - assert((lastPage +1) == thisPage); - lastPage = thisPage; - } - } */ pthread_mutex_unlock(&pageAllocMutex); return newpage; } diff --git a/src/lladd/page.c b/src/lladd/page.c index c574a8f..3780a6f 100644 --- a/src/lladd/page.c +++ b/src/lladd/page.c @@ -86,6 +86,7 @@ terms specified in this license. #include "pageFile.h" #include "page/slotted.h" +#include "page/fixed.h" /* TODO: Combine with buffer size... */ static int nextPage = 0; @@ -248,8 +249,12 @@ void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { if(rid.size > BLOB_THRESHOLD_SIZE) { writeBlob(xid, p, lsn, rid, dat); - } else { + } else if(*page_type_ptr(p) == SLOTTED_PAGE) { slottedWrite(xid, p, lsn, rid, dat); + } else if(*page_type_ptr(p) == FIXED_PAGE) { + fixedWrite(p, rid, dat); + } else { + abort(); } assert( (p->id == rid.page) && (p->memAddr != NULL) ); @@ -261,10 +266,17 @@ void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { void readRecord(int xid, Page * p, recordid rid, void *buf) { assert(rid.page == p->id); + + int page_type = *page_type_ptr(p); + if(rid.size > BLOB_THRESHOLD_SIZE) { readBlob(xid, p, rid, buf); - } else { + } else if(page_type == SLOTTED_PAGE) { slottedRead(xid, p, rid, buf); + } else if(page_type == FIXED_PAGE) { + fixedRead(p, rid, buf); + } else { + abort(); } assert(rid.page == p->id); } diff --git a/src/lladd/page.h b/src/lladd/page.h index 8912cfc..1f2168d 100644 --- a/src/lladd/page.h +++ b/src/lladd/page.h @@ -98,7 +98,8 @@ BEGIN_C_DECLS #define INDIRECT_PAGE 2 #define LLADD_HEADER_PAGE 3 #define LLADD_FREE_PAGE 4 - +#define FIXED_PAGE 5 +#define ARRAY_LIST_PAGE 6 #define lsn_ptr(page) (((lsn_t *)(&((page)->memAddr[PAGE_SIZE])))-1) #define page_type_ptr(page) (((int*)lsn_ptr((page)))-1) #define end_of_usable_space_ptr(page) page_type_ptr((page)) diff --git a/src/lladd/page/fixed.c b/src/lladd/page/fixed.c new file mode 100644 index 0000000..f821405 --- /dev/null +++ b/src/lladd/page/fixed.c @@ -0,0 +1,96 @@ +#include "../page.h" + +#include "fixed.h" + +#include + + +int recordsPerPage(size_t size) { + return (USABLE_SIZE_OF_PAGE - 2*sizeof(short)) / size; +} + +void fixedPageInitialize(Page * page, size_t size, int count) { + assert(*page_type_ptr(page) == UNINITIALIZED_PAGE); + *page_type_ptr(page) = FIXED_PAGE; + *recordsize_ptr(page) = size; + assert(count <= recordsPerPage(size)); + *recordcount_ptr(page)= count; +} + +short fixedPageCount(Page * page) { + assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); + return *recordcount_ptr(page); +} + +short fixedPageRecordSize(Page * page) { + assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); + return *recordsize_ptr(page); +} + +recordid fixedRawRallocMany(Page * page, int count) { + + assert(*page_type_ptr(page) == FIXED_PAGE); + recordid rid; + + writelock(page->rwlatch, 33); + if(*recordcount_ptr(page) + count <= recordsPerPage(*recordsize_ptr(page))) { + rid.page = page->id; + rid.slot = *recordcount_ptr(page); + rid.size = *recordsize_ptr(page); + *recordcount_ptr(page)+=count; + } else { + rid.page = -1; + rid.slot = -1; + rid.size = -1; + } + unlock(page->rwlatch); + + return rid; +} + +recordid fixedRawRalloc(Page *page) { + assert(*page_type_ptr(page) == FIXED_PAGE); + return fixedRawRallocMany(page, 1); +} + +static void checkRid(Page * page, recordid rid) { + assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); + assert(page->id == rid.page); + assert(*recordsize_ptr(page) == rid.size); + /* assert(recordsPerPage(rid.size) > rid.slot); */ + int recCount = *recordcount_ptr(page); + assert(recCount > rid.slot); +} + +void fixedReadUnlocked(Page * page, recordid rid, byte * buf) { + if(!memcpy(buf, fixed_record_ptr(page, rid.slot), rid.size)) { + perror("memcpy"); + abort(); + } +} +void fixedRead(Page * page, recordid rid, byte * buf) { + readlock(page->rwlatch, 57); + checkRid(page, rid); + + fixedReadUnlocked(page, rid, buf); + + unlock(page->rwlatch); + +} + +void fixedWriteUnlocked(Page * page, recordid rid, const byte *dat) { + if(!memcpy(fixed_record_ptr(page, rid.slot), dat, rid.size)) { + perror("memcpy"); + abort(); + } +} + +void fixedWrite(Page * page, recordid rid, const byte* dat) { + readlock(page->rwlatch, 73); + + checkRid(page, rid); + + fixedWriteUnlocked(page, rid, dat); + + unlock(page->rwlatch); +} diff --git a/src/lladd/page/fixed.h b/src/lladd/page/fixed.h new file mode 100644 index 0000000..9abbadb --- /dev/null +++ b/src/lladd/page/fixed.h @@ -0,0 +1,20 @@ +#include "../page.h" + +#ifndef __FIXED_H +#define __FIXED_H + +#define recordsize_ptr(page) shorts_from_end((page), 1) +#define recordcount_ptr(page) shorts_from_end((page), 2) +#define fixed_record_ptr(page, n) bytes_from_start((page), *recordsize_ptr((page)) * (n)) +int recordsPerPage(size_t size); +void fixedPageInitialize(Page * page, size_t size, int count); +short fixedPageCount(Page * page); +short fixedPageRecordSize(Page * page); +recordid fixedRawRallocMany(Page * page, int count); +recordid fixedRawRalloc(Page *page); +void fixedRead(Page * page, recordid rid, byte * buf); +void fixedWrite(Page * page, recordid rid, const byte* dat); +void fixedReadUnlocked(Page * page, recordid rid, byte * buf); +void fixedWriteUnlocked(Page * page, recordid rid, const byte* dat); + +#endif diff --git a/src/lladd/page/slotted.c b/src/lladd/page/slotted.c index 47dca75..6bc7302 100644 --- a/src/lladd/page/slotted.c +++ b/src/lladd/page/slotted.c @@ -112,20 +112,21 @@ static void slottedCompact(Page * page) { certainly asking for trouble, so lastFreepage_mutex is static.) */ -static pthread_mutex_t lastFreepage_mutex; -static unsigned int lastFreepage = -1; + + +/*static pthread_mutex_t lastFreepage_mutex; */ + static unsigned int lastFreepage = -1; void slottedPageInit() { - pthread_mutex_init(&lastFreepage_mutex , NULL); + /*pthread_mutex_init(&lastFreepage_mutex , NULL); */ lastFreepage = -1; } void slottedPageDeinit() { - pthread_mutex_destroy(&lastFreepage_mutex); + /* pthread_mutex_destroy(&lastFreepage_mutex); */ } - void slottedPageInitialize(Page * page) { /* printf("Initializing page %d\n", page->id); fflush(NULL); */ @@ -156,37 +157,37 @@ int slottedFreespace(Page * page) { optimizations later. Perhaps it's better to cluster allocations from the same xid on the same page, or something...) */ -recordid slottedPreRalloc(int xid, long size) { +recordid slottedPreRalloc(int xid, long size, Page ** pp) { recordid ret; - Page * p; + /* Page * p; */ /* DEBUG("Rallocing record of size %ld\n", (long int)size); */ assert(size < BLOB_THRESHOLD_SIZE); - pthread_mutex_lock(&lastFreepage_mutex); + /* pthread_mutex_lock(&lastFreepage_mutex); */ /** @todo is ((unsigned int) foo) == -1 portable? Gotta love C.*/ if(lastFreepage == -1) { lastFreepage = TpageAlloc(xid/*, SLOTTED_PAGE*/); - p = loadPage(lastFreepage); - slottedPageInitialize(p); + *pp = loadPage(lastFreepage); + slottedPageInitialize(*pp); } else { - p = loadPage(lastFreepage); + *pp = loadPage(lastFreepage); } - if(slottedFreespace(p) < size ) { - releasePage(p); + if(slottedFreespace(*pp) < size ) { + releasePage(*pp); lastFreepage = TpageAlloc(xid/*, SLOTTED_PAGE*/); - p = loadPage(lastFreepage); - slottedPageInitialize(p); + *pp = loadPage(lastFreepage); + slottedPageInitialize(*pp); } - ret = slottedRawRalloc(p, size); + ret = slottedRawRalloc(*pp, size); - releasePage(p); + /* releasePage(p); */ /* This gets called in Talloc() now. That prevents the page from being prematurely stolen. */ - pthread_mutex_unlock(&lastFreepage_mutex); + /* pthread_mutex_unlock(&lastFreepage_mutex); */ DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size); @@ -290,8 +291,8 @@ recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid) { } else { - int ijk = rid.size; - int lmn = *slot_length_ptr(page, rid.slot); + /* int ijk = rid.size; + int lmn = *slot_length_ptr(page, rid.slot); */ assert((rid.size == *slot_length_ptr(page, rid.slot)) || (*slot_length_ptr(page, rid.slot) >= PAGE_SIZE)); @@ -305,7 +306,6 @@ recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid) { return rid; } - void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid) { readlock(page->rwlatch, 443); diff --git a/src/lladd/page/slotted.h b/src/lladd/page/slotted.h index c621db3..9aef2f7 100644 --- a/src/lladd/page/slotted.h +++ b/src/lladd/page/slotted.h @@ -89,7 +89,7 @@ void slottedPageInitialize(Page * p); * @see postRallocSlot the implementation of the second phase. * */ -recordid slottedPreRalloc(int xid, long size); +recordid slottedPreRalloc(int xid, long size, Page**p); /** * The second phase of slot allocation. Called after the log entry * has been produced, and during recovery. diff --git a/src/lladd/pageFile.c b/src/lladd/pageFile.c index a9fe644..3cddf2e 100644 --- a/src/lladd/pageFile.c +++ b/src/lladd/pageFile.c @@ -25,7 +25,7 @@ static int stable = -1; static pthread_mutex_t stable_mutex; -static long myLseek(int f, long offset, int whence); +/* static long myLseek(int f, long offset, int whence); */ static long myLseekNoLock(int f, long offset, int whence); void pageRead(Page *ret) { diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index e38573d..ffbbb7b 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -56,9 +56,13 @@ void setupOperationsTable() { operationsTable[OPERATION_UNALLOC_FREED] = getUnallocFreedPage(); operationsTable[OPERATION_NOOP] = getNoop(); operationsTable[OPERATION_INSTANT_SET] = getInstantSet(); + operationsTable[OPERATION_ARRAY_LIST_ALLOC] = getArrayListAlloc(); + operationsTable[OPERATION_INITIALIZE_FIXED_PAGE] = getInitFixed(); + operationsTable[OPERATION_UNINITIALIZE_PAGE] = getUnInitPage(); } + int Tinit() { pthread_mutex_init(&transactional_2_mutex, NULL); @@ -128,7 +132,11 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) { releasePage(p); rid = dereferenceRID(rid); p = loadPage(rid.page); - } + } else if(*page_type_ptr(p) == ARRAY_LIST_PAGE) { + rid = dereferenceArrayListRid(p, rid.slot); + releasePage(p); + p = loadPage(rid.page); + } e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); @@ -147,15 +155,21 @@ void Tread(int xid, recordid rid, void * dat) { Page * p = loadPage(rid.page); int page_type = *page_type_ptr(p); if(page_type == SLOTTED_PAGE) { - readRecord(xid, p, rid, dat); + } else if(page_type == INDIRECT_PAGE) { releasePage(p); rid = dereferenceRID(rid); p = loadPage(rid.page); - readRecord(xid, p, rid, dat); + + } else if(page_type == ARRAY_LIST_PAGE) { + rid = dereferenceArrayListRid(p, rid.slot); + releasePage(p); + p = loadPage(rid.page); + } else { abort(); } + readRecord(xid, p, rid, dat); releasePage(p); } diff --git a/test/lladd/check_bufferManager.c b/test/lladd/check_bufferManager.c index 7e019fa..203c90a 100644 --- a/test/lladd/check_bufferManager.c +++ b/test/lladd/check_bufferManager.c @@ -80,14 +80,18 @@ void * workerThread(void * p) { return NULL; } - +static pthread_mutex_t ralloc_mutex; void * workerThreadWriting(void * q) { int offset = *(int*)q; recordid rids[RECORDS_PER_THREAD]; for(int i = 0 ; i < RECORDS_PER_THREAD; i++) { - - rids[i] = slottedPreRalloc(1, sizeof(int)); + Page * tmp; + pthread_mutex_lock(&ralloc_mutex); + rids[i] = slottedPreRalloc(1, sizeof(int), &tmp); + slottedPostRalloc(tmp, 1, rids[i]); + releasePage(tmp); + pthread_mutex_unlock(&ralloc_mutex); /* printf("\nRID:\t%d,%d\n", rids[i].page, rids[i].slot); */ /* fflush(NULL); */ @@ -203,8 +207,9 @@ START_TEST(pageSingleThreadWriterTest) { int i = 100; Tinit(); - + pthread_mutex_init(&ralloc_mutex, NULL); workerThreadWriting(&i); + pthread_mutex_destroy(&ralloc_mutex); Tdeinit(); }END_TEST @@ -214,7 +219,7 @@ START_TEST(pageThreadedWritersTest) { int i; Tinit(); - + pthread_mutex_init(&ralloc_mutex, NULL); for(i = 0; i < RECORD_THREAD_COUNT; i++) { int * j = malloc(sizeof(int)); *j = i; @@ -223,7 +228,7 @@ START_TEST(pageThreadedWritersTest) { for(i = 0; i < RECORD_THREAD_COUNT; i++) { pthread_join(workers[i], NULL); } - + pthread_mutex_destroy(&ralloc_mutex); Tdeinit(); }END_TEST diff --git a/test/lladd/check_operations.c b/test/lladd/check_operations.c index 708abfb..0f59a43 100644 --- a/test/lladd/check_operations.c +++ b/test/lladd/check_operations.c @@ -52,6 +52,8 @@ terms specified in this license. #include "../../src/lladd/page/slotted.h" #define LOG_NAME "check_operations.log" +#include + void simulateBufferManagerCrash(); extern int numActiveXactions; @@ -73,7 +75,8 @@ START_TEST(operation_physical_do_undo) { Tinit(); - rid = slottedPreRalloc(xid, sizeof(int)); + rid = slottedPreRalloc(xid, sizeof(int), &p); + releasePage(p); buf = 1; arg = 2; @@ -375,7 +378,87 @@ START_TEST(operation_instant_set) { } END_TEST +START_TEST(operation_array_list) { + Tinit(); + + int xid = Tbegin(); + + recordid rid = TarrayListAlloc(xid, 4, 2, sizeof(int)); + + TarrayListExtend(xid, rid, 100000); + + printf("commit"); + fflush(stdout); + Tcommit(xid); + printf("done1\n"); + fflush(stdout); + + xid = Tbegin(); + + recordid rid2; + rid2.page = rid.page; + rid2.slot = 0; + rid2.size = sizeof(int); + + for(int i = 0; i < 100000; i++) { + rid2.slot = i; + Tset(xid, rid2, &i); + } + + for(int i = 0; i < 100000; i++) { + rid2.slot = i; + int j; + Tread(xid, rid2, &j); + assert(i == j); + } + + printf("commit"); + fflush(stdout); + Tcommit(xid); + printf("-done2\n"); + fflush(stdout); + + xid = Tbegin(); + + for(int i = 0; i < 100000; i++) { + int j = 0-i; + rid2.slot = i; + Tset(xid, rid2, &j); + } + + for(int i = 0; i < 100000; i++) { + rid2.slot = i; + int j = 0-i; + int k; + Tread(xid, rid2, &k); + assert(k == j); + } + + printf("abort"); + fflush(stdout); + Tabort(xid); + printf("-done\n"); + fflush(stdout); + + xid = Tbegin(); + for(int i = 0; i < 100000; i++) { + rid2.slot = i; + int j; + Tread(xid, rid2, &j); + assert(i == j); + } + + printf("commit"); + fflush(stdout); + Tcommit(xid); + printf("done3\n"); + fflush(stdout); + + + Tdeinit(); + +} END_TEST /** Add suite declarations here @@ -389,6 +472,7 @@ Suite * check_suite(void) { tcase_add_test(tc, operation_physical_do_undo); tcase_add_test(tc, operation_instant_set); tcase_add_test(tc, operation_prepare); + tcase_add_test(tc, operation_array_list); /* --------------------------------------------- */ tcase_add_checked_fixture(tc, setup, teardown); suite_add_tcase(s, tc); diff --git a/test/lladd/check_page.c b/test/lladd/check_page.c index e8f2f59..dccf030 100644 --- a/test/lladd/check_page.c +++ b/test/lladd/check_page.c @@ -46,6 +46,7 @@ terms specified in this license. #include "../../src/lladd/page.h" #include "../../src/lladd/page/slotted.h" +#include "../../src/lladd/page/fixed.h" #include #include @@ -107,6 +108,39 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) { } +static void* fixed_worker_thread(void * arg_ptr) { + Page * p = (Page*)arg_ptr; + int i; + lsn_t this_lsn; + int j; + int first = 1; + recordid rid; + + for(i = 0; i < 100; i++) { + pthread_mutex_lock(&lsn_mutex); + this_lsn = lsn; + lsn++; + pthread_mutex_unlock(&lsn_mutex); + + if(! first ) { + fixedRead(p, rid, (byte*)&j); + assert((j + 1) == i); + /* slottedDeRalloc(p, lsn, rid); */ + sched_yield(); + } + + first = 0; + + rid = fixedRawRalloc(p); + fixedWrite(p, rid, (byte*)&i); + sched_yield(); + + assert(pageReadLSN(p) <= lsn); + } + + return NULL; +} + static void* worker_thread(void * arg_ptr) { Page * p = (Page*)arg_ptr; int i; @@ -292,13 +326,34 @@ START_TEST(pageThreadTest) { pthread_join(workers[i], NULL); } - unlock(p->loadlatch); + /* unlock(p->loadlatch); */ + releasePage(p); Tdeinit(); } END_TEST +START_TEST(fixedPageThreadTest) { + pthread_t workers[THREAD_COUNT]; + int i; + pthread_mutex_init(&random_mutex, NULL); + pthread_mutex_init(&lsn_mutex, NULL); + Tinit(); + Page * p = loadPage(2); + fixedPageInitialize(p, sizeof(int), 0); + + for(i = 0; i < THREAD_COUNT; i++) { + pthread_create(&workers[i], NULL, fixed_worker_thread, p); + } + + for(i = 0; i < THREAD_COUNT; i++) { + pthread_join(workers[i], NULL); + } + + releasePage(p); +} END_TEST + Suite * check_suite(void) { Suite *s = suite_create("page"); /* Begin a new test */ @@ -313,6 +368,7 @@ Suite * check_suite(void) { tcase_add_test(tc, pageNoThreadMultPageTest); tcase_add_test(tc, pageNoThreadTest); tcase_add_test(tc, pageThreadTest); + tcase_add_test(tc, fixedPageThreadTest); /* --------------------------------------------- */