stasis-aries-wal/src/stasis/truncation.c

251 lines
7.3 KiB
C
Raw Normal View History

#include <limits.h>
#include <stasis/truncation.h>
2006-05-19 20:17:44 +00:00
#include <pbl/pbl.h>
#include <stasis/logger/logger2.h>
#include <stasis/page.h>
2006-05-19 20:17:44 +00:00
#include <assert.h>
static int initialized = 0;
2006-05-19 20:17:44 +00:00
static int automaticallyTuncating = 0;
static pthread_t truncationThread;
static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t shutdown_cond = PTHREAD_COND_INITIALIZER;
2006-05-19 20:17:44 +00:00
static pblHashTable_t * dirtyPages = 0;
static pthread_mutex_t dirtyPages_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Truncation tunables.
 *
 * TARGET_LOG_SIZE            - the worker thread attempts a truncation once
 *                              (first unstable LSN - truncation point)
 *                              exceeds this value; twice this value is the
 *                              threshold for flushing dirty pages.
 * TRUNCATE_INTERVAL          - seconds the worker waits between checks.
 * MIN_INCREMENTAL_TRUNCATION - minimum LSN distance that must be reclaimable
 *                              before an (unforced) truncation is attempted.
 *
 * NOTE(review): the sizes are compared against LSN differences; presumably
 * LSNs are byte offsets into the log, making these byte counts -- confirm.
 * LONG_TEST builds use smaller thresholds.
 */
#ifdef LONG_TEST
#define TARGET_LOG_SIZE (1024 * 1024 * 5)
#define TRUNCATE_INTERVAL 1
#define MIN_INCREMENTAL_TRUNCATION (1024 * 1024 * 1)
#else
#define TARGET_LOG_SIZE (1024 * 1024 * 50)
#define TRUNCATE_INTERVAL 1
#define MIN_INCREMENTAL_TRUNCATION (1024 * 1024 * 25)
#endif
2006-05-19 20:17:44 +00:00
/**
 * Mark a page dirty and register it in the dirty page table.
 *
 * If the page is already flagged dirty this is a no-op.  Otherwise the
 * page's dirty flag is set and a heap-allocated copy of its current LSN is
 * stored in the table under the page's id.  The page must not already have
 * an entry in the table (checked with assert).
 *
 * @param p the page that is being dirtied.
 */
void dirtyPages_add(Page * p) {
  pthread_mutex_lock(&dirtyPages_mutex);

  if(p->dirty) {
    // Already tracked; nothing to record.
    pthread_mutex_unlock(&dirtyPages_mutex);
    return;
  }

  p->dirty = 1;

  void * existing = pblHtLookup(dirtyPages, &(p->id), sizeof(p->id));
  assert(!existing);

  // The table owns this allocation; it is freed when the entry is removed
  // (dirtyPages_remove) or when the table is torn down (dirtyPagesDeinit).
  lsn_t * recLSN = malloc(sizeof(lsn_t));
  *recLSN = p->LSN;
  pblHtInsert(dirtyPages, &(p->id), sizeof(p->id), recLSN);

  pthread_mutex_unlock(&dirtyPages_mutex);
}
/**
 * Clear a page's dirty flag and drop its entry from the dirty page table.
 *
 * The stored LSN copy (if the page had an entry) is freed.  Calling this
 * for a page that has no entry is harmless.
 *
 * @param p the page that is no longer dirty.
 */
void dirtyPages_remove(Page * p) {
  pthread_mutex_lock(&dirtyPages_mutex);

  p->dirty = 0;

  // Fetch the value first so it can be released after the entry is gone.
  lsn_t * recLSN = pblHtLookup(dirtyPages, &(p->id), sizeof(p->id));
  pblHtRemove(dirtyPages, &(p->id), sizeof(p->id));
  // The result of the removal is deliberately not asserted on: due to a bug
  // in the PBL compatibility mode, there is no way to tell whether the value
  // didn't exist, or if it was null.

  free(recLSN);  // free(NULL) is a no-op, so no guard is needed.

  pthread_mutex_unlock(&dirtyPages_mutex);
}
/**
 * Report whether a page is currently flagged dirty.
 *
 * The flag is read while holding dirtyPages_mutex, the same mutex under
 * which dirtyPages_add / dirtyPages_remove modify it.
 *
 * @param p the page to query.
 * @return the value of the page's dirty flag (non-zero if dirty).
 */
int dirtyPages_isDirty(Page * p) {
  pthread_mutex_lock(&dirtyPages_mutex);
  int isDirty = p->dirty;
  pthread_mutex_unlock(&dirtyPages_mutex);
  return isDirty;
}
2006-05-19 20:17:44 +00:00
static lsn_t dirtyPages_minRecLSN() {
lsn_t lsn = LSN_T_MAX; // LogFlushedLSN ();
pageid_t* pageid;
2006-05-19 20:17:44 +00:00
pthread_mutex_lock(&dirtyPages_mutex);
for( pageid = (pageid_t*)pblHtFirst (dirtyPages); pageid; pageid = (pageid_t*)pblHtNext(dirtyPages)) {
2006-11-07 22:37:05 +00:00
lsn_t * thisLSN = (lsn_t*) pblHtCurrent(dirtyPages);
2006-05-19 20:17:44 +00:00
// printf("lsn = %d\n", thisLSN);
2006-11-07 22:37:05 +00:00
if(*thisLSN < lsn) {
lsn = *thisLSN;
2006-05-19 20:17:44 +00:00
}
}
pthread_mutex_unlock(&dirtyPages_mutex);
return lsn;
}
/**
 * Write back every page that is in the dirty page table at the time of the
 * call.
 *
 * The page ids are first copied into a temporary array while holding
 * dirtyPages_mutex; the pages are then loaded, written back and released
 * with the mutex dropped.
 * NOTE(review): the write-back is presumably done outside the mutex because
 * writing a page back ends up calling dirtyPages_remove(), which takes the
 * same mutex -- confirm.
 *
 * The snapshot holds at most MAX_BUFFER_SIZE ids.  The bound is checked
 * before each store, so an oversized table trips the assertion instead of
 * first writing past the end of the array.
 */
static void dirtyPages_flush() {
  pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE));
  assert(staleDirtyPages);

  int count = 0;
  void * tmp;

  pthread_mutex_lock(&dirtyPages_mutex);
  for(tmp = pblHtFirst(dirtyPages); tmp; tmp = pblHtNext(dirtyPages)) {
    // Check the bound *before* writing the slot.
    assert(count < MAX_BUFFER_SIZE);
    staleDirtyPages[count] = *((pageid_t*) pblHtCurrentKey(dirtyPages));
    count++;
  }
  pthread_mutex_unlock(&dirtyPages_mutex);

  // Iterate by count rather than relying on a -1 sentinel in the array.
  int i;
  for(i = 0; i < count; i++) {
    Page * p = loadPage(-1, staleDirtyPages[i]);
    writeBackPage(p);
    releasePage(p);
  }

  free(staleDirtyPages);
}
/**
 * Write back the dirty pages whose ids fall in [start, stop), then force
 * the corresponding byte range of the page file.
 *
 * The matching page ids are collected under dirtyPages_mutex; the pages are
 * loaded, written back and released after the mutex has been dropped, and
 * finally forcePageRange() is called on
 * [start * PAGE_SIZE, stop * PAGE_SIZE).
 *
 * @param start first page id to flush (inclusive).
 * @param stop  page id at which to stop (exclusive).
 */
void dirtyPages_flushRange(pageid_t start, pageid_t stop) {
  pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE));
  assert(staleDirtyPages);

  int count = 0;
  void * tmp;

  pthread_mutex_lock(&dirtyPages_mutex);
  for(tmp = pblHtFirst(dirtyPages); tmp; tmp = pblHtNext(dirtyPages)) {
    pageid_t num = *((pageid_t*) pblHtCurrentKey(dirtyPages));
    // Select pages inside the half-open range [start, stop).
    if(num >= start && num < stop) {
      // Check the bound before writing the slot.
      assert(count < MAX_BUFFER_SIZE);
      staleDirtyPages[count] = num;
      count++;
    }
  }
  pthread_mutex_unlock(&dirtyPages_mutex);

  // Iterate by count rather than relying on a -1 sentinel in the array.
  int i;
  for(i = 0; i < count; i++) {
    Page * p = loadPage(-1, staleDirtyPages[i]);
    writeBackPage(p);
    releasePage(p);
  }

  free(staleDirtyPages);

  forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE);
}
2006-05-19 20:17:44 +00:00
void dirtyPagesInit() {
dirtyPages = pblHtCreate();
}
void dirtyPagesDeinit() {
2006-11-07 22:37:05 +00:00
void * tmp;
int areDirty = 0;
2006-11-07 22:37:05 +00:00
for(tmp = pblHtFirst(dirtyPages); tmp; tmp = pblHtNext(dirtyPages)) {
free(pblHtCurrent(dirtyPages));
if((!areDirty) &&
(!stasis_suppress_unclean_shutdown_warnings)) {
printf("Warning: dirtyPagesDeinit detected dirty, unwritten pages. "
"Updates lost?\n");
areDirty = 1;
}
2006-11-07 22:37:05 +00:00
}
2006-05-19 20:17:44 +00:00
pblHtDelete(dirtyPages);
dirtyPages = 0;
}
2008-12-01 19:48:59 +00:00
void stasis_truncation_init() {
2006-05-19 20:17:44 +00:00
initialized = 1;
}
2008-12-01 19:48:59 +00:00
void stasis_truncation_deinit() {
pthread_mutex_lock(&shutdown_mutex);
2006-05-19 20:17:44 +00:00
initialized = 0;
if(automaticallyTuncating) {
void * ret = 0;
pthread_mutex_unlock(&shutdown_mutex);
pthread_cond_broadcast(&shutdown_cond);
2006-05-19 20:17:44 +00:00
pthread_join(truncationThread, &ret);
} else {
pthread_mutex_unlock(&shutdown_mutex);
2006-05-19 20:17:44 +00:00
}
automaticallyTuncating = 0;
}
2008-12-01 19:48:59 +00:00
static void* stasis_truncation_thread_worker(void* logp) {
stasis_log_t * log = logp;
pthread_mutex_lock(&shutdown_mutex);
2008-12-01 19:48:59 +00:00
while(initialized) {
if(log->first_unstable_lsn(log, LOG_FORCE_WAL) - log->truncation_point(log)
2008-12-01 19:48:59 +00:00
> TARGET_LOG_SIZE) {
stasis_truncation_truncate(log, 0);
2006-05-19 20:17:44 +00:00
}
struct timeval now;
struct timespec timeout;
int timeret = gettimeofday(&now, 0);
assert(0 == timeret);
2008-12-01 19:48:59 +00:00
timeout.tv_sec = now.tv_sec;
timeout.tv_nsec = now.tv_usec;
timeout.tv_sec += TRUNCATE_INTERVAL;
2008-12-01 19:48:59 +00:00
pthread_cond_timedwait(&shutdown_cond, &shutdown_mutex, &timeout);
2006-05-19 20:17:44 +00:00
}
pthread_mutex_unlock(&shutdown_mutex);
2006-05-19 20:17:44 +00:00
return (void*)0;
}
2008-12-01 19:48:59 +00:00
/**
 * Start the background truncation thread for the given log.
 *
 * Must not be called while a truncation thread is already running (checked
 * with assert).  If the thread cannot be created, a warning is printed and
 * the "worker running" flag is cleared again, so that
 * stasis_truncation_deinit() does not try to join a thread that was never
 * started.
 *
 * @param log the log that the worker will monitor and truncate.
 */
void stasis_truncation_thread_start(stasis_log_t* log) {
  assert(!automaticallyTuncating);
  automaticallyTuncating = 1;

  int err = pthread_create(&truncationThread, 0,
                           &stasis_truncation_thread_worker, log);
  if(err) {
    // truncationThread is not a valid handle if creation failed.
    automaticallyTuncating = 0;
    printf("Warning:  could not start truncation thread (error %d).\n", err);
  }
}
2008-12-01 19:48:59 +00:00
/**
 * Attempt to truncate the log.
 *
 * The candidate truncation point (rec_lsn) is the minimum of the oldest LSN
 * in the dirty page table, the oldest LSN reported by
 * transactions_minRecLSN(), and the log's first unstable LSN.  The first two
 * return LSN_T_MAX when there are no dirty pages / no outstanding
 * transactions.  (They used to return the same value as the flushed LSN in
 * that case, but that value might not point to the front of a log entry.)
 *
 * Steps:
 *  1. Unless forced, give up if the oldest transaction LSN is within
 *     MIN_INCREMENTAL_TRUNCATION of the current truncation point.
 *  2. If rec_lsn is more than MIN_INCREMENTAL_TRUNCATION past the
 *     truncation point, force pages and truncate to rec_lsn.
 *  3. Otherwise, if forced or the log has grown past 2 * TARGET_LOG_SIZE,
 *     flush all dirty pages, recompute rec_lsn, force pages and truncate.
 *
 * @param log   the log to truncate.
 * @param force non-zero to truncate even if little space would be reclaimed.
 * @return 1 if log->truncate() was called, 0 otherwise.
 */
int stasis_truncation_truncate(stasis_log_t* log, int force) {
  lsn_t page_rec_lsn = dirtyPages_minRecLSN();
  lsn_t xact_rec_lsn = transactions_minRecLSN();
  lsn_t flushed_lsn  = log->first_unstable_lsn(log, LOG_FORCE_WAL);

  lsn_t rec_lsn = page_rec_lsn;
  if(!(rec_lsn < xact_rec_lsn)) { rec_lsn = xact_rec_lsn; }
  if(!(rec_lsn < flushed_lsn))  { rec_lsn = flushed_lsn; }

  lsn_t log_trunc = log->truncation_point(log);

  // Step 1: not enough to gain, and nobody insisted.
  if(!force && !((xact_rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION)) {
    return 0;
  }

  // Step 2: enough of the log is already reclaimable.
  if((rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) {
    forcePages();
    log->truncate(log, rec_lsn);
    return 1;
  }

  // Step 3: dirty pages are holding the truncation point back.
  lsn_t flushed = log->first_unstable_lsn(log, LOG_FORCE_WAL);
  if(!force && !(flushed - log_trunc > 2 * TARGET_LOG_SIZE)) {
    return 0;
  }

  dirtyPages_flush();

  // Only the page component is re-read; the transaction and flushed LSNs
  // sampled at the top of the function are reused.
  page_rec_lsn = dirtyPages_minRecLSN();
  rec_lsn = page_rec_lsn;
  if(!(rec_lsn < xact_rec_lsn)) { rec_lsn = xact_rec_lsn; }
  if(!(rec_lsn < flushed_lsn))  { rec_lsn = flushed_lsn; }

  forcePages();
  log->truncate(log, rec_lsn);
  return 1;
}