Refactored bufferMananger / pageCache, so that bufferMananger handles loading an unloading pages, and pageCache only implements the replacement policy.

Also, used memprof to detect and remove all memory leaks (at least all memory leaks that are encountered by check_transactional2), and fixed a number of misuses of the pblHash.
This commit is contained in:
Sears Russell 2004-07-27 01:04:35 +00:00
parent e53117a1a0
commit 54ba9b0347
13 changed files with 287 additions and 212 deletions

View file

@ -99,8 +99,8 @@ extern int errno;
#define lsn_t long #define lsn_t long
/*#define DEBUGGING */ /*#define DEBUGGING
/*#define PROFILE_LATCHES */ #define PROFILE_LATCHES */
#ifdef DEBUGGING #ifdef DEBUGGING
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */ /** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */

View file

@ -22,11 +22,25 @@ Page * getPage(int pageid, int locktype);
Note that pageCache does not perform any file I/O of its own. Note that pageCache does not perform any file I/O of its own.
@todo pageCache should not include page.h at all. It should treat @todo pageCache should not include page.h at all. It should treat
pages as (int, void*) pairs. pages as (int, void*) pairs. (But the page struct contains the
pointers that pageCache manipulates..)
*/ */
void pageCacheInit(); void pageCacheInit(Page * first);
void pageCacheDeinit(); void pageCacheDeinit();
/*Page * loadPage(int pageid); */
void cacheHitOnPage(Page * ret);
void cacheRemovePage(Page * ret);
void cacheInsertPage (Page * ret);
/** Return a page that is a reasonable candidate for replacement. This
function does not actually remove the page from cache.*/
Page * cacheStalePage();
#define INITIAL 0
#define FULL 1
extern int cache_state;
#endif #endif

View file

@ -146,6 +146,9 @@ void deletelock (rwl *lock)
pthread_mutex_destroy (lock->mut); pthread_mutex_destroy (lock->mut);
pthread_cond_destroy (lock->readOK); pthread_cond_destroy (lock->readOK);
pthread_cond_destroy (lock->writeOK); pthread_cond_destroy (lock->writeOK);
free(lock->mut);
free(lock->writeOK);
free(lock->readOK);
free (lock); free (lock);
return; return;

View file

@ -68,6 +68,7 @@ static void tripleHashInsert(int xid, recordid rid, lsn_t newLSN) {
if(xidHash == NULL) { if(xidHash == NULL) {
xidHash = pblHtCreate(); xidHash = pblHtCreate();
DEBUG("New blob xact: xid = %d\n", xid);
pblHtInsert(dirtyBlobs, &xid, sizeof(int), xidHash); pblHtInsert(dirtyBlobs, &xid, sizeof(int), xidHash);
} }
@ -152,6 +153,16 @@ void closeBlobStore() {
assert(!ret); assert(!ret);
blobf0 = NULL; blobf0 = NULL;
blobf1 = NULL; blobf1 = NULL;
pblHashTable_t * xidhash;
for(xidhash = pblHtFirst(dirtyBlobs); xidhash; xidhash = pblHtNext(dirtyBlobs)) {
fflush(NULL);
sync();
printf("WARNING!: Found list of dirty blobs for transaction: %ld\nIt is possible that these blobs were not synced to disk properly.\n\nThe data has now been flushed to disk, but this warning could indicate a bug that could cause data corruption.", *(lsn_t*)pblHtCurrentKey(dirtyBlobs));
fflush(NULL);
sync();
pblHtRemove(dirtyBlobs, 0, 0);
}
pblHtDelete(dirtyBlobs); pblHtDelete(dirtyBlobs);
@ -383,25 +394,29 @@ void abortBlobs(int xid) {
pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int));
pblHashTable_t * this_bucket; pblHashTable_t * this_bucket;
DEBUG("Blob cleanup xid=%d\n", xid);
if(rid_buckets) { /* Otherwise, there are no dirty blobs for this xid.. */ if(rid_buckets) { /* Otherwise, there are no dirty blobs for this xid.. */
for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) { for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) {
lsn_t * rid_lsn; lsn_t * rid_lsn;
int page_number; int page_number = *(int*)pblHtCurrentKey(rid_buckets);
/* All right, this_bucket contains all of the rids for this page. */ /* All right, this_bucket contains all of the rids for this page. */
for(rid_lsn = pblHtFirst(this_bucket); rid_lsn; rid_lsn = pblHtNext(this_bucket)) { for(rid_lsn = pblHtFirst(this_bucket); rid_lsn; rid_lsn = pblHtNext(this_bucket)) {
recordid * rid = pblHtCurrentKey(this_bucket); recordid * rid = pblHtCurrentKey(this_bucket);
page_number = rid->page; /* page_number = rid->page; */
pblHtRemove(this_bucket, rid, sizeof(recordid)); assert(page_number == rid->page);
pblHtRemove(this_bucket, 0, 0);/*rid, sizeof(recordid)); */
free(rid_lsn); free(rid_lsn);
} }
pblHtRemove(rid_buckets, &page_number, sizeof(int)); pblHtRemove(rid_buckets, 0, 0);
pblHtDelete(this_bucket); pblHtDelete(this_bucket);
} }
pblHtDelete(rid_buckets); pblHtDelete(rid_buckets);
pblHtRemove(dirtyBlobs, &xid, sizeof(int));
} }
pthread_mutex_unlock(&blob_hash_mutex); pthread_mutex_unlock(&blob_hash_mutex);

View file

@ -57,6 +57,7 @@ terms specified in this license.
#include <lladd/pageCache.h> #include <lladd/pageCache.h>
#include "pageFile.h" #include "pageFile.h"
#include <pbl/pbl.h>
/** /**
Invariant: This lock should be held while updating lastFreepage, or Invariant: This lock should be held while updating lastFreepage, or
@ -69,27 +70,71 @@ terms specified in this license.
this lock. this lock.
*/ */
static pblHashTable_t *activePages; /* page lookup */
static pthread_mutex_t loadPagePtr_mutex;
static pthread_mutex_t lastFreepage_mutex; static pthread_mutex_t lastFreepage_mutex;
static unsigned int lastFreepage = 0; static unsigned int lastFreepage = 0;
static Page * dummy_page;
int bufInit() { int bufInit() {
pageInit(); pageInit();
openPageFile(); openPageFile();
pageCacheInit();
openBlobStore();
pthread_mutex_init(&lastFreepage_mutex , NULL);
pthread_mutex_init(&loadPagePtr_mutex, NULL);
activePages = pblHtCreate();
lastFreepage = 0; lastFreepage = 0;
pthread_mutex_init(&lastFreepage_mutex , NULL);
dummy_page = pageAlloc(-1);
pageRealloc(dummy_page, -1);
Page *first;
first = pageAlloc(0);
pageRealloc(first, 0);
pblHtInsert(activePages, &first->id, sizeof(int), first);
openBlobStore();
pageCacheInit(first);
assert(activePages);
return 0; return 0;
} }
void bufDeinit() { void bufDeinit() {
closeBlobStore(); closeBlobStore();
Page *p;
DEBUG("pageCacheDeinit()");
for( p = (Page*)pblHtFirst( activePages ); p; p = (Page*)pblHtNext(activePages)) {
pblHtRemove( activePages, 0, 0 )
DEBUG("+");
/** @todo No one seems to set the dirty flag... */
/*if(p->dirty && (ret = pageWrite(p)/ *flushPage(*p)* /)) {
printf("ERROR: flushPage on %s line %d", __FILE__, __LINE__);
abort();
/ * exit(ret); * /
}*/
pageWrite(p);
}
pthread_mutex_destroy(&loadPagePtr_mutex);
pblHtDelete(activePages);
pageCacheDeinit(); pageCacheDeinit();
closePageFile(); closePageFile();
pageDeInit();
return; return;
} }
/** /**
@ -110,7 +155,7 @@ void releasePage (Page * p) {
Page * lastRallocPage = 0; Page * lastRallocPage = 0;
/** @todo ralloc ignores it's xid parameter; change the interface? */ /** @todo ralloc ignores it's xid parameter; change the interface? */
recordid ralloc(int xid, /*lsn_t lsn,*/ long size) { recordid ralloc(int xid, long size) {
recordid ret; recordid ret;
Page * p; Page * p;
@ -190,3 +235,131 @@ int bufTransAbort(int xid, lsn_t lsn) {
return 0; return 0;
} }
Page * getPage(int pageid, int locktype) {
Page * ret;
int spin = 0;
pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
}
while (ret && (ret->id != pageid)) {
unlock(ret->loadlatch);
pthread_mutex_unlock(&loadPagePtr_mutex);
sched_yield();
pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
// writelock(ret->loadlatch, 217);
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
}
spin++;
if(spin > 10000) {
printf("GetPage is stuck!");
}
}
if(ret) {
cacheHitOnPage(ret);
assert(ret->id == pageid);
pthread_mutex_unlock(&loadPagePtr_mutex);
} else {
/* If ret is null, then we know that:
a) there is no cache entry for pageid
b) this is the only thread that has gotten this far,
and that will try to add an entry for pageid
c) the most recent version of this page has been
written to the OS's file cache. */
int oldid = -1;
if( cache_state == FULL ) {
/* Select an item from cache, and remove it atomicly. (So it's
only reclaimed once) */
ret = cacheStalePage();
cacheRemovePage(ret);
oldid = ret->id;
assert(oldid != pageid);
} else {
ret = pageAlloc(-1);
ret->id = -1;
ret->inCache = 0;
}
writelock(ret->loadlatch, 217);
/* Inserting this into the cache before releasing the mutex
ensures that constraint (b) above holds. */
pblHtInsert(activePages, &pageid, sizeof(int), ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
/* Could writelock(ret) go here? */
assert(ret != dummy_page);
if(ret->id != -1) {
pageWrite(ret);
}
pageRealloc(ret, pageid);
pageRead(ret);
writeunlock(ret->loadlatch);
pthread_mutex_lock(&loadPagePtr_mutex);
/* pblHtRemove(activePages, &(ret->id), sizeof(int)); */
pblHtRemove(activePages, &(oldid), sizeof(int));
/* Put off putting this back into cache until we're done with
it. -- This could cause the cache to empty out if the ratio of
threads to buffer slots is above ~ 1/3, but it decreases the
liklihood of thrashing. */
cacheInsertPage(ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
if(ret->id != pageid) {
unlock(ret->loadlatch);
printf("pageCache.c: Thrashing detected. Strongly consider increasing LLADD's buffer pool size!\n");
fflush(NULL);
return getPage(pageid, locktype);
}
}
assert(ret->id == pageid);
return ret;
}
Page *loadPage(int pageid) {
Page * ret = getPage(pageid, RW);
return ret;
}

View file

@ -320,6 +320,7 @@ void closeLogWriter() {
deletelock(nextAvailableLSN_lock); deletelock(nextAvailableLSN_lock);
deletelock(log_read_lock); deletelock(log_read_lock);
pthread_mutex_destroy(&log_write_mutex); pthread_mutex_destroy(&log_write_mutex);
pthread_mutex_destroy(&truncateLog_mutex);
} }
@ -356,14 +357,17 @@ static LogEntry * readLogEntry() {
if(nmemb != 1) { if(nmemb != 1) {
/* Partial log entry. */ /* Partial log entry. */
if(feof(log)) { if(feof(log)) {
free(ret);
return NULL; return NULL;
} }
if(ferror(log)) { if(ferror(log)) {
perror("Error reading log!"); perror("Error reading log!");
free(ret);
assert(0); assert(0);
return 0; return 0;
} }
assert(0); assert(0);
free(ret);
return 0; return 0;
} }

View file

@ -406,6 +406,21 @@ void pageInit() {
pthread_mutex_init(&pageAllocMutex, NULL); pthread_mutex_init(&pageAllocMutex, NULL);
for(int i = 0; i < MAX_BUFFER_SIZE+1; i++) {
pool[i].rwlatch = initlock();
pool[i].loadlatch = initlock();
pool[i].memAddr = malloc(PAGE_SIZE);
}
}
void pageDeInit() {
for(int i = 0; i < MAX_BUFFER_SIZE+1; i++) {
deletelock(pool[i].rwlatch);
deletelock(pool[i].loadlatch);
free(pool[i].memAddr);
}
} }
typedef struct { typedef struct {
@ -690,10 +705,10 @@ Page *pageAlloc(int id) {
/* We have an implicit lock on rwlatch, since we allocated it, but /* We have an implicit lock on rwlatch, since we allocated it, but
haven't returned yet. */ haven't returned yet. */
page->rwlatch = initlock(); /* page->rwlatch = initlock();
page->loadlatch = initlock(); page->loadlatch = initlock();
page->memAddr = malloc(PAGE_SIZE); page->memAddr = malloc(PAGE_SIZE); */
nextPage++; nextPage++;
assert(nextPage <= MAX_BUFFER_SIZE + 1); /* There's a dummy page that we need to keep around, thus the +1 */ assert(nextPage <= MAX_BUFFER_SIZE + 1); /* There's a dummy page that we need to keep around, thus the +1 */

View file

@ -192,6 +192,8 @@ struct Page_s {
*/ */
void pageInit(); void pageInit();
void pageDeInit();
/** /**
* assumes that the page is already loaded in memory. It takes * assumes that the page is already loaded in memory. It takes
* as a parameter a Page. The Page struct contains the new LSN and the page * as a parameter a Page. The Page struct contains the new LSN and the page
@ -258,6 +260,8 @@ void pageDeRalloc(Page * page, recordid rid);
int pageGetSlotType(Page * p, int slot, int type); int pageGetSlotType(Page * p, int slot, int type);
void pageSetSlotType(Page * p, int slot, int type); void pageSetSlotType(Page * p, int slot, int type);
Page * loadPage(int page);
Page * getPage(int page, int mode);
END_C_DECLS END_C_DECLS

View file

@ -7,53 +7,33 @@
*/ */
#include <config.h> #include <config.h>
#include <lladd/common.h> #include <lladd/common.h>
#include "latches.h"
#include "page.h" #include "page.h"
#include <lladd/pageCache.h> #include <lladd/pageCache.h>
#include <lladd/bufferManager.h> #include <lladd/bufferManager.h>
#include <assert.h> #include <assert.h>
#include <pbl/pbl.h>
#include <stdio.h> #include <stdio.h>
#include "pageFile.h" #include "pageFile.h"
static pblHashTable_t *activePages; /* page lookup */
static unsigned int bufferSize; /* < MAX_BUFFER_SIZE */ static unsigned int bufferSize; /* < MAX_BUFFER_SIZE */
static Page *repHead, *repMiddle, *repTail; /* replacement policy */ static Page *repHead, *repMiddle, *repTail; /* replacement policy */
static pthread_mutex_t loadPagePtr_mutex;
#define INITIAL 0 int cache_state;
#define FULL 1
static int state;
/* These three functions are for internal use. They are not declared void pageCacheInit(Page * first) {
static so that their test case can compile. */
static void cacheHitOnPage(Page * ret);
static void cacheRemovePage(Page * ret);
static void cacheInsertPage (Page * ret);
static Page * dummy_page;
void pageCacheInit() {
Page *first;
bufferSize = 1; bufferSize = 1;
state = INITIAL; cache_state = INITIAL;
pthread_mutex_init(&loadPagePtr_mutex, NULL);
activePages = pblHtCreate();
assert(activePages);
DEBUG("pageCacheInit()"); DEBUG("pageCacheInit()");
first = pageAlloc(0);
dummy_page = pageAlloc(-1);
pageRealloc(first, 0);
pageRealloc(dummy_page, -1);
first->inCache = 1; first->inCache = 1;
pblHtInsert(activePages, &first->id, sizeof(int), first);
first->prev = first->next = NULL; first->prev = first->next = NULL;
/* pageMap(first); */ /* pageMap(first); */
@ -66,25 +46,7 @@ void pageCacheInit() {
} }
void pageCacheDeinit() { void pageCacheDeinit() {
Page *p;
DEBUG("pageCacheDeinit()");
for( p = (Page*)pblHtFirst( activePages ); p; p = (Page*)pblHtRemove( activePages, 0, 0 )) {
DEBUG("+");
/** @todo No one seems to set the dirty flag... */
/*if(p->dirty && (ret = pageWrite(p)/ *flushPage(*p)* /)) {
printf("ERROR: flushPage on %s line %d", __FILE__, __LINE__);
abort();
/ * exit(ret); * /
}*/
pageWrite(p);
}
pthread_mutex_destroy(&loadPagePtr_mutex);
pblHtDelete(activePages);
} }
static void headInsert(Page *ret) { static void headInsert(Page *ret) {
@ -101,7 +63,7 @@ static void headInsert(Page *ret) {
static void middleInsert(Page *ret) { static void middleInsert(Page *ret) {
assert(state == FULL); assert(cache_state == FULL);
/* assert( bufferSize == MAX_BUFFER_SIZE ); */ /* assert( bufferSize == MAX_BUFFER_SIZE ); */
@ -122,7 +84,7 @@ static void middleInsert(Page *ret) {
/** @todo Under high contention, the buffer pool can empty. What should be done about this, other than making sure that # threads > buffer size? */ /** @todo Under high contention, the buffer pool can empty. What should be done about this, other than making sure that # threads > buffer size? */
static void qRemove(Page *ret) { static void qRemove(Page *ret) {
assert(state == FULL); assert(cache_state == FULL);
assert(ret->next != ret && ret->prev != ret); assert(ret->next != ret && ret->prev != ret);
@ -145,150 +107,18 @@ static void qRemove(Page *ret) {
assert(ret != repHead); assert(ret != repHead);
} }
Page * getPage(int pageid, int locktype) { void cacheInsertPage (Page * ret) {
Page * ret;
int spin = 0;
pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
//writelock(ret->loadlatch, 217);
}
while (ret && (ret->id != pageid)) {
unlock(ret->loadlatch);
pthread_mutex_unlock(&loadPagePtr_mutex);
sched_yield();
pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
// writelock(ret->loadlatch, 217);
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
}
spin++;
if(spin > 10000) {
printf("GetPage is stuck!");
}
}
if(ret) {
cacheHitOnPage(ret);
assert(ret->id == pageid);
pthread_mutex_unlock(&loadPagePtr_mutex);
} else {
/* If ret is null, then we know that:
a) there is no cache entry for pageid
b) this is the only thread that has gotten this far,
and that will try to add an entry for pageid
c) the most recent version of this page has been
written to the OS's file cache. */
int oldid = -1;
if( state == FULL ) {
/* Select an item from cache, and remove it atomicly. (So it's
only reclaimed once) */
ret = repTail;
cacheRemovePage(ret);
oldid = ret->id;
assert(oldid != pageid);
} else {
ret = pageAlloc(-1);
ret->id = -1;
ret->inCache = 0;
}
writelock(ret->loadlatch, 217);
/* Inserting this into the cache before releasing the mutex
ensures that constraint (b) above holds. */
pblHtInsert(activePages, &pageid, sizeof(int), ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
/* Could writelock(ret) go here? */
assert(ret != dummy_page);
if(ret->id != -1) {
pageWrite(ret);
}
pageRealloc(ret, pageid);
pageRead(ret);
writeunlock(ret->loadlatch);
pthread_mutex_lock(&loadPagePtr_mutex);
/* pblHtRemove(activePages, &(ret->id), sizeof(int)); */
pblHtRemove(activePages, &(oldid), sizeof(int));
/* Put off putting this back into cache until we're done with
it. -- This could cause the cache to empty out if the ratio of
threads to buffer slots is above ~ 1/3, but it decreases the
liklihood of thrashing. */
cacheInsertPage(ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
/* downgradelock(ret->loadlatch); */
// writelock(ret->loadlatch, 217);
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
if(ret->id != pageid) {
unlock(ret->loadlatch);
printf("pageCache.c: Thrashing detected. Strongly consider increasing LLADD's buffer pool size!\n");
fflush(NULL);
return getPage(pageid, locktype);
}
/* } else {
pthread_mutex_unlock(&loadPagePtr_mutex);
} */
}
assert(ret->id == pageid);
return ret;
}
static void cacheInsertPage (Page * ret) {
bufferSize++; bufferSize++;
assert(!ret->inCache); assert(!ret->inCache);
ret->inCache ++; ret->inCache ++;
if(state == FULL) { if(cache_state == FULL) {
middleInsert(ret); middleInsert(ret);
} else { } else {
if(bufferSize == MAX_BUFFER_SIZE/* - 1*/) { /* Set up page kick mechanism. */ if(bufferSize == MAX_BUFFER_SIZE/* - 1*/) { /* Set up page kick mechanism. */
int i; int i;
Page *iter; Page *iter;
state = FULL; cache_state = FULL;
headInsert(ret); headInsert(ret);
assert(ret->next != ret && ret->prev != ret); assert(ret->next != ret && ret->prev != ret);
@ -314,16 +144,16 @@ static void cacheInsertPage (Page * ret) {
} }
} }
static void cacheRemovePage(Page * ret) { void cacheRemovePage(Page * ret) {
assert(ret->inCache); assert(ret->inCache);
qRemove(ret); qRemove(ret);
ret->inCache--; ret->inCache--;
bufferSize --; bufferSize --;
} }
static void cacheHitOnPage(Page * ret) { void cacheHitOnPage(Page * ret) {
/* The page may not be in cache if it is about to be freed. */ /* The page may not be in cache if it is about to be freed. */
if(ret->inCache && state == FULL) { /* we need to worry about page sorting */ if(ret->inCache && cache_state == FULL) { /* we need to worry about page sorting */
/* move to head */ /* move to head */
if( ret != repHead ) { if( ret != repHead ) {
qRemove(ret); qRemove(ret);
@ -341,7 +171,6 @@ static void cacheHitOnPage(Page * ret) {
} }
} }
Page *loadPage(int pageid) { Page * cacheStalePage() {
Page * ret = getPage(pageid, RW); return repTail;
return ret;
} }

View file

@ -168,6 +168,7 @@ static void Redo() {
redoUpdate(e); redoUpdate(e);
} }
} }
free(e);
} }
} }
@ -247,7 +248,9 @@ static void Undo(int recovery) {
printf ("Unknown log type to undo (TYPE=%d, XID= %d, LSN=%ld), skipping...\n", e->type, e->xid, e->LSN); printf ("Unknown log type to undo (TYPE=%d, XID= %d, LSN=%ld), skipping...\n", e->type, e->xid, e->LSN);
break; break;
} }
free(e);
} }
/* printf("$"); fflush(NULL); */ /* printf("$"); fflush(NULL); */
} }
free(prepare_guard_state); free(prepare_guard_state);

View file

@ -101,7 +101,6 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
p = loadPage(rid.page); p = loadPage(rid.page);
/* KLUDGE re-enable loggging!*/
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
@ -111,6 +110,8 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
doUpdate(e, p); doUpdate(e, p);
releasePage(p); releasePage(p);
free(e);
} }
void Tread(int xid, recordid rid, void * dat) { void Tread(int xid, recordid rid, void * dat) {
@ -124,6 +125,7 @@ int Tcommit(int xid) {
#ifdef DEBUGGING #ifdef DEBUGGING
pthread_mutex_lock(&transactional_2_mutex); pthread_mutex_lock(&transactional_2_mutex);
assert(numActiveXactions <= MAX_TRANSACTIONS); assert(numActiveXactions <= MAX_TRANSACTIONS);
pthread_mutex_unlock(&transactional_2_mutex);
#endif #endif
lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]); lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]);
@ -166,6 +168,7 @@ int Tdeinit() {
for( i = 0; i < MAX_TRANSACTIONS; i++ ) { for( i = 0; i < MAX_TRANSACTIONS; i++ ) {
if( XactionTable[i].xid != INVALID_XTABLE_XID ) { if( XactionTable[i].xid != INVALID_XTABLE_XID ) {
Tabort(XactionTable[i].xid); Tabort(XactionTable[i].xid);
printf("WARNING: Tdeinit() is aborting transaction %d\n", XactionTable[i].xid);
} }
} }
assert( numActiveXactions == 0 ); assert( numActiveXactions == 0 );

View file

@ -6,6 +6,6 @@ else
TESTS = TESTS =
endif endif
noinst_PROGRAMS = $(TESTS) noinst_PROGRAMS = $(TESTS)
LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a #-lefence LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a -lefence
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log
AM_CFLAGS= -g -Wall -pedantic -std=gnu99 AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -121,7 +121,7 @@ void * writingAbortingBlobWorkerThread ( void * v ) {
} }
Tcommit(xid); Tcommit(xid);
free(rids);
return NULL; return NULL;
} }
@ -180,6 +180,8 @@ void * writingAbortingWorkerThread ( void * v ) {
Tcommit(xid); Tcommit(xid);
free (rids);
return NULL; return NULL;
} }
@ -226,6 +228,7 @@ void * writingWorkerThread ( void * v ) {
} }
Tcommit(xid); Tcommit(xid);
free(rids);
return NULL; return NULL;
} }
@ -297,9 +300,7 @@ START_TEST(transactional_blobSmokeTest) {
fail_unless(rid.size == ARRAY_SIZE * sizeof(int), NULL); fail_unless(rid.size == ARRAY_SIZE * sizeof(int), NULL);
printf("TSet starting.\n"); fflush(NULL);
Tset(xid, rid, &foo); Tset(xid, rid, &foo);
printf("TSet returning.\n"); fflush(NULL);
Tread(xid, rid, &bar); Tread(xid, rid, &bar);
@ -432,6 +433,17 @@ START_TEST(transactional_blobs_threads_abort) {
} }
Tdeinit(); Tdeinit();
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
} END_TEST } END_TEST
/** /**