Fixed initialization bugs on pendingCommits and syncLogCount. Group commit used to hang with 100% CPU if a signal interrputed it. Now it ignores the signal. (Don't know why it's receiving a singal, so it warns when this happens...)
This commit is contained in:
parent
2d136695ff
commit
836d2d097d
1 changed files with 13 additions and 12 deletions
|
@ -61,6 +61,10 @@ terms specified in this license.
|
||||||
|
|
||||||
int loggerType = LOG_TO_FILE;
|
int loggerType = LOG_TO_FILE;
|
||||||
|
|
||||||
|
extern int numActiveXactions;
|
||||||
|
static int pendingCommits;
|
||||||
|
static int syncLogCount;
|
||||||
|
|
||||||
long LoggerSizeOfInternalLogEntry(const LogEntry * e) {
|
long LoggerSizeOfInternalLogEntry(const LogEntry * e) {
|
||||||
if(loggerType == LOG_TO_FILE) {
|
if(loggerType == LOG_TO_FILE) {
|
||||||
return sizeofInternalLogEntry_LogWriter(e);
|
return sizeofInternalLogEntry_LogWriter(e);
|
||||||
|
@ -87,6 +91,8 @@ int LogInit(int logType) {
|
||||||
|
|
||||||
loggerType = logType;
|
loggerType = logType;
|
||||||
|
|
||||||
|
pendingCommits = 0;
|
||||||
|
syncLogCount = 0;
|
||||||
if(LOG_TO_FILE == logType) {
|
if(LOG_TO_FILE == logType) {
|
||||||
openLogWriter();
|
openLogWriter();
|
||||||
} else if(LOG_TO_MEMORY == logType) {
|
} else if(LOG_TO_MEMORY == logType) {
|
||||||
|
@ -209,21 +215,17 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
extern int numActiveXactions;
|
|
||||||
/**
|
/**
|
||||||
@todo This belongs in logWriter.c and needs a new name.
|
@todo This should be usable by all calls that sync the log; not just commit.
|
||||||
*/
|
*/
|
||||||
static lsn_t groupCommit(TransactionLog * l) {
|
static lsn_t groupCommit(TransactionLog * l) {
|
||||||
static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER;
|
||||||
static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER;
|
static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER;
|
||||||
static int pendingCommits = 0;
|
|
||||||
static int syncLogCount;
|
|
||||||
|
|
||||||
lsn_t ret = LogTransCommon(l, XCOMMIT);
|
lsn_t ret = LogTransCommon(l, XCOMMIT);
|
||||||
|
|
||||||
struct timeval now;
|
struct timeval now;
|
||||||
struct timespec timeout;
|
struct timespec timeout;
|
||||||
// int retcode;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&check_commit);
|
pthread_mutex_lock(&check_commit);
|
||||||
if(LogFlushedLSN() >= ret) {
|
if(LogFlushedLSN() >= ret) {
|
||||||
|
@ -241,24 +243,24 @@ static lsn_t groupCommit(TransactionLog * l) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pendingCommits++;
|
pendingCommits++;
|
||||||
// if(pendingCommits <= (numActiveXactions / 2)) {
|
|
||||||
if((numActiveXactions > 1 && pendingCommits < numActiveXactions) ||
|
if((numActiveXactions > 1 && pendingCommits < numActiveXactions) ||
|
||||||
(numActiveXactions > 20 && pendingCommits < (int)((double)numActiveXactions * 0.95))) {
|
(numActiveXactions > 20 && pendingCommits < (int)((double)numActiveXactions * 0.95))) {
|
||||||
while(ETIMEDOUT != (pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) {
|
int retcode;
|
||||||
|
while(ETIMEDOUT != (retcode = pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) {
|
||||||
|
if(retcode != 0) {
|
||||||
|
printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by a signal in groupCommit(). Acting as though it timed out.\n", __FILE__, __LINE__);
|
||||||
|
break;
|
||||||
|
}
|
||||||
if(LogFlushedLSN() >= ret) {
|
if(LogFlushedLSN() >= ret) {
|
||||||
pendingCommits--;
|
pendingCommits--;
|
||||||
pthread_mutex_unlock(&check_commit);
|
pthread_mutex_unlock(&check_commit);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// printf("Timed out");
|
|
||||||
} else {
|
|
||||||
// printf("Didn't wait %d < %d\n", (numActiveXactions / 2), pendingCommits);
|
|
||||||
}
|
}
|
||||||
if(LogFlushedLSN() < ret) {
|
if(LogFlushedLSN() < ret) {
|
||||||
syncLog_LogWriter();
|
syncLog_LogWriter();
|
||||||
syncLogCount++;
|
syncLogCount++;
|
||||||
// printf(" %d ", syncLogCount);
|
|
||||||
pthread_cond_broadcast(&tooFewXacts);
|
pthread_cond_broadcast(&tooFewXacts);
|
||||||
}
|
}
|
||||||
assert(LogFlushedLSN() >= ret);
|
assert(LogFlushedLSN() >= ret);
|
||||||
|
@ -312,7 +314,6 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
|
||||||
|
|
||||||
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage);
|
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage);
|
||||||
|
|
||||||
// writeLogEntry(e);
|
|
||||||
LogWrite(e);
|
LogWrite(e);
|
||||||
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid,
|
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid,
|
||||||
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) argSize);
|
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) argSize);
|
||||||
|
|
Loading…
Reference in a new issue