Improved latch profiling, especially for loadPage.

Sears Russell 2006-08-04 23:45:27 +00:00
parent 67dce5158f
commit 8a390c40d2
6 changed files with 259 additions and 57 deletions


@@ -108,6 +108,14 @@ compensated_function Page * loadPage(int xid, int pageid);
*/
void releasePage(Page * p);
#ifdef PROFILE_LATCHES_WRITE_ONLY
#define loadPage(x,y) __profile_loadPage((x), (y), __FILE__, __LINE__)
#define releasePage(x) __profile_releasePage((x))
compensated_function void __profile_releasePage(Page * p);
compensated_function Page * __profile_loadPage(int xid, int pageid, char * file, int line);
#endif
/**
* initialize buffer manager
* @return 0 on success

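For readers unfamiliar with the pattern in the header hunk above, here is a minimal, self-contained sketch (not part of this commit) of how a function-like macro redirects loadPage() so every call site passes its own __FILE__ and __LINE__ to the profiling wrapper; the printf body and the main() demo are purely illustrative.

#include <stdio.h>

typedef struct Page Page;

Page * __profile_loadPage(int xid, int pageid, char * file, int line);
#define loadPage(x,y) __profile_loadPage((x), (y), __FILE__, __LINE__)

Page * __profile_loadPage(int xid, int pageid, char * file, int line) {
  /* A real implementation records the pin site and then pins the page;
     here we only show the information the macro makes available. */
  printf("loadPage(%d, %d) called from %s:%d\n", xid, pageid, file, line);
  return 0;
}

int main(void) {
  Page * p = loadPage(1, 42);  /* expands to __profile_loadPage(1, 42, __FILE__, __LINE__) */
  (void)p;
  return 0;
}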

@@ -46,11 +46,21 @@ terms specified in this license.
* *************************************************/
#include <config.h>
#ifdef PROFILE_LATCHES_WRITE_ONLY
#define _GNU_SOURCE
#include <stdio.h> // Need _GNU_SOURCE for asprintf
#include <lladd/lhtable.h>
#endif
#include <lladd/common.h>
#include <latches.h>
#include <assert.h>
#include <lladd/bufferManager.h>
#include <lladd/bufferPool.h>
#include <lladd/lockManager.h>
@@ -59,9 +69,31 @@ terms specified in this license.
#include <lladd/pageCache.h>
#include "pageFile.h"
#include <pbl/pbl.h>
#include <lladd/truncation.h>
#ifdef LONG_TEST
#undef loadPage
#undef releasePage
#undef Page
#ifdef LONG_TEST
#define PIN_COUNT
#endif
#ifdef PROFILE_LATCHES_WRITE_ONLY
// These should only be defined if PROFILE_LATCHES_WRITE_ONLY is set.
#undef loadPage
#undef releasePage
pthread_mutex_t profile_load_mutex = PTHREAD_MUTEX_INITIALIZER;
struct LH_ENTRY(table) * profile_load_hash = 0;
struct LH_ENTRY(table) * profile_load_pins_hash = 0;
#endif
#ifdef PIN_COUNT
pthread_mutex_t pinCount_mutex = PTHREAD_MUTEX_INITIALIZER;
int pinCount = 0;
#endif
@@ -96,7 +128,10 @@ int bufInit() {
pageCacheInit(first);
assert(activePages);
#ifdef PROFILE_LATCHES_WRITE_ONLY
profile_load_hash = LH_ENTRY(create)(10);
profile_load_pins_hash = LH_ENTRY(create)(10);
#endif
return 0;
}
@@ -122,7 +157,7 @@ void bufDeinit() {
closePageFile();
pageDeInit();
#ifdef LONG_TEST
#ifdef PIN_COUNT
if(pinCount != 0) {
printf("WARNING: At exit, %d pages were still pinned!\n", pinCount);
}
@@ -136,14 +171,14 @@ void bufDeinit() {
void simulateBufferManagerCrash() {
closeBlobStore();
closePageFile();
#ifdef LONG_TEST
#ifdef PIN_COUNT
pinCount = 0;
#endif
}
void releasePage (Page * p) {
unlock(p->loadlatch);
#ifdef LONG_TEST
#ifdef PIN_COUNT
pthread_mutex_lock(&pinCount_mutex);
pinCount --;
pthread_mutex_unlock(&pinCount_mutex);
@@ -159,11 +194,30 @@ static Page * getPage(int pageid, int locktype) {
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
#ifdef PROFILE_LATCHES_WRITE_ONLY
// "holder" will contain a \n delimited list of the sites that
// called loadPage() on the pinned page since the last time it was
// completely unpinned. One such site is responsible for the
// leak.
char * holder = LH_ENTRY(find)(profile_load_hash, &ret, sizeof(void*));
int * pins = LH_ENTRY(find)(profile_load_pins_hash, &ret, sizeof(void*));
char * holderD =0;
int pinsD = 0;
if(holder) {
holderD = strdup(holder);
pinsD = *pins;
}
#endif
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
#ifdef PROFILE_LATCHES_WRITE_ONLY
if(holderD)
free(holderD);
#endif
}
while (ret && (ret->id != pageid)) {
@@ -174,11 +228,31 @@ static Page * getPage(int pageid, int locktype) {
ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) {
#ifdef PROFILE_LATCHES_WRITE_ONLY
// "holder" will contain a \n delimited list of the sites that
// called loadPage() on the pinned page since the last time it was
// completely unpinned. One such site is responsible for the
// leak.
char * holder = LH_ENTRY(find)(profile_load_hash, &ret, sizeof(void*));
int * pins = LH_ENTRY(find)(profile_load_pins_hash, &ret, sizeof(void*));
char * holderD = 0;
int pinsD = 0;
if(holder) {
holderD = strdup(holder);
pinsD = *pins;
}
#endif
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
#ifdef PROFILE_LATCHES_WRITE_ONLY
if(holderD)
free(holderD);
#endif
}
spin++;
if(spin > 10000) {
@@ -220,7 +294,30 @@ static Page * getPage(int pageid, int locktype) {
ret->inCache = 0;
}
// If you leak a page, and it eventually gets evicted, and reused, the system deadlocks here.
#ifdef PROFILE_LATCHES_WRITE_ONLY
// "holder" will contain a \n delimited list of the sites that
// called loadPage() on the pinned page since the last time it was
// completely unpinned. One such site is responsible for the
// leak.
char * holder = LH_ENTRY(find)(profile_load_hash, &ret, sizeof(void*));
int * pins = LH_ENTRY(find)(profile_load_pins_hash, &ret, sizeof(void*));
char * holderD = 0;
int pinsD = 0;
if(holder) {
holderD = strdup(holder);
pinsD = *pins;
}
#endif
writelock(ret->loadlatch, 217);
#ifdef PROFILE_LATCHES_WRITE_ONLY
if(holderD)
free(holderD);
#endif
/* Inserting this into the cache before releasing the mutex
ensures that constraint (b) above holds. */
@@ -252,12 +349,29 @@ static Page * getPage(int pageid, int locktype) {
cacheInsertPage(ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
#ifdef PROFILE_LATCHES_WRITE_ONLY
// "holder" will contain a \n delimited list of the sites that
// called loadPage() on the pinned page since the last time it was
// completely unpinned. One such site is responsible for the
// leak.
holder = LH_ENTRY(find)(profile_load_hash, &ret, sizeof(void*));
pins = LH_ENTRY(find)(profile_load_pins_hash, &ret, sizeof(void*));
if(holder) {
holderD = strdup(holder);
pinsD = *pins;
}
#endif
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
#ifdef PROFILE_LATCHES_WRITE_ONLY
if(holderD)
free(holderD);
#endif
if(ret->id != pageid) {
unlock(ret->loadlatch);
printf("pageCache.c: Thrashing detected. Strongly consider increasing LLADD's buffer pool size!\n");
@@ -269,14 +383,80 @@ static Page * getPage(int pageid, int locktype) {
return ret;
}
#ifdef PROFILE_LATCHES_WRITE_ONLY
compensated_function Page * __profile_loadPage(int xid, int pageid, char * file, int line) {
Page * ret = loadPage(xid, pageid);
pthread_mutex_lock(&profile_load_mutex);
char * holder = LH_ENTRY(find)(profile_load_hash, &ret, sizeof(void*));
int * pins = LH_ENTRY(find)(profile_load_pins_hash, &ret, sizeof(void*));
if(!pins) {
pins = malloc(sizeof(int));
*pins = 0;
LH_ENTRY(insert)(profile_load_pins_hash, &ret, sizeof(void*), pins);
}
if(*pins) {
assert(holder);
char * newHolder;
asprintf(&newHolder, "%s\n%s:%d", holder, file, line);
free(holder);
holder = newHolder;
} else {
assert(!holder);
asprintf(&holder, "%s:%d", file, line);
}
(*pins)++;
LH_ENTRY(insert)(profile_load_hash, &ret, sizeof(void*), holder);
pthread_mutex_unlock(&profile_load_mutex);
return ret;
}
compensated_function void __profile_releasePage(Page * p) {
pthread_mutex_lock(&profile_load_mutex);
// int pageid = p->id;
int * pins = LH_ENTRY(find)(profile_load_pins_hash, &p, sizeof(void*));
assert(pins);
if(*pins == 1) {
char * holder = LH_ENTRY(remove)(profile_load_hash, &p, sizeof(void*));
assert(holder);
free(holder);
}
(*pins)--;
pthread_mutex_unlock(&profile_load_mutex);
releasePage(p);
}
#endif
compensated_function Page *loadPage(int xid, int pageid) {
try_ret(NULL) {
if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); }
} end_ret(NULL);
Page * ret = getPage(pageid, RO);
#ifdef LONG_TEST
#ifdef PIN_COUNT
pthread_mutex_lock(&pinCount_mutex);
pinCount ++;
pthread_mutex_unlock(&pinCount_mutex);
@@ -284,3 +464,4 @@ compensated_function Page *loadPage(int xid, int pageid) {
return ret;
}

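The loadPage profiling changes above keep, for each pinned page, a pin count and a newline-delimited string of the "file:line" sites that pinned it since it was last fully unpinned; getPage() also snapshots that string before blocking on the latch so a debugger can see who leaked the pin. Below is a rough, standalone sketch (not part of this commit) of the same bookkeeping: a toy linear table stands in for LH_ENTRY(table), profile_load_mutex is omitted, and the profile_pin/profile_unpin names are hypothetical.

#define _GNU_SOURCE      /* asprintf, as in the hunk above */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

struct pin_entry { void * page; int pins; char * holder; };
static struct pin_entry table[64];           /* toy table standing in for LH_ENTRY(table) */

static struct pin_entry * lookup(void * page) {
  for(int i = 0; i < 64; i++)
    if(table[i].page == page) return &table[i];
  for(int i = 0; i < 64; i++)
    if(!table[i].page) { table[i].page = page; return &table[i]; }
  abort();                                   /* toy table is full */
}

void profile_pin(void * page, const char * file, int line) {
  struct pin_entry * e = lookup(page);
  if(e->pins) {                              /* already pinned: append this call site */
    char * newHolder;
    if(asprintf(&newHolder, "%s\n%s:%d", e->holder, file, line) == -1) abort();
    free(e->holder);
    e->holder = newHolder;
  } else {                                   /* first pin: start a fresh holder list */
    if(asprintf(&e->holder, "%s:%d", file, line) == -1) abort();
  }
  e->pins++;
}

void profile_unpin(void * page) {
  struct pin_entry * e = lookup(page);
  assert(e->pins > 0);
  if(--e->pins == 0) {                       /* fully unpinned: forget the call sites */
    free(e->holder);
    e->holder = 0;
    e->page = 0;
  }
}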

@@ -7,6 +7,7 @@
#include <pbl/pbl.h>
#include <errno.h>
#include <assert.h>
#undef pthread_mutex_t
#undef pthread_mutex_init
@@ -177,16 +178,14 @@ __profile_rwl *__profile_rw_initlock (char * file, int line) {
ret->lockpoints = pblHtCreate();
ret->lock = initlock();
ret->holder = 0;
ret->readCount = 0;
return ret;
}
/*static pthread_mutex_t __profile_rwl_mutex = PTHREAD_MUTEX_INITIALIZER;*/
/**
@todo For now, we only profile write locks...
*/
void __profile_readlock (__profile_rwl *lock, int d, char * file, int line) {
char * location;
@@ -203,7 +202,16 @@ void __profile_readlock (__profile_rwl *lock, int d, char * file, int line) {
implementation, or should we see how many times we were woken
before obtaining the lock? */
#ifdef PROFILE_LATCHES_WRITE_ONLY
pthread_t self = pthread_self();
if(lock->holder != self) {
writelock(lock->lock, d);
lock->holder = self;
}
lock->readCount++;
#else
readlock(lock->lock, d);
#endif
/* pthread_mutex_lock(__profile_rwl_mutex); */
@@ -226,8 +234,8 @@ void __profile_readlock (__profile_rwl *lock, int d, char * file, int line) {
/* pthread_mutex_unlock(__profile_rwl_mutex);*/
}
void __profile_writelock (__profile_rwl *lock, int d, char * file, int line) {
char * location;
@@ -242,7 +250,6 @@ void __profile_writelock (__profile_rwl *lock, int d, char * file, int line) {
/** @todo Should we spin instead of using the more efficient rwl
implementation, or should we see how many times we were woken
before obtaining the lock? */
writelock(lock->lock, d);
/* pthread_mutex_lock(__profile_rwl_mutex); */
@@ -270,8 +277,23 @@ void __profile_writelock (__profile_rwl *lock, int d, char * file, int line) {
}
void __profile_readunlock (__profile_rwl *lock) {
readunlock(lock->lock);
profile_tuple * tup = pblHtLookup(lock->lockpoints, lock->last_acquired_at, strlen(lock->last_acquired_at)+1);
released_lock(tup);
released_lock(&(lock->tup));
#ifdef PROFILE_LATCHES_WRITE_ONLY
pthread_t self = pthread_self();
assert(lock->holder == self);
lock->readCount--;
if(!lock->readCount) {
lock->holder = 0;
free(lock->last_acquired_at); // last_acquired_at gets leaked by readunlock.
writeunlock(lock->lock);
}
#else
readunlock(lock->lock);
#endif
}
void __profile_writeunlock (__profile_rwl *lock) {
@@ -287,7 +309,11 @@ void __profile_writeunlock (__profile_rwl *lock) {
}
void __profile_unlock (__profile_rwl * lock) {
#ifdef PROFILE_LATCHES_WRITE_ONLY
if(!lock->readCount) {
#else
if(lock->lock->writers) {
#endif
__profile_writeunlock(lock);
} else {
__profile_readunlock(lock);
@@ -309,17 +335,23 @@ void __profile_deletelock (__profile_rwl *lock) {
profile_tuple * tup;
printf("Free rwl init: %s %d\n ", lock->file, lock->line);
#ifdef PROFILE_LATCHES_VERBOSE
printf("Free rwl init: %s %d\t ", lock->file, lock->line);
print_profile_tuple(&(lock->tup));
printf("\n Lock points: [mean, stddev, max] \n");
printf("\n");
printf("Lock points: [mean, stddev, max] \n");
for(tup = pblHtFirst(lock->lockpoints); tup; tup = pblHtNext(lock->lockpoints)) {
printf("\t%s ", (char*)pblHtCurrentKey(lock->lockpoints));
print_profile_tuple(tup);
printf("\n");
free(tup);
}
}
#else
for(tup = pblHtFirst(lock->lockpoints); tup; tup = pblHtNext(lock->lockpoints)) {
free(tup);
}
#endif
pblHtDelete(lock->lockpoints);
deletelock(lock->lock);

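The readlock/readunlock changes above implement PROFILE_LATCHES_WRITE_ONLY by funneling readers through the write latch: the first readlock() by a thread takes the latch in write mode and records the holder, nested calls only bump readCount, and the last readunlock() releases it, so the profiler always knows which single thread holds a pinned page. A condensed sketch of the same idea (not part of this commit), with a plain pthread mutex standing in for the rwl implementation and illustrative names:

#include <assert.h>
#include <pthread.h>

typedef struct {
  pthread_mutex_t lock;    /* stands in for the rwl held in write mode */
  pthread_t holder;        /* thread currently holding the latch */
  int readCount;           /* nesting depth of readlock() calls by the holder */
} wo_latch;

void wo_readlock(wo_latch * l) {
  pthread_t self = pthread_self();
  /* As in the diff, the holder/readCount check is unsynchronized: only the
     thread that already holds the latch can observe its own values here. */
  if(l->readCount == 0 || !pthread_equal(l->holder, self)) {
    pthread_mutex_lock(&l->lock);   /* first acquisition by this thread */
    l->holder = self;
  }
  l->readCount++;
}

void wo_readunlock(wo_latch * l) {
  assert(pthread_equal(l->holder, pthread_self()));
  if(--l->readCount == 0)
    pthread_mutex_unlock(&l->lock); /* last nested unlock releases the latch */
}

A latch of this kind would be declared statically, e.g. static wo_latch latch = { PTHREAD_MUTEX_INITIALIZER };.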

@@ -42,6 +42,8 @@ typedef struct {
char * last_acquired_at;
/* pblHashTable_t * lockpoints; */
void * lockpoints;
pthread_t holder;
int readCount;
} __profile_rwl;
#ifdef PROFILE_LATCHES


@@ -105,7 +105,7 @@ static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat
return 0;
}
static pthread_mutex_t talloc_mutex;
static pthread_mutex_t talloc_mutex = PTHREAD_MUTEX_INITIALIZER;
Operation getAlloc() {
Operation o = {
@@ -146,7 +146,7 @@ static allocationPolicy * allocPolicy;
void TallocInit() {
lastFreepage = UINT64_MAX;
allocPolicy = allocationPolicyInit();
pthread_mutex_init(&talloc_mutex, NULL);
// pthread_mutex_init(&talloc_mutex, NULL);
}
static compensated_function recordid TallocFromPageInternal(int xid, Page * p, unsigned long size);
@@ -225,33 +225,6 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
}
/* if(lastFreepage == UINT64_MAX) {
try_ret(NULLRID) {
lastFreepage = TpageAlloc(xid);
} end_ret(NULLRID);
try_ret(NULLRID) {
p = loadPage(xid, lastFreepage);
} end_ret(NULLRID);
// assert(*page_type_ptr(p) == UNINITIALIZED_PAGE);
slottedPageInitialize(p);
} else {
try_ret(NULLRID) {
p = loadPage(xid, lastFreepage);
} end_ret(NULLRID);
} */
/* if(slottedFreespace(p) < physical_slot_length(type) ) {
// XXX compact page?!?
releasePage(p);
try_ret(NULLRID) {
lastFreepage = TpageAlloc(xid);
} end_ret(NULLRID);
try_ret(NULLRID) {
p = loadPage(xid, lastFreepage);
} end_ret(NULLRID);
slottedPageInitialize(p);
} */
rid = TallocFromPageInternal(xid, p, size);
int newFreespace = slottedFreespace(p);
@@ -303,10 +276,10 @@ static compensated_function recordid TallocFromPageInternal(int xid, Page * p, u
assert(slotSize < PAGE_SIZE && slotSize > 0);
if(slottedFreespace(p) < slotSize) {
/* if(slottedFreespace(p) < slotSize) {
slottedCompact(p);
}
if(slottedFreespace(p) < slotSize) {
} */
if(slottedFreespace(p) < slotSize) {
rid = NULLRID;
} else {
rid = slottedRawRalloc(p, type);
@@ -333,6 +306,17 @@ static compensated_function recordid TallocFromPageInternal(int xid, Page * p, u
compensated_function void Tdealloc(int xid, recordid rid) {
// @todo this needs to garbage collect empty pages / storage regions.
// XXX This is BROKEN. It needs to lock the page that it's
// deallocating from. A shared/exclusive lock doesn't quite do it.
// If dealloc got a shared lock, then alloc could get an exclusive
// lock when it allocs, but then alloc would need to free the lock,
// since deallocation is always safe (assuming the app isn't
// deallocating something that hasn't committed yet, which is its
// fault, not ours). Also, we don't want to prevent a transaction
// from allocating to a page if it is the only transaction that's
// freed something on that page.
void * preimage = malloc(rid.size);
Page * p;
try {

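The allocator hunk above swaps the runtime pthread_mutex_init() call in TallocInit() for a static PTHREAD_MUTEX_INITIALIZER, presumably so talloc_mutex is valid regardless of initialization order. A minimal illustration of the statically initialized form (the bump() function and counter are hypothetical, not part of the commit):

#include <pthread.h>

/* Statically initialized: usable from the first call, no init function needed. */
static pthread_mutex_t talloc_mutex = PTHREAD_MUTEX_INITIALIZER;

static int counter;   /* hypothetical state guarded by the mutex */

void bump(void) {
  pthread_mutex_lock(&talloc_mutex);   /* would be undefined behavior if the
                                          mutex had never been initialized */
  counter++;
  pthread_mutex_unlock(&talloc_mutex);
}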

@@ -7,7 +7,6 @@
static void really_do_ralloc(Page * page, recordid rid) ;
size_t slottedFreespaceForSlot(Page * page, int slot);
void fsckSlottedPage(const Page const * page) {
#ifndef SLOTTED_PAGE_SKIP_SANITY_CHECKS
@@ -322,7 +321,7 @@ size_t slottedFreespaceUnlocked(Page * page);
size_t slottedFreespaceForSlot(Page * page, int slot) {
size_t slotOverhead;
if(slot == -1) {
if(slot == INVALID_SLOT) {
slotOverhead = (*freelist_ptr(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0;
} else if(slot < *numslots_ptr(page)) {
slotOverhead = 0;
@@ -352,7 +351,7 @@ size_t slottedFreespaceForSlot(Page * page, int slot) {
then write a randomized test that confirms the model matches the
implementation's behavior. */
size_t slottedFreespaceUnlocked(Page * page) {
return slottedFreespaceForSlot(page, -1);
return slottedFreespaceForSlot(page, INVALID_SLOT);
}
size_t slottedFreespace(Page * page) {
@@ -523,8 +522,6 @@ static void really_do_ralloc(Page * page, recordid rid) {
*slot_length_ptr(page, rid.slot) = rid.size;
assert(slottedFreespaceForSlot(page, -1) || 1);
}
/**
@param rid with user-visible size.
@@ -586,8 +583,6 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
pageWriteLSN(xid, page, lsn);
assert(slottedFreespaceForSlot(page, -1) || 1);
fsckSlottedPage(page);
writeunlock(page->rwlatch);