Implemented pages that store fixed sized records efficiently, and a java-style ArrayList data structure that efficiently supports (relatively) clustered, O(1) access time expandable arrays. (This will be used for the hash implementation...)

This commit is contained in:
Sears Russell 2004-10-06 06:08:09 +00:00
parent 9268b1d9cf
commit 5064e3fac2
20 changed files with 705 additions and 76 deletions

View file

@ -1,4 +1,25 @@
2004-07-20 sears <sears@Morphix>
2004-10-02 sears@euglenoid <sears@euglenoid>
* alloc.h, instantSet.h, noop.h, prepare.h:
Added test cases for Tprepare(), implemented some redo-only operations, and started to clean up record allocation/deallocation.
Also, numerous bugfixes.
2004-08-21 sears@euglenoid <sears@euglenoid>
* pageOperations.h:
Implemented a freespace manager that should safely allocate space, even in the face of crashes, and can reclaim unused space (unless an application opens more than one simultaneous transaction that performs allocations)
Fixed some blob bugs (by adding extra fdatasync() calls).
Began factoring out the page management code so that it is an extenstion, and a less integral part of lladd.
2004-08-03 sears@euglenoid <sears@euglenoid>
* lladdhash.h, pageOperations.h:
Added (untested) support for whole-page operations, lladdhash now works.
rr 2004-07-20 sears <sears@Morphix>
* bufferManager.c, common.c, operations/alloc.c, page.c, page.h, pageCache.c, pageFile.c, transactional2.c:
Continuing work on multi-threading. r/w access to buffer manager getting close, but still buggy.

View file

@ -2,7 +2,7 @@ EXTRA_DIST = reconf
SUBDIRS = src test utilities
AM_CFLAGS = -g -Wall -pedantic
docs:
docs: coverage
doxygen doc/Doxyfile-api
doxygen doc/Doxyfile-developers

View file

@ -67,7 +67,7 @@ terms specified in this license.
/* @define error codes
*/
#define OUT_OF_MEM 1
#define FILE_OPEN_ERROR 2
#define FILE_OPRN_ERROR 2
#define FILE_READ_ERROR 3
#define FILE_WRITE_ERROR 4
#define FILE_WRITE_OPEN_ERROR 5
@ -75,10 +75,11 @@ terms specified in this license.
#define PAGE_SIZE 4096
/*#define MAX_BUFFER_SIZE 100003 */
/* #define MAX_BUFFER_SIZE 100003 */
/*#define MAX_BUFFER_SIZE 10007*/
/*#define MAX_BUFFER_SIZE 5003*/
#define MAX_BUFFER_SIZE 71
#define MAX_BUFFER_SIZE 2003
/* #define MAX_BUFFER_SIZE 71 */
/*#define MAX_BUFFER_SIZE 7 */
/*#define BUFFER_ASOOCIATIVE 2 */
@ -109,6 +110,9 @@ terms specified in this license.
#define OPERATION_UNALLOC_FREED 17
#define OPERATION_NOOP 18
#define OPERATION_INSTANT_SET 19
#define OPERATION_ARRAY_LIST_ALLOC 20
#define OPERATION_INITIALIZE_FIXED_PAGE 21
#define OPERATION_UNINITIALIZE_PAGE 22
/* number above should be less than number below */
#define MAX_OPERATIONS 40

View file

@ -116,9 +116,17 @@ typedef struct {
The other option:
- Get rid of operations that span records entirely by
splitting complex logical operations into simpler one.
splitting complex logical operations into simpler ones.
We chose the second option for now.
We chose the second option for now. This implies that the
entries must be written to the log in an order, that if
repeated, guarantees that the structure will be in a logically
consistent state after the REDO phase, regardless of what
prefix of the log actually makes it to disk. Note that
pinning pages before the log entry hits disk is inadequate, in
general, since other transactions could read dirty information
from the pinned pages, producsing nonsensical log entries that
preceed the current transaction's log entry.
*/
/**
@ -139,6 +147,7 @@ typedef struct {
#include "operations/pageOperations.h"
#include "operations/noop.h"
#include "operations/instantSet.h"
#include "operations/arrayList.h"
extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */

View file

@ -0,0 +1,29 @@
#include <lladd/operations.h>
#ifndef __ARRAY_LIST_H
#define __ARRAY_LIST_H
/**
@file
@ingroup OPERATIONS
$Id$
*/
recordid TarrayListAlloc(int xid, int count, int multiplier, int size);
Operation getArrayListAlloc();
Operation getInitFixed();
Operation getUnInitPage();
/** Initialized a fixed page with the maximum possible number of slots
allocated. The rid.size field is used to determine the size of
record that the slots can hold. */
#define TinitFixed(xid, rid) Tupdate(xid, rid, NULL, OPERATION_INITIALIZE_FIXED_PAGE)
/** Un-initializes a page. */
#define TunInitPage(xid, rid) Tupdate(xid, rid, NULL, OPERATION_UNINITIALIZE_PAGE)
recordid dereferenceArrayListRid(Page * p, int offset);
int TarrayListExtend(int xid, recordid rid, int slots);
#endif

View file

@ -9,6 +9,7 @@ liblladd_a_SOURCES=common.c stats.c io.c bufferManager.c linkedlist.c operations
operations/pageOperations.c page/indirect.c operations/decrement.c \
operations/increment.c operations/prepare.c operations/set.c \
operations/alloc.c operations/noop.c operations/instantSet.c \
page/slotted.c operations/lladdhash.c page/header.c
page/slotted.c operations/lladdhash.c page/header.c page/fixed.c \
operations/arrayList.c
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -50,34 +50,30 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat)
return 0;
}
/** @todo Currently, we just leak store space on dealloc. */
/** @todo Currently, we leak empty pages on dealloc. */
static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
/* Page * loadedPage = loadPage(rid.page); */
/** Has no effect during normal operation, other than updating the LSN. */
/* slottedPostRalloc(p, lsn, rid); */
/* Page * loadedPage = loadPage(rid.page); */
assert(rid.page == p->id);
slottedDeRalloc(p, lsn, rid);
/* releasePage(loadedPage); */
return 0;
}
static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
/* operate(xid, p, lsn, rid, dat); */
if(rid.size >= BLOB_THRESHOLD_SIZE) {
rid.size = BLOB_REC_SIZE; /* Don't reuse blob space yet... */
}
slottedPostRalloc(p, lsn, rid);
/** @todo dat should be the pointer to the space in the blob store. */
writeRecord(xid, p, lsn, rid, dat);
return 0;
}
static pthread_mutex_t talloc_mutex;
Operation getAlloc() {
pthread_mutex_init(&talloc_mutex, NULL);
Operation o = {
OPERATION_ALLOC, /* ID */
0,
@ -111,15 +107,28 @@ Operation getRealloc() {
recordid Talloc(int xid, long size) {
recordid rid;
Page * p = NULL;
if(size >= BLOB_THRESHOLD_SIZE) {
rid = preAllocBlob(xid, size);
} else {
rid = slottedPreRalloc(xid, size);
pthread_mutex_lock(&talloc_mutex);
rid = slottedPreRalloc(xid, size, &p);
assert(p != NULL);
}
Tupdate(xid,rid, NULL, OPERATION_ALLOC);
if(p != NULL) {
/* release the page that preAllocBlob pinned for us. */
/* @todo alloc.c pins multiple pages -> Will deadlock with small buffer sizes.. */
releasePage(p);
pthread_mutex_unlock(&talloc_mutex);
/*pthread_mutex_unlock(&talloc_mutex); */
}
return rid;
}
@ -128,7 +137,8 @@ void Tdealloc(int xid, recordid rid) {
void * preimage = malloc(rid.size);
Page * p = loadPage(rid.page);
readRecord(xid, p, rid, preimage);
releasePage(p); /** @todo race in Tdealloc; do we care? */
/** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with? */
Tupdate(xid, rid, preimage, OPERATION_DEALLOC);
releasePage(p);
free(preimage);
}

View file

@ -0,0 +1,285 @@
#include <config.h>
#include <lladd/common.h>
#include "../page/fixed.h"
#include <lladd/operations/pageOperations.h>
#include <lladd/operations/arrayList.h>
#include <lladd/transactional.h>
#include <lladd/bufferManager.h>
#include <assert.h>
#include <math.h>
/**
Implement resizable arrays, just like java's ArrayList class.
Essentially, the base page contains a fixed size array of rids
pointing at contiguous blocks of pages. Each block is twice as
big as the previous block.
The base block is of type FIXED_PAGE, of int's. The first few slots are reserved:
*/
typedef struct {
int firstPage;
int initialSize;
int multiplier;
int size;
int maxOffset;
} TarrayListParameters;
static TarrayListParameters pageToTLP(Page * p);
static int getBlockContainingOffset(TarrayListParameters tlp, int offset, int * firstSlotInBlock);
/*----------------------------------------------------------------------------*/
recordid TarrayListAlloc(int xid, int count, int multiplier, int size) {
int firstPage = TpageAllocMany(xid, count+1);
TarrayListParameters tlp;
tlp.firstPage = firstPage;
tlp.initialSize = count;
tlp.multiplier = multiplier;
tlp.size = size;
tlp.maxOffset = 0;
recordid rid;
rid.page = firstPage;
rid.size = 0;
rid.slot = 0;
Tupdate(xid, rid, &tlp, OPERATION_ARRAY_LIST_ALLOC);
return rid;
}
static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
const TarrayListParameters * tlp = dat;
int firstPage = tlp->firstPage;
int count = tlp->initialSize;
int multiplier = tlp->multiplier;
int size = tlp->size;
/* Page * p = loadPage(firstPage); */
fixedPageInitialize(p, sizeof(int), recordsPerPage(sizeof(int)));
/* recordid countRid = fixedRawRalloc(p);
recordid multiplierRid = fixedRawRalloc(p);
recordid slotSizeRid = fixedRawRalloc(p); */
#define MAX_OFFSET_POSITION 3
/* recordid maxOffset = fixedRawRalloc(p); */
#define FIRST_DATA_PAGE_OFFSET 4
/* recordid firstDataPageRid = fixedRawRalloc(p); */
recordid countRid, multiplierRid, slotSizeRid, maxOffset, firstDataPageRid;
countRid.page = multiplierRid.page = slotSizeRid.page = maxOffset.page = firstDataPageRid.page = p->id;
countRid.size = multiplierRid.size = slotSizeRid.size = maxOffset.size = firstDataPageRid.size = sizeof(int);
countRid.slot = 0;
multiplierRid.slot = 1;
slotSizeRid.slot = 2;
maxOffset.slot = 3;
firstDataPageRid.slot = 4;
int firstDataPage = firstPage + 1;
/* Allocing this page -> implicit lock. */
fixedWriteUnlocked(p, countRid, (byte*)&count);
fixedWriteUnlocked(p, multiplierRid, (byte*)&multiplier);
fixedWriteUnlocked(p, firstDataPageRid, (byte*)&firstDataPage);
fixedWriteUnlocked(p, slotSizeRid, (byte*)&size);
int minusOne = -1;
fixedWriteUnlocked(p, maxOffset, (byte*)&minusOne);
/* Write lsn... */
*page_type_ptr(p) = ARRAY_LIST_PAGE;
pageWriteLSN(p, lsn);
recordid ret;
ret.page = firstPage;
ret.slot = 0; /* slot = # of slots in array... */
ret.size = size;
return 0;
}
Operation getArrayListAlloc() {
Operation o = {
OPERATION_ARRAY_LIST_ALLOC, /* ID */
sizeof(TarrayListParameters),
OPERATION_NOOP, /* Since TpageAllocMany will be undone, the page we touch will be nuked anyway, so set this to NO-OP. */
&operateAlloc
};
return o;
}
/*----------------------------------------------------------------------------*/
int TarrayListExtend(int xid, recordid rid, int slots) {
Page * p = loadPage(rid.page);
TarrayListParameters tlp = pageToTLP(p);
releasePage(p);
int lastCurrentBlock;
if(tlp.maxOffset == -1) {
lastCurrentBlock = -1;
} else{
lastCurrentBlock = getBlockContainingOffset(tlp, tlp.maxOffset, NULL);
}
int lastNewBlock = getBlockContainingOffset(tlp, tlp.maxOffset+slots, NULL);
DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock);
recordid tmp; /* recordid of slot in base page that holds new block. */
tmp.page = rid.page;
tmp.size = sizeof(int);
recordid tmp2; /* recordid of newly created pages. */
tmp2.slot = 0;
tmp2.size = tlp.size;
for(int i = lastCurrentBlock+1; i <= lastNewBlock; i++) {
/* Alloc block i */
int blockSize = tlp.initialSize * powl(tlp.multiplier, i);
int newFirstPage = TpageAllocMany(xid, blockSize);
DEBUG("block %d\n", i);
for(int j = 0; j < blockSize; j++) {
DEBUG("page %d (%d)\n", j, j + newFirstPage);
tmp2.page = j + newFirstPage;
Tupdate(xid, tmp2, NULL, OPERATION_INITIALIZE_FIXED_PAGE);
}
tmp.slot = i + FIRST_DATA_PAGE_OFFSET;
/** @todo what does this do to recovery?? */
/** @todo locking for arrayList... */
*page_type_ptr(p) = FIXED_PAGE;
Tset(xid, tmp, &newFirstPage);
*page_type_ptr(p) = ARRAY_LIST_PAGE;
DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage);
}
tmp.slot = MAX_OFFSET_POSITION;
int newMaxOffset = tlp.maxOffset+slots;
*page_type_ptr(p) = FIXED_PAGE;
Tset(xid, tmp, &newMaxOffset);
*page_type_ptr(p) = ARRAY_LIST_PAGE;
return 0;
}
static int operateInitFixed(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
fixedPageInitialize(p, rid.size, recordsPerPage(rid.size));
pageWriteLSN(p, lsn);
return 0;
}
static int operateUnInitPage(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
*page_type_ptr(p) = UNINITIALIZED_PAGE;
pageWriteLSN(p, lsn);
return 0;
}
Operation getInitFixed() {
Operation o = {
OPERATION_INITIALIZE_FIXED_PAGE,
0, /* The necessary parameters are hidden in the rid */
OPERATION_UNINITIALIZE_PAGE,
&operateInitFixed
};
return o;
}
Operation getUnInitPage() {
Operation o = {
OPERATION_UNINITIALIZE_PAGE,
PAGE_SIZE,
NO_INVERSE_WHOLE_PAGE,
&operateUnInitPage
};
return o;
}
/*----------------------------------------------------------------------------*/
recordid dereferenceArrayListRid(Page * p, int offset) {
TarrayListParameters tlp = pageToTLP(p);
int rec_per_page = recordsPerPage((size_t)tlp.size);
int lastHigh = 0;
int pageRidSlot = 0; /* The slot on the root arrayList page that contains the first page of the block of interest */
assert(tlp.maxOffset >= offset);
pageRidSlot = getBlockContainingOffset(tlp, offset, &lastHigh);
int dataSlot = offset - lastHigh; /* The offset in the block of interest of the slot we want. */
int blockPage = dataSlot / rec_per_page; /* The page in the block of interest that contains the slot we want */
int blockSlot = dataSlot - blockPage * rec_per_page;
/* recordid tmp;
tmp.page = tlp.firstPage;
tmp.size = sizeof(int);
tmp.slot = pageRidSlot + FIRST_DATA_PAGE_OFFSET; */
int thePage;
assert(pageRidSlot + FIRST_DATA_PAGE_OFFSET < fixedPageCount(p));
/* fixedReadUnlocked(p, tmp, (byte*)&thePage); *//* reading immutable record.. */
thePage = *(int*)fixed_record_ptr(p, pageRidSlot + FIRST_DATA_PAGE_OFFSET);
recordid rid;
rid.page = thePage + blockPage;
rid.slot = blockSlot;
rid.size = tlp.size;
return rid;
}
static int getBlockContainingOffset(TarrayListParameters tlp, int offset, int * firstSlotInBlock) {
int rec_per_page = recordsPerPage((size_t)tlp.size);
long thisHigh = rec_per_page * tlp.initialSize;
int lastHigh = 0;
int pageRidSlot = 0;
int currentPageLength = tlp.initialSize;
while(((long)offset) >= thisHigh) {
pageRidSlot ++;
lastHigh = thisHigh;
currentPageLength *= tlp.multiplier;
thisHigh += rec_per_page * currentPageLength;
}
if(firstSlotInBlock) {
*firstSlotInBlock = lastHigh;
}
return pageRidSlot;
}
static TarrayListParameters pageToTLP(Page * p) {
TarrayListParameters tlp;
tlp.firstPage = p->id;
tlp.initialSize = *(int*)fixed_record_ptr(p, 0);
tlp.multiplier = *(int*)fixed_record_ptr(p, 1);
tlp.size = *(int*)fixed_record_ptr(p, 2);
tlp.maxOffset = *(int*)fixed_record_ptr(p, 3);
return tlp;
}

View file

@ -8,10 +8,6 @@
#include "../page/header.h"
#include "../pageFile.h"
static int freelist;
static int freepage;
static pthread_mutex_t pageAllocMutex;
@ -293,11 +289,6 @@ int TpageAlloc(int xid /*, int type */) {
return newpage;
}
/** Allocs an extent of pages. @todo CONCURRENCY BUG TpageAllocMany
can not be concurrent until ralloc uses TpageAlloc to allocate new
records. (And. concurrency for TpageAllocMany hasn't been
implemented yet...
*/
int TpageAllocMany(int xid, int count /*, int type*/) {
/* int firstPage = -1;
int lastPage = -1; */
@ -320,15 +311,6 @@ int TpageAllocMany(int xid, int count /*, int type*/) {
rid.page = newpage;
/* for(int i = 0 ; i < count; i++) {
int thisPage = TpageAlloc(xid, type);
if(lastPage == -1) {
firstPage = lastPage = thisPage;
} else {
assert((lastPage +1) == thisPage);
lastPage = thisPage;
}
} */
pthread_mutex_unlock(&pageAllocMutex);
return newpage;
}

View file

@ -86,6 +86,7 @@ terms specified in this license.
#include "pageFile.h"
#include "page/slotted.h"
#include "page/fixed.h"
/* TODO: Combine with buffer size... */
static int nextPage = 0;
@ -248,8 +249,12 @@ void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
if(rid.size > BLOB_THRESHOLD_SIZE) {
writeBlob(xid, p, lsn, rid, dat);
} else {
} else if(*page_type_ptr(p) == SLOTTED_PAGE) {
slottedWrite(xid, p, lsn, rid, dat);
} else if(*page_type_ptr(p) == FIXED_PAGE) {
fixedWrite(p, rid, dat);
} else {
abort();
}
assert( (p->id == rid.page) && (p->memAddr != NULL) );
@ -261,10 +266,17 @@ void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
void readRecord(int xid, Page * p, recordid rid, void *buf) {
assert(rid.page == p->id);
int page_type = *page_type_ptr(p);
if(rid.size > BLOB_THRESHOLD_SIZE) {
readBlob(xid, p, rid, buf);
} else {
} else if(page_type == SLOTTED_PAGE) {
slottedRead(xid, p, rid, buf);
} else if(page_type == FIXED_PAGE) {
fixedRead(p, rid, buf);
} else {
abort();
}
assert(rid.page == p->id);
}

View file

@ -98,7 +98,8 @@ BEGIN_C_DECLS
#define INDIRECT_PAGE 2
#define LLADD_HEADER_PAGE 3
#define LLADD_FREE_PAGE 4
#define FIXED_PAGE 5
#define ARRAY_LIST_PAGE 6
#define lsn_ptr(page) (((lsn_t *)(&((page)->memAddr[PAGE_SIZE])))-1)
#define page_type_ptr(page) (((int*)lsn_ptr((page)))-1)
#define end_of_usable_space_ptr(page) page_type_ptr((page))

96
src/lladd/page/fixed.c Normal file
View file

@ -0,0 +1,96 @@
#include "../page.h"
#include "fixed.h"
#include <assert.h>
int recordsPerPage(size_t size) {
return (USABLE_SIZE_OF_PAGE - 2*sizeof(short)) / size;
}
void fixedPageInitialize(Page * page, size_t size, int count) {
assert(*page_type_ptr(page) == UNINITIALIZED_PAGE);
*page_type_ptr(page) = FIXED_PAGE;
*recordsize_ptr(page) = size;
assert(count <= recordsPerPage(size));
*recordcount_ptr(page)= count;
}
short fixedPageCount(Page * page) {
assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE);
return *recordcount_ptr(page);
}
short fixedPageRecordSize(Page * page) {
assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE);
return *recordsize_ptr(page);
}
recordid fixedRawRallocMany(Page * page, int count) {
assert(*page_type_ptr(page) == FIXED_PAGE);
recordid rid;
writelock(page->rwlatch, 33);
if(*recordcount_ptr(page) + count <= recordsPerPage(*recordsize_ptr(page))) {
rid.page = page->id;
rid.slot = *recordcount_ptr(page);
rid.size = *recordsize_ptr(page);
*recordcount_ptr(page)+=count;
} else {
rid.page = -1;
rid.slot = -1;
rid.size = -1;
}
unlock(page->rwlatch);
return rid;
}
recordid fixedRawRalloc(Page *page) {
assert(*page_type_ptr(page) == FIXED_PAGE);
return fixedRawRallocMany(page, 1);
}
static void checkRid(Page * page, recordid rid) {
assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE);
assert(page->id == rid.page);
assert(*recordsize_ptr(page) == rid.size);
/* assert(recordsPerPage(rid.size) > rid.slot); */
int recCount = *recordcount_ptr(page);
assert(recCount > rid.slot);
}
void fixedReadUnlocked(Page * page, recordid rid, byte * buf) {
if(!memcpy(buf, fixed_record_ptr(page, rid.slot), rid.size)) {
perror("memcpy");
abort();
}
}
void fixedRead(Page * page, recordid rid, byte * buf) {
readlock(page->rwlatch, 57);
checkRid(page, rid);
fixedReadUnlocked(page, rid, buf);
unlock(page->rwlatch);
}
void fixedWriteUnlocked(Page * page, recordid rid, const byte *dat) {
if(!memcpy(fixed_record_ptr(page, rid.slot), dat, rid.size)) {
perror("memcpy");
abort();
}
}
void fixedWrite(Page * page, recordid rid, const byte* dat) {
readlock(page->rwlatch, 73);
checkRid(page, rid);
fixedWriteUnlocked(page, rid, dat);
unlock(page->rwlatch);
}

20
src/lladd/page/fixed.h Normal file
View file

@ -0,0 +1,20 @@
#include "../page.h"
#ifndef __FIXED_H
#define __FIXED_H
#define recordsize_ptr(page) shorts_from_end((page), 1)
#define recordcount_ptr(page) shorts_from_end((page), 2)
#define fixed_record_ptr(page, n) bytes_from_start((page), *recordsize_ptr((page)) * (n))
int recordsPerPage(size_t size);
void fixedPageInitialize(Page * page, size_t size, int count);
short fixedPageCount(Page * page);
short fixedPageRecordSize(Page * page);
recordid fixedRawRallocMany(Page * page, int count);
recordid fixedRawRalloc(Page *page);
void fixedRead(Page * page, recordid rid, byte * buf);
void fixedWrite(Page * page, recordid rid, const byte* dat);
void fixedReadUnlocked(Page * page, recordid rid, byte * buf);
void fixedWriteUnlocked(Page * page, recordid rid, const byte* dat);
#endif

View file

@ -112,20 +112,21 @@ static void slottedCompact(Page * page) {
certainly asking for trouble, so lastFreepage_mutex is static.)
*/
static pthread_mutex_t lastFreepage_mutex;
static unsigned int lastFreepage = -1;
/*static pthread_mutex_t lastFreepage_mutex; */
static unsigned int lastFreepage = -1;
void slottedPageInit() {
pthread_mutex_init(&lastFreepage_mutex , NULL);
/*pthread_mutex_init(&lastFreepage_mutex , NULL); */
lastFreepage = -1;
}
void slottedPageDeinit() {
pthread_mutex_destroy(&lastFreepage_mutex);
/* pthread_mutex_destroy(&lastFreepage_mutex); */
}
void slottedPageInitialize(Page * page) {
/* printf("Initializing page %d\n", page->id);
fflush(NULL); */
@ -156,37 +157,37 @@ int slottedFreespace(Page * page) {
optimizations later. Perhaps it's better to cluster allocations
from the same xid on the same page, or something...)
*/
recordid slottedPreRalloc(int xid, long size) {
recordid slottedPreRalloc(int xid, long size, Page ** pp) {
recordid ret;
Page * p;
/* Page * p; */
/* DEBUG("Rallocing record of size %ld\n", (long int)size); */
assert(size < BLOB_THRESHOLD_SIZE);
pthread_mutex_lock(&lastFreepage_mutex);
/* pthread_mutex_lock(&lastFreepage_mutex); */
/** @todo is ((unsigned int) foo) == -1 portable? Gotta love C.*/
if(lastFreepage == -1) {
lastFreepage = TpageAlloc(xid/*, SLOTTED_PAGE*/);
p = loadPage(lastFreepage);
slottedPageInitialize(p);
*pp = loadPage(lastFreepage);
slottedPageInitialize(*pp);
} else {
p = loadPage(lastFreepage);
*pp = loadPage(lastFreepage);
}
if(slottedFreespace(p) < size ) {
releasePage(p);
if(slottedFreespace(*pp) < size ) {
releasePage(*pp);
lastFreepage = TpageAlloc(xid/*, SLOTTED_PAGE*/);
p = loadPage(lastFreepage);
slottedPageInitialize(p);
*pp = loadPage(lastFreepage);
slottedPageInitialize(*pp);
}
ret = slottedRawRalloc(p, size);
ret = slottedRawRalloc(*pp, size);
releasePage(p);
/* releasePage(p); */ /* This gets called in Talloc() now. That prevents the page from being prematurely stolen. */
pthread_mutex_unlock(&lastFreepage_mutex);
/* pthread_mutex_unlock(&lastFreepage_mutex); */
DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size);
@ -290,8 +291,8 @@ recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid) {
} else {
int ijk = rid.size;
int lmn = *slot_length_ptr(page, rid.slot);
/* int ijk = rid.size;
int lmn = *slot_length_ptr(page, rid.slot); */
assert((rid.size == *slot_length_ptr(page, rid.slot)) ||
(*slot_length_ptr(page, rid.slot) >= PAGE_SIZE));
@ -305,7 +306,6 @@ recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid) {
return rid;
}
void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid) {
readlock(page->rwlatch, 443);

View file

@ -89,7 +89,7 @@ void slottedPageInitialize(Page * p);
* @see postRallocSlot the implementation of the second phase.
*
*/
recordid slottedPreRalloc(int xid, long size);
recordid slottedPreRalloc(int xid, long size, Page**p);
/**
* The second phase of slot allocation. Called after the log entry
* has been produced, and during recovery.

View file

@ -25,7 +25,7 @@
static int stable = -1;
static pthread_mutex_t stable_mutex;
static long myLseek(int f, long offset, int whence);
/* static long myLseek(int f, long offset, int whence); */
static long myLseekNoLock(int f, long offset, int whence);
void pageRead(Page *ret) {

View file

@ -56,9 +56,13 @@ void setupOperationsTable() {
operationsTable[OPERATION_UNALLOC_FREED] = getUnallocFreedPage();
operationsTable[OPERATION_NOOP] = getNoop();
operationsTable[OPERATION_INSTANT_SET] = getInstantSet();
operationsTable[OPERATION_ARRAY_LIST_ALLOC] = getArrayListAlloc();
operationsTable[OPERATION_INITIALIZE_FIXED_PAGE] = getInitFixed();
operationsTable[OPERATION_UNINITIALIZE_PAGE] = getUnInitPage();
}
int Tinit() {
pthread_mutex_init(&transactional_2_mutex, NULL);
@ -128,6 +132,10 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
releasePage(p);
rid = dereferenceRID(rid);
p = loadPage(rid.page);
} else if(*page_type_ptr(p) == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRid(p, rid.slot);
releasePage(p);
p = loadPage(rid.page);
}
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
@ -147,15 +155,21 @@ void Tread(int xid, recordid rid, void * dat) {
Page * p = loadPage(rid.page);
int page_type = *page_type_ptr(p);
if(page_type == SLOTTED_PAGE) {
readRecord(xid, p, rid, dat);
} else if(page_type == INDIRECT_PAGE) {
releasePage(p);
rid = dereferenceRID(rid);
p = loadPage(rid.page);
readRecord(xid, p, rid, dat);
} else if(page_type == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRid(p, rid.slot);
releasePage(p);
p = loadPage(rid.page);
} else {
abort();
}
readRecord(xid, p, rid, dat);
releasePage(p);
}

View file

@ -80,14 +80,18 @@ void * workerThread(void * p) {
return NULL;
}
static pthread_mutex_t ralloc_mutex;
void * workerThreadWriting(void * q) {
int offset = *(int*)q;
recordid rids[RECORDS_PER_THREAD];
for(int i = 0 ; i < RECORDS_PER_THREAD; i++) {
rids[i] = slottedPreRalloc(1, sizeof(int));
Page * tmp;
pthread_mutex_lock(&ralloc_mutex);
rids[i] = slottedPreRalloc(1, sizeof(int), &tmp);
slottedPostRalloc(tmp, 1, rids[i]);
releasePage(tmp);
pthread_mutex_unlock(&ralloc_mutex);
/* printf("\nRID:\t%d,%d\n", rids[i].page, rids[i].slot); */
/* fflush(NULL); */
@ -203,8 +207,9 @@ START_TEST(pageSingleThreadWriterTest) {
int i = 100;
Tinit();
pthread_mutex_init(&ralloc_mutex, NULL);
workerThreadWriting(&i);
pthread_mutex_destroy(&ralloc_mutex);
Tdeinit();
}END_TEST
@ -214,7 +219,7 @@ START_TEST(pageThreadedWritersTest) {
int i;
Tinit();
pthread_mutex_init(&ralloc_mutex, NULL);
for(i = 0; i < RECORD_THREAD_COUNT; i++) {
int * j = malloc(sizeof(int));
*j = i;
@ -223,7 +228,7 @@ START_TEST(pageThreadedWritersTest) {
for(i = 0; i < RECORD_THREAD_COUNT; i++) {
pthread_join(workers[i], NULL);
}
pthread_mutex_destroy(&ralloc_mutex);
Tdeinit();
}END_TEST

View file

@ -52,6 +52,8 @@ terms specified in this license.
#include "../../src/lladd/page/slotted.h"
#define LOG_NAME "check_operations.log"
#include <stdio.h>
void simulateBufferManagerCrash();
extern int numActiveXactions;
@ -73,7 +75,8 @@ START_TEST(operation_physical_do_undo) {
Tinit();
rid = slottedPreRalloc(xid, sizeof(int));
rid = slottedPreRalloc(xid, sizeof(int), &p);
releasePage(p);
buf = 1;
arg = 2;
@ -375,7 +378,87 @@ START_TEST(operation_instant_set) {
} END_TEST
START_TEST(operation_array_list) {
Tinit();
int xid = Tbegin();
recordid rid = TarrayListAlloc(xid, 4, 2, sizeof(int));
TarrayListExtend(xid, rid, 100000);
printf("commit");
fflush(stdout);
Tcommit(xid);
printf("done1\n");
fflush(stdout);
xid = Tbegin();
recordid rid2;
rid2.page = rid.page;
rid2.slot = 0;
rid2.size = sizeof(int);
for(int i = 0; i < 100000; i++) {
rid2.slot = i;
Tset(xid, rid2, &i);
}
for(int i = 0; i < 100000; i++) {
rid2.slot = i;
int j;
Tread(xid, rid2, &j);
assert(i == j);
}
printf("commit");
fflush(stdout);
Tcommit(xid);
printf("-done2\n");
fflush(stdout);
xid = Tbegin();
for(int i = 0; i < 100000; i++) {
int j = 0-i;
rid2.slot = i;
Tset(xid, rid2, &j);
}
for(int i = 0; i < 100000; i++) {
rid2.slot = i;
int j = 0-i;
int k;
Tread(xid, rid2, &k);
assert(k == j);
}
printf("abort");
fflush(stdout);
Tabort(xid);
printf("-done\n");
fflush(stdout);
xid = Tbegin();
for(int i = 0; i < 100000; i++) {
rid2.slot = i;
int j;
Tread(xid, rid2, &j);
assert(i == j);
}
printf("commit");
fflush(stdout);
Tcommit(xid);
printf("done3\n");
fflush(stdout);
Tdeinit();
} END_TEST
/**
Add suite declarations here
@ -389,6 +472,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, operation_physical_do_undo);
tcase_add_test(tc, operation_instant_set);
tcase_add_test(tc, operation_prepare);
tcase_add_test(tc, operation_array_list);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc);

View file

@ -46,6 +46,7 @@ terms specified in this license.
#include "../../src/lladd/page.h"
#include "../../src/lladd/page/slotted.h"
#include "../../src/lladd/page/fixed.h"
#include <lladd/bufferManager.h>
#include <lladd/transactional.h>
@ -107,6 +108,39 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
}
static void* fixed_worker_thread(void * arg_ptr) {
Page * p = (Page*)arg_ptr;
int i;
lsn_t this_lsn;
int j;
int first = 1;
recordid rid;
for(i = 0; i < 100; i++) {
pthread_mutex_lock(&lsn_mutex);
this_lsn = lsn;
lsn++;
pthread_mutex_unlock(&lsn_mutex);
if(! first ) {
fixedRead(p, rid, (byte*)&j);
assert((j + 1) == i);
/* slottedDeRalloc(p, lsn, rid); */
sched_yield();
}
first = 0;
rid = fixedRawRalloc(p);
fixedWrite(p, rid, (byte*)&i);
sched_yield();
assert(pageReadLSN(p) <= lsn);
}
return NULL;
}
static void* worker_thread(void * arg_ptr) {
Page * p = (Page*)arg_ptr;
int i;
@ -292,13 +326,34 @@ START_TEST(pageThreadTest) {
pthread_join(workers[i], NULL);
}
unlock(p->loadlatch);
/* unlock(p->loadlatch); */
releasePage(p);
Tdeinit();
} END_TEST
START_TEST(fixedPageThreadTest) {
pthread_t workers[THREAD_COUNT];
int i;
pthread_mutex_init(&random_mutex, NULL);
pthread_mutex_init(&lsn_mutex, NULL);
Tinit();
Page * p = loadPage(2);
fixedPageInitialize(p, sizeof(int), 0);
for(i = 0; i < THREAD_COUNT; i++) {
pthread_create(&workers[i], NULL, fixed_worker_thread, p);
}
for(i = 0; i < THREAD_COUNT; i++) {
pthread_join(workers[i], NULL);
}
releasePage(p);
} END_TEST
Suite * check_suite(void) {
Suite *s = suite_create("page");
/* Begin a new test */
@ -313,6 +368,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, pageNoThreadMultPageTest);
tcase_add_test(tc, pageNoThreadTest);
tcase_add_test(tc, pageThreadTest);
tcase_add_test(tc, fixedPageThreadTest);
/* --------------------------------------------- */