Continuing work on multi-threading. r/w access to buffer manager getting close, but still buggy.
This commit is contained in:
parent
490dd86c09
commit
0ce77903fb
11 changed files with 319 additions and 40 deletions
|
@ -99,8 +99,8 @@ extern int errno;
|
||||||
#define lsn_t long
|
#define lsn_t long
|
||||||
|
|
||||||
|
|
||||||
/*#define DEBUGGING
|
/*#define DEBUGGING */
|
||||||
#define PROFILE_LATCHES*/
|
#define PROFILE_LATCHES
|
||||||
|
|
||||||
#ifdef DEBUGGING
|
#ifdef DEBUGGING
|
||||||
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */
|
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */
|
||||||
|
|
|
@ -68,6 +68,8 @@ terms specified in this license.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static pthread_mutex_t lastFreepage_mutex;
|
static pthread_mutex_t lastFreepage_mutex;
|
||||||
|
pthread_mutex_t add_pending_mutex;
|
||||||
|
|
||||||
static unsigned int lastFreepage = 0;
|
static unsigned int lastFreepage = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -77,7 +79,7 @@ static unsigned int lastFreepage = 0;
|
||||||
*/
|
*/
|
||||||
Page * loadPage(int pageid);
|
Page * loadPage(int pageid);
|
||||||
|
|
||||||
|
pthread_cond_t addPendingOK;
|
||||||
|
|
||||||
int bufInit() {
|
int bufInit() {
|
||||||
|
|
||||||
|
@ -89,7 +91,9 @@ int bufInit() {
|
||||||
|
|
||||||
lastFreepage = 0;
|
lastFreepage = 0;
|
||||||
pthread_mutex_init(&lastFreepage_mutex , NULL);
|
pthread_mutex_init(&lastFreepage_mutex , NULL);
|
||||||
|
pthread_cond_init(&addPendingOK, NULL);
|
||||||
|
pthread_mutex_init(&add_pending_mutex, NULL);
|
||||||
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -121,13 +125,13 @@ Page * loadPage (int pageid) {
|
||||||
|
|
||||||
Page * lastRallocPage = 0;
|
Page * lastRallocPage = 0;
|
||||||
|
|
||||||
|
/** @todo ralloc ignores it's xid parameter; change the interface? */
|
||||||
recordid ralloc(int xid, /*lsn_t lsn,*/ long size) {
|
recordid ralloc(int xid, /*lsn_t lsn,*/ long size) {
|
||||||
|
|
||||||
recordid ret;
|
recordid ret;
|
||||||
Page * p;
|
Page * p;
|
||||||
|
|
||||||
DEBUG("Rallocing record of size %ld\n", (long int)size);
|
/* DEBUG("Rallocing record of size %ld\n", (long int)size); */
|
||||||
|
|
||||||
assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT);
|
assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT);
|
||||||
|
|
||||||
|
@ -140,7 +144,7 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) {
|
||||||
unlock(p->loadlatch);
|
unlock(p->loadlatch);
|
||||||
pthread_mutex_unlock(&lastFreepage_mutex);
|
pthread_mutex_unlock(&lastFreepage_mutex);
|
||||||
|
|
||||||
DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size);
|
/* DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size); */
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -233,18 +237,35 @@ void setSlotType(int pageid, int slot, int type) {
|
||||||
*/
|
*/
|
||||||
void addPendingEvent(int pageid){
|
void addPendingEvent(int pageid){
|
||||||
|
|
||||||
Page * p = loadPage(pageid);
|
Page * p;
|
||||||
|
|
||||||
pthread_mutex_lock(&(p->pending_mutex));
|
p = loadPage(pageid);
|
||||||
|
|
||||||
assert(!(p->waiting));
|
pthread_mutex_lock(&add_pending_mutex);
|
||||||
|
|
||||||
|
while(p->waiting) {
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&add_pending_mutex);
|
||||||
|
|
||||||
|
unlock(p->loadlatch);
|
||||||
|
DEBUG("B");
|
||||||
|
pthread_mutex_lock(&add_pending_mutex);
|
||||||
|
pthread_cond_wait(&addPendingOK, &add_pending_mutex);
|
||||||
|
pthread_mutex_unlock(&add_pending_mutex);
|
||||||
|
|
||||||
|
p = loadPage(pageid);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&add_pending_mutex);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
p->pending++;
|
p->pending++;
|
||||||
|
|
||||||
pthread_mutex_unlock(&(p->pending_mutex));
|
pthread_mutex_unlock(&add_pending_mutex);
|
||||||
|
|
||||||
unlock(p->loadlatch);
|
unlock(p->loadlatch);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -262,12 +283,14 @@ void addPendingEvent(int pageid){
|
||||||
*/
|
*/
|
||||||
void removePendingEvent(int pageid) {
|
void removePendingEvent(int pageid) {
|
||||||
|
|
||||||
Page * p = loadPage(pageid);
|
Page * p;
|
||||||
|
|
||||||
pthread_mutex_lock(&(p->pending_mutex));
|
p = loadPage(pageid);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&(add_pending_mutex));
|
||||||
p->pending--;
|
p->pending--;
|
||||||
|
|
||||||
|
assert(p->id == pageid);
|
||||||
assert(p->pending >= 0);
|
assert(p->pending >= 0);
|
||||||
|
|
||||||
if(p->waiting && !p->pending) {
|
if(p->waiting && !p->pending) {
|
||||||
|
@ -275,9 +298,11 @@ void removePendingEvent(int pageid) {
|
||||||
pthread_cond_signal(&(p->noMorePending));
|
pthread_cond_signal(&(p->noMorePending));
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&(p->pending_mutex));
|
pthread_mutex_unlock(&(add_pending_mutex));
|
||||||
|
|
||||||
unlock(p->loadlatch);
|
unlock(p->loadlatch);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -114,7 +114,38 @@ int __lladd_pthread_mutex_destroy(lladd_pthread_mutex_t *mutex) {
|
||||||
*/
|
*/
|
||||||
int __lladd_pthread_cond_wait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex,
|
int __lladd_pthread_cond_wait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex,
|
||||||
char * file, int line, char * cond_name, char * mutex_name) {
|
char * file, int line, char * cond_name, char * mutex_name) {
|
||||||
return pthread_cond_wait(cond, &mutex->mutex);
|
int ret;
|
||||||
|
char * location;
|
||||||
|
int location_length;
|
||||||
|
|
||||||
|
profile_tuple * tup = pblHtLookup(mutex->lockpoints, mutex->last_acquired_at, strlen(mutex->last_acquired_at)+1);
|
||||||
|
|
||||||
|
released_lock(tup);
|
||||||
|
released_lock(&(mutex->tup));
|
||||||
|
|
||||||
|
free(mutex->last_acquired_at);
|
||||||
|
|
||||||
|
ret = pthread_cond_wait(cond, &mutex->mutex);
|
||||||
|
|
||||||
|
location_length = asprintf(&location, "%s %d", file, line);
|
||||||
|
|
||||||
|
tup = pblHtLookup(mutex->lockpoints, location, location_length+1);
|
||||||
|
|
||||||
|
mutex->last_acquired_at = location;
|
||||||
|
|
||||||
|
if(!tup) {
|
||||||
|
tup = malloc(sizeof(profile_tuple));
|
||||||
|
|
||||||
|
init_tuple(tup);
|
||||||
|
|
||||||
|
pblHtInsert(mutex->lockpoints, location, location_length+1, tup);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
acquired_lock(&(mutex->tup), 0);
|
||||||
|
acquired_lock(tup, 0);
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int __lladd_pthread_cond_timedwait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, void *abstime,
|
int __lladd_pthread_cond_timedwait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, void *abstime,
|
||||||
|
|
|
@ -7,6 +7,8 @@
|
||||||
#include <lladd/bufferManager.h>
|
#include <lladd/bufferManager.h>
|
||||||
#include "../blobManager.h"
|
#include "../blobManager.h"
|
||||||
/**
|
/**
|
||||||
|
@file
|
||||||
|
|
||||||
Implementation of Talloc() as an operation
|
Implementation of Talloc() as an operation
|
||||||
|
|
||||||
This is a bit strange compared to other operations, as it happens
|
This is a bit strange compared to other operations, as it happens
|
||||||
|
@ -20,6 +22,10 @@
|
||||||
space alloced during the crash is leaked. This doesn't seem to be
|
space alloced during the crash is leaked. This doesn't seem to be
|
||||||
too big of a deal, but it should be fixed someday. A more serious
|
too big of a deal, but it should be fixed someday. A more serious
|
||||||
problem results from crashes during blob allocation.
|
problem results from crashes during blob allocation.
|
||||||
|
|
||||||
|
@ingroup OPERATIONS
|
||||||
|
|
||||||
|
$Id$
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
|
@ -600,7 +600,7 @@ Page *pageAlloc(int id) {
|
||||||
page->rwlatch = initlock();
|
page->rwlatch = initlock();
|
||||||
page->loadlatch = initlock();
|
page->loadlatch = initlock();
|
||||||
|
|
||||||
pthread_mutex_init(&page->pending_mutex, NULL);
|
/* pthread_mutex_init(&page->pending_mutex, NULL);*/
|
||||||
pthread_cond_init(&page->noMorePending, NULL);
|
pthread_cond_init(&page->noMorePending, NULL);
|
||||||
|
|
||||||
page->memAddr = malloc(PAGE_SIZE);
|
page->memAddr = malloc(PAGE_SIZE);
|
||||||
|
@ -614,7 +614,8 @@ Page *pageAlloc(int id) {
|
||||||
pthread_mutex_unlock(&pageAllocMutex);
|
pthread_mutex_unlock(&pageAllocMutex);
|
||||||
|
|
||||||
|
|
||||||
|
page->pending = 0;
|
||||||
|
page->waiting = 0;
|
||||||
|
|
||||||
return page;
|
return page;
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,8 +125,6 @@ typedef struct Page_s {
|
||||||
this properly, and there are no read-only functions for the
|
this properly, and there are no read-only functions for the
|
||||||
pending field. */
|
pending field. */
|
||||||
|
|
||||||
pthread_mutex_t pending_mutex; /* pthread_mutex_t */
|
|
||||||
|
|
||||||
pthread_cond_t noMorePending; /* pthread_cond_t */
|
pthread_cond_t noMorePending; /* pthread_cond_t */
|
||||||
|
|
||||||
int waiting;
|
int waiting;
|
||||||
|
@ -154,6 +152,8 @@ typedef struct Page_s {
|
||||||
int pending;
|
int pending;
|
||||||
} Page;
|
} Page;
|
||||||
|
|
||||||
|
extern pthread_cond_t addPendingOK;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* initializes all the important variables needed in all the
|
* initializes all the important variables needed in all the
|
||||||
* functions dealing with pages.
|
* functions dealing with pages.
|
||||||
|
|
|
@ -213,9 +213,10 @@ Page * getPage(int pageid, int locktype) {
|
||||||
|
|
||||||
ret = pblHtLookup(activePages, &pageid, sizeof(int));
|
ret = pblHtLookup(activePages, &pageid, sizeof(int));
|
||||||
|
|
||||||
// Unfortunately, this is a heuristic, as a race condition exists.
|
/* Unfortunately, this is a heuristic, as a race condition exists.
|
||||||
// (Until we obtain a readlock on ret, we have no way of knowing if
|
(Until we obtain a readlock on ret, we have no way of knowing if
|
||||||
// we've gotten the correct page.)
|
we've gotten the correct page.) */
|
||||||
|
|
||||||
if(ret) {
|
if(ret) {
|
||||||
cacheHitOnPage(ret);
|
cacheHitOnPage(ret);
|
||||||
assert(ret->id == -1 || ret->id == pageid);
|
assert(ret->id == -1 || ret->id == pageid);
|
||||||
|
@ -227,7 +228,7 @@ Page * getPage(int pageid, int locktype) {
|
||||||
ret = dummy_page;
|
ret = dummy_page;
|
||||||
}
|
}
|
||||||
|
|
||||||
writelock(ret->loadlatch, 217);
|
readlock(ret->loadlatch, 217);
|
||||||
|
|
||||||
while(ret->id != pageid) { /* Either we got a stale mapping from the HT, or no mapping at all. */
|
while(ret->id != pageid) { /* Either we got a stale mapping from the HT, or no mapping at all. */
|
||||||
|
|
||||||
|
@ -264,7 +265,7 @@ Page * getPage(int pageid, int locktype) {
|
||||||
|
|
||||||
pthread_mutex_unlock(&loadPagePtr_mutex);
|
pthread_mutex_unlock(&loadPagePtr_mutex);
|
||||||
|
|
||||||
writelock(ret->loadlatch, 217);
|
readlock(ret->loadlatch, 217);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,10 @@
|
||||||
#include "logger/logWriter.h"
|
#include "logger/logWriter.h"
|
||||||
|
|
||||||
static FILE * stable = NULL;
|
static FILE * stable = NULL;
|
||||||
|
/** Defined in bufferManager.c */
|
||||||
|
extern pthread_mutex_t add_pending_mutex;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
This function blocks until there are no events pending for this page.
|
This function blocks until there are no events pending for this page.
|
||||||
|
@ -21,15 +25,16 @@ static FILE * stable = NULL;
|
||||||
|
|
||||||
|
|
||||||
void finalize(Page * p) {
|
void finalize(Page * p) {
|
||||||
pthread_mutex_lock(&(p->pending_mutex));
|
pthread_mutex_lock(&(add_pending_mutex));
|
||||||
p->waiting++;
|
p->waiting++;
|
||||||
|
|
||||||
while(p->pending) {
|
while(p->pending) {
|
||||||
|
DEBUG("A");
|
||||||
pthread_cond_wait(&(p->noMorePending), &(p->pending_mutex));
|
pthread_cond_wait(&(p->noMorePending), &(add_pending_mutex));
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&(p->pending_mutex));
|
pthread_mutex_unlock(&(add_pending_mutex));
|
||||||
|
pthread_cond_broadcast(&addPendingOK);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -90,6 +95,8 @@ void pageWrite(Page * ret) {
|
||||||
long pageoffset = ret->id * PAGE_SIZE;
|
long pageoffset = ret->id * PAGE_SIZE;
|
||||||
long offset ;
|
long offset ;
|
||||||
|
|
||||||
|
assert(ret->pending == 0);
|
||||||
|
|
||||||
if(flushedLSN() < pageReadLSN(ret)) {
|
if(flushedLSN() < pageReadLSN(ret)) {
|
||||||
DEBUG("pageWrite is calling syncLog()!\n");
|
DEBUG("pageWrite is calling syncLog()!\n");
|
||||||
syncLog();
|
syncLog();
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
#include <config.h>
|
#include <config.h>
|
||||||
#include <lladd/common.h>
|
#include <lladd/common.h>
|
||||||
|
#include "latches.h"
|
||||||
#include <lladd/transactional.h>
|
#include <lladd/transactional.h>
|
||||||
|
|
||||||
#include <lladd/recovery.h>
|
#include <lladd/recovery.h>
|
||||||
|
@ -14,6 +14,16 @@
|
||||||
TransactionLog XactionTable[MAX_TRANSACTIONS];
|
TransactionLog XactionTable[MAX_TRANSACTIONS];
|
||||||
int numActiveXactions = 0;
|
int numActiveXactions = 0;
|
||||||
int xidCount = 0;
|
int xidCount = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
Locking for transactional2.c works as follows:
|
||||||
|
|
||||||
|
numActiveXactions, xidCount are protected, XactionTable is not.
|
||||||
|
This implies that we do not support multi-threaded transactions,
|
||||||
|
at least for now.
|
||||||
|
*/
|
||||||
|
pthread_mutex_t transactional_2_mutex;
|
||||||
|
|
||||||
#define INVALID_XTABLE_XID -1
|
#define INVALID_XTABLE_XID -1
|
||||||
|
|
||||||
/** Needed for debugging -- sometimes we don't want to run all of Tinit() */
|
/** Needed for debugging -- sometimes we don't want to run all of Tinit() */
|
||||||
|
@ -33,6 +43,8 @@ void setupOperationsTable() {
|
||||||
|
|
||||||
int Tinit() {
|
int Tinit() {
|
||||||
|
|
||||||
|
pthread_mutex_init(&transactional_2_mutex, NULL);
|
||||||
|
|
||||||
setupOperationsTable();
|
setupOperationsTable();
|
||||||
|
|
||||||
/* pageInit(); */
|
/* pageInit(); */
|
||||||
|
@ -42,6 +54,7 @@ int Tinit() {
|
||||||
|
|
||||||
InitiateRecovery();
|
InitiateRecovery();
|
||||||
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,6 +62,9 @@ int Tinit() {
|
||||||
int Tbegin() {
|
int Tbegin() {
|
||||||
|
|
||||||
int i, index = 0;
|
int i, index = 0;
|
||||||
|
int xidCount_tmp;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&transactional_2_mutex);
|
||||||
|
|
||||||
if( numActiveXactions == MAX_TRANSACTIONS )
|
if( numActiveXactions == MAX_TRANSACTIONS )
|
||||||
return EXCEED_MAX_TRANSACTIONS;
|
return EXCEED_MAX_TRANSACTIONS;
|
||||||
|
@ -63,16 +79,26 @@ int Tbegin() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert( i < MAX_TRANSACTIONS );
|
xidCount_tmp = xidCount;
|
||||||
|
/* Don't want to block while we're logging... */
|
||||||
|
pthread_mutex_unlock(&transactional_2_mutex);
|
||||||
|
|
||||||
XactionTable[index] = LogTransBegin(xidCount);
|
assert( i < MAX_TRANSACTIONS );
|
||||||
|
|
||||||
|
XactionTable[index] = LogTransBegin(xidCount_tmp);
|
||||||
|
|
||||||
return XactionTable[index].xid;
|
return XactionTable[index].xid;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Tupdate(int xid, recordid rid, const void *dat, int op) {
|
void Tupdate(int xid, recordid rid, const void *dat, int op) {
|
||||||
LogEntry * e;
|
LogEntry * e;
|
||||||
|
|
||||||
|
#ifdef DEBUGGING
|
||||||
|
pthread_mutex_lock(&transactional_2_mutex);
|
||||||
assert(numActiveXactions <= MAX_TRANSACTIONS);
|
assert(numActiveXactions <= MAX_TRANSACTIONS);
|
||||||
|
pthread_mutex_unlock(&transactional_2_mutex);
|
||||||
|
#endif
|
||||||
|
|
||||||
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], rid, op, dat);
|
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], rid, op, dat);
|
||||||
|
|
||||||
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
|
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
|
||||||
|
@ -89,12 +115,20 @@ void Tread(int xid, recordid rid, void * dat) {
|
||||||
|
|
||||||
int Tcommit(int xid) {
|
int Tcommit(int xid) {
|
||||||
lsn_t lsn;
|
lsn_t lsn;
|
||||||
|
#ifdef DEBUGGING
|
||||||
|
pthread_mutex_lock(&transactional_2_mutex);
|
||||||
assert(numActiveXactions <= MAX_TRANSACTIONS);
|
assert(numActiveXactions <= MAX_TRANSACTIONS);
|
||||||
|
pthread_mutex_unlock(&transactional_2_mutex);
|
||||||
|
#endif
|
||||||
|
|
||||||
lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]);
|
lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]);
|
||||||
bufTransCommit(xid, lsn); /* unlocks pages */
|
bufTransCommit(xid, lsn); /* unlocks pages */
|
||||||
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
|
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
|
||||||
|
pthread_mutex_lock(&transactional_2_mutex);
|
||||||
numActiveXactions--;
|
numActiveXactions--;
|
||||||
assert( numActiveXactions >= 0 );
|
assert( numActiveXactions >= 0 );
|
||||||
|
pthread_mutex_unlock(&transactional_2_mutex);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,9 +141,11 @@ int Tabort(int xid) {
|
||||||
bufTransAbort(xid, lsn);
|
bufTransAbort(xid, lsn);
|
||||||
|
|
||||||
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
|
XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID;
|
||||||
numActiveXactions--;
|
|
||||||
|
|
||||||
|
pthread_mutex_lock(&transactional_2_mutex);
|
||||||
|
numActiveXactions--;
|
||||||
assert( numActiveXactions >= 0 );
|
assert( numActiveXactions >= 0 );
|
||||||
|
pthread_mutex_unlock(&transactional_2_mutex);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,10 +177,14 @@ void Trevive(int xid, long lsn) {
|
||||||
} else {
|
} else {
|
||||||
XactionTable[index].xid = xid;
|
XactionTable[index].xid = xid;
|
||||||
XactionTable[index].prevLSN = lsn;
|
XactionTable[index].prevLSN = lsn;
|
||||||
|
pthread_mutex_lock(&transactional_2_mutex);
|
||||||
numActiveXactions++;
|
numActiveXactions++;
|
||||||
|
pthread_mutex_unlock(&transactional_2_mutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TsetXIDCount(int xid) {
|
void TsetXIDCount(int xid) {
|
||||||
|
pthread_mutex_lock(&transactional_2_mutex);
|
||||||
xidCount = xid;
|
xidCount = xid;
|
||||||
|
pthread_mutex_unlock(&transactional_2_mutex);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,45 +17,104 @@
|
||||||
#define LOG_NAME "check_bufferMananger.log"
|
#define LOG_NAME "check_bufferMananger.log"
|
||||||
|
|
||||||
#define NUM_PAGES 1000
|
#define NUM_PAGES 1000
|
||||||
#define THREAD_COUNT 5
|
#define THREAD_COUNT 25
|
||||||
#define READS_PER_THREAD 50000
|
#define READS_PER_THREAD 10000
|
||||||
|
#define RECORDS_PER_THREAD 10000
|
||||||
|
#define RECORD_THREAD_COUNT 25
|
||||||
void initializePages() {
|
void initializePages() {
|
||||||
|
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
|
printf("Initialization starting\n"); fflush(NULL);
|
||||||
|
|
||||||
for(i = 0 ; i < NUM_PAGES; i++) {
|
for(i = 0 ; i < NUM_PAGES; i++) {
|
||||||
|
Page * p;
|
||||||
recordid rid;
|
recordid rid;
|
||||||
rid.page = i;
|
rid.page = i;
|
||||||
rid.slot = 0;
|
rid.slot = 0;
|
||||||
rid.size = sizeof(int);
|
rid.size = sizeof(int);
|
||||||
|
p = loadPage(rid.page);
|
||||||
|
assert(p->id != -1);
|
||||||
|
pageSlotRalloc(p, 0, rid);
|
||||||
|
/* addPendingEvent(rid.page); */
|
||||||
writeRecord(1, 1, rid, &i);
|
writeRecord(1, 1, rid, &i);
|
||||||
|
/* removePendingEvent(rid.page); */
|
||||||
|
assert(p->pending == 0);
|
||||||
|
unlock(p->loadlatch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
printf("Initialization complete.\n"); fflush(NULL);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void * workerThread(void * p) {
|
void * workerThread(void * p) {
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for(i = 0 ; i < READS_PER_THREAD; i++) {
|
for(i = 0 ; i < READS_PER_THREAD; i++) {
|
||||||
recordid rid;
|
recordid rid;
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0));
|
int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0));
|
||||||
|
|
||||||
if(! (i % 5000) ) {
|
if(! (i % 500) ) {
|
||||||
printf("%d", i / 5000); fflush(NULL);
|
printf("%d", i / 500); fflush(NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
rid.page = k;
|
rid.page = k;
|
||||||
rid.slot = 0;
|
rid.slot = 0;
|
||||||
rid.size = sizeof(int);
|
rid.size = sizeof(int);
|
||||||
|
|
||||||
|
addPendingEvent(rid.page);
|
||||||
readRecord(1, rid, &j);
|
readRecord(1, rid, &j);
|
||||||
|
assert(rid.page == k);
|
||||||
|
removePendingEvent(rid.page);
|
||||||
assert(k == j);
|
assert(k == j);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void * workerThreadWriting(void * p) {
|
||||||
|
|
||||||
|
int offset = *(int*)p;
|
||||||
|
recordid rids[RECORDS_PER_THREAD];
|
||||||
|
for(int i = 0 ; i < RECORDS_PER_THREAD; i++) {
|
||||||
|
rids[i] = ralloc(1, sizeof(int));
|
||||||
|
}
|
||||||
|
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
|
||||||
|
int val = i + offset;
|
||||||
|
int oldpage = rids[i].page;
|
||||||
|
addPendingEvent(rids[i].page);
|
||||||
|
writeRecord(1, 0, rids[i], &val);
|
||||||
|
assert(oldpage == rids[i].page);
|
||||||
|
removePendingEvent(rids[i].page);
|
||||||
|
|
||||||
|
if(! (i % 1000) ) {
|
||||||
|
printf("W%d", i / 1000); fflush(NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
|
||||||
|
int val;
|
||||||
|
|
||||||
|
addPendingEvent(rids[i].page);
|
||||||
|
readRecord(1, rids[i], &val);
|
||||||
|
|
||||||
|
if(! (i % 1000) ) {
|
||||||
|
printf("R%d", i / 1000); fflush(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
assert(val == i+offset);
|
||||||
|
|
||||||
|
removePendingEvent(rids[i].page);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
START_TEST(pageSingleThreadTest) {
|
START_TEST(pageSingleThreadTest) {
|
||||||
Tinit();
|
Tinit();
|
||||||
|
|
||||||
|
@ -101,15 +160,43 @@ START_TEST(pageLoadTest) {
|
||||||
Tdeinit();
|
Tdeinit();
|
||||||
} END_TEST
|
} END_TEST
|
||||||
|
|
||||||
|
START_TEST(pageSingleThreadWriterTest) {
|
||||||
|
int i = 100;
|
||||||
|
|
||||||
|
Tinit();
|
||||||
|
|
||||||
|
workerThreadWriting(&i);
|
||||||
|
|
||||||
|
Tdeinit();
|
||||||
|
}END_TEST
|
||||||
|
|
||||||
|
START_TEST(pageThreadedWritersTest) {
|
||||||
|
pthread_t workers[RECORD_THREAD_COUNT];
|
||||||
|
int i;
|
||||||
|
|
||||||
|
Tinit();
|
||||||
|
|
||||||
|
for(i = 0; i < RECORD_THREAD_COUNT; i++) {
|
||||||
|
pthread_create(&workers[i], NULL, workerThreadWriting, &i);
|
||||||
|
}
|
||||||
|
for(i = 0; i < RECORD_THREAD_COUNT; i++) {
|
||||||
|
pthread_join(workers[i], NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
Tdeinit();
|
||||||
|
}END_TEST
|
||||||
|
|
||||||
Suite * check_suite(void) {
|
Suite * check_suite(void) {
|
||||||
Suite *s = suite_create("logWriter");
|
Suite *s = suite_create("bufferManager");
|
||||||
/* Begin a new test */
|
/* Begin a new test */
|
||||||
TCase *tc = tcase_create("writeNew");
|
TCase *tc = tcase_create("multithreaded");
|
||||||
|
|
||||||
/* Sub tests are added, one per line, here */
|
/* Sub tests are added, one per line, here */
|
||||||
|
|
||||||
/*tcase_add_test(tc, pageSingleThreadTest); */
|
tcase_add_test(tc, pageSingleThreadTest);
|
||||||
tcase_add_test(tc, pageLoadTest);
|
tcase_add_test(tc, pageLoadTest);
|
||||||
|
tcase_add_test(tc, pageSingleThreadWriterTest);
|
||||||
|
tcase_add_test(tc, pageThreadedWritersTest);
|
||||||
|
|
||||||
/* --------------------------------------------- */
|
/* --------------------------------------------- */
|
||||||
|
|
||||||
|
|
|
@ -39,12 +39,63 @@ permission to use and distribute the software in accordance with the
|
||||||
terms specified in this license.
|
terms specified in this license.
|
||||||
---*/
|
---*/
|
||||||
#include <config.h>
|
#include <config.h>
|
||||||
|
#include <lladd/common.h>
|
||||||
|
#include <../../src/lladd/latches.h>
|
||||||
#include <check.h>
|
#include <check.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
#include <lladd/transactional.h>
|
#include <lladd/transactional.h>
|
||||||
#include "../check_includes.h"
|
#include "../check_includes.h"
|
||||||
#define LOG_NAME "check_transactional2.log"
|
#define LOG_NAME "check_transactional2.log"
|
||||||
|
#define THREAD_COUNT 25
|
||||||
|
#define RECORDS_PER_THREAD 10000
|
||||||
|
|
||||||
|
/** Allocate a bunch of stuff, set it, read it, commit it, and read it again. */
|
||||||
|
void * writingWorkerThread ( void * v ) {
|
||||||
|
int offset = * (int *) v;
|
||||||
|
recordid * rids = malloc(RECORDS_PER_THREAD * sizeof(recordid));
|
||||||
|
int xid = Tbegin();
|
||||||
|
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
|
||||||
|
rids[i] = Talloc(xid, sizeof(int));
|
||||||
|
if(! (i %100)) {
|
||||||
|
printf("A%d", i/100);fflush(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
|
||||||
|
int tmp = i + offset;
|
||||||
|
Tset(xid, rids[i], &tmp);
|
||||||
|
if(! (i %100)) {
|
||||||
|
printf("W%d", i/100); fflush(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
|
||||||
|
int j;
|
||||||
|
Tread(xid, rids[i], &j);
|
||||||
|
assert(i + offset == j);
|
||||||
|
if(! (i %100)) {
|
||||||
|
printf("R%d", i/100);fflush(NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Tcommit(xid);
|
||||||
|
|
||||||
|
xid = Tbegin();
|
||||||
|
|
||||||
|
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
|
||||||
|
int j;
|
||||||
|
Tread(xid, rids[i], &j);
|
||||||
|
assert(i + offset == j);
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Assuming that the Tset() operation is implemented correctly, checks
|
Assuming that the Tset() operation is implemented correctly, checks
|
||||||
that doUpdate, redoUpdate and undoUpdate are working correctly, for
|
that doUpdate, redoUpdate and undoUpdate are working correctly, for
|
||||||
|
@ -142,6 +193,33 @@ START_TEST(transactional_blobSmokeTest) {
|
||||||
}
|
}
|
||||||
END_TEST
|
END_TEST
|
||||||
|
|
||||||
|
START_TEST(transactional_nothreads_commit) {
|
||||||
|
int five = 5;
|
||||||
|
Tinit();
|
||||||
|
writingWorkerThread(&five);
|
||||||
|
Tdeinit();
|
||||||
|
} END_TEST
|
||||||
|
|
||||||
|
START_TEST(transactional_threads_commit) {
|
||||||
|
pthread_t workers[THREAD_COUNT];
|
||||||
|
int i;
|
||||||
|
|
||||||
|
Tinit();
|
||||||
|
|
||||||
|
for(i = 0; i < THREAD_COUNT; i++) {
|
||||||
|
int arg = i + 100;
|
||||||
|
pthread_create(&workers[i], NULL, writingWorkerThread, &arg);
|
||||||
|
|
||||||
|
}
|
||||||
|
for(i = 0; i < THREAD_COUNT; i++) {
|
||||||
|
pthread_join(workers[i], NULL);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
Tdeinit();
|
||||||
|
} END_TEST
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Add suite declarations here
|
Add suite declarations here
|
||||||
*/
|
*/
|
||||||
|
@ -153,6 +231,9 @@ Suite * check_suite(void) {
|
||||||
/* Sub tests are added, one per line, here */
|
/* Sub tests are added, one per line, here */
|
||||||
tcase_add_test(tc, transactional_smokeTest);
|
tcase_add_test(tc, transactional_smokeTest);
|
||||||
tcase_add_test(tc, transactional_blobSmokeTest);
|
tcase_add_test(tc, transactional_blobSmokeTest);
|
||||||
|
tcase_add_test(tc, transactional_nothreads_commit);
|
||||||
|
tcase_add_test(tc, transactional_threads_commit);
|
||||||
|
/** @todo still need to make blobs reentrant! */
|
||||||
/* --------------------------------------------- */
|
/* --------------------------------------------- */
|
||||||
tcase_add_checked_fixture(tc, setup, teardown);
|
tcase_add_checked_fixture(tc, setup, teardown);
|
||||||
suite_add_tcase(s, tc);
|
suite_add_tcase(s, tc);
|
||||||
|
|
Loading…
Reference in a new issue