refactored truncation + dirty page table, fixed doxygen warnings.

This commit is contained in:
Sears Russell 2009-05-20 21:23:51 +00:00
parent 73c4b4fb71
commit 7db06197ca
21 changed files with 430 additions and 359 deletions

View file

@ -80,7 +80,7 @@ int main(int argc, char ** argv) {
for(long i =0; i < page_count; i++) {
Page * p = loadPage(-1, i);
dirtyPages_add(p);
stasis_dirty_page_table_set_dirty(stasis_dirty_page_table, p);
releasePage(p);
}

View file

@ -6,6 +6,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c
bufferManager/legacy/legacyBufferManager.c
page.c bufferPool.c blobManager.c
recovery2.c truncation.c transactional2.c
dirtyPageTable.c
allocationPolicy.c lockManager.c iterator.c
consumer.c arrayCollection.c ringbuffer.c fifo.c
multiplexer.c graph.c logger/logEntry.c

View file

@ -3,6 +3,7 @@ lib_LTLIBRARIES=libstasis.la
libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common.c flags.c \
stats.c linkedlist.c operations.c pageHandle.c \
page.c bufferPool.c blobManager.c recovery2.c truncation.c \
dirtyPageTable.c \
transactional2.c allocationPolicy.c \
lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\
logger/logEntry.c \

View file

@ -80,9 +80,9 @@ static void pfPageRead(stasis_page_handle_t * h, Page *ret) {
dirty page table can be kept up to date. */
static void pfPageWrite(stasis_page_handle_t * h, Page * ret) {
/** If the page is clean, there's no reason to write it out. */
assert(ret->dirty == dirtyPages_isDirty(ret));
assert(ret->dirty == stasis_dirty_page_table_is_dirty(h->dirtyPages, ret));
if(!ret->dirty) {
// if(!dirtyPages_isDirty(ret)) {
// if(!stasis_dirty_page_table_is_dirty(ret)) {
DEBUG(" =^)~ ");
return;
}
@ -126,14 +126,14 @@ static void pfPageWrite(stasis_page_handle_t * h, Page * ret) {
}
}
dirtyPages_remove(ret);
stasis_dirty_page_table_set_clean(h->dirtyPages, ret);
pthread_mutex_unlock(&stable_mutex);
}
//#define PAGE_FILE_O_DIRECT
/** @todo O_DIRECT is broken in older linuxes (eg 2.4). The build script should disable it on such platforms. */
stasis_page_handle_t* openPageFile(stasis_log_t * log) {
stasis_page_handle_t* openPageFile(stasis_log_t * log, stasis_dirty_page_table_t * dpt) {
stasis_page_handle_t * ret = malloc(sizeof(*ret));
ret->read = pfPageRead;
ret->write = pfPageWrite;
@ -141,6 +141,7 @@ stasis_page_handle_t* openPageFile(stasis_log_t * log) {
ret->force_range = pfForceRangePageFile;
ret->close = pfClosePageFile;
ret->log = log;
ret->dirtyPages = dpt;
DEBUG("Opening storefile.\n");
#ifdef PAGE_FILE_O_DIRECT

View file

@ -15,15 +15,15 @@ static pthread_mutex_t pageArray_mut = PTHREAD_MUTEX_INITIALIZER;
static Page * paLoadPage(int xid, pageid_t pageid) {
pthread_mutex_lock(&pageArray_mut);
if(pageid >= pageCount) {
if(pageid >= pageCount) {
pageMap = realloc(pageMap, (1+pageid) * sizeof(Page*));
for(pageid_t i = pageCount; i <= pageid; i++) {
for(pageid_t i = pageCount; i <= pageid; i++) {
pageMap[i] = 0;
}
pageCount = pageid + 1;
}
if(!pageMap[pageid]) {
if(!pageMap[pageid]) {
pageMap[pageid] = malloc(sizeof(Page));
pageMap[pageid]->id = pageid;
pageMap[pageid]->LSN = 0;
@ -40,16 +40,16 @@ static Page * paLoadPage(int xid, pageid_t pageid) {
return pageMap[pageid];
}
static void paReleasePage(Page * p) {
dirtyPages_remove(p);
static void paReleasePage(Page * p) {
stasis_dirty_page_table_set_clean(stasis_dirty_page_table, p);
}
static void paWriteBackPage(Page * p) { /* no-op */ }
static void paForcePages() { /* no-op */ }
static void paBufDeinit() {
for(pageid_t i =0; i < pageCount; i++) {
if(pageMap[i]) {
static void paBufDeinit() {
for(pageid_t i =0; i < pageCount; i++) {
if(pageMap[i]) {
deletelock(pageMap[i]->rwlatch);
deletelock(pageMap[i]->loadlatch);
free(pageMap[i]);
@ -57,14 +57,14 @@ static void paBufDeinit() {
}
}
void stasis_buffer_manager_mem_array_open () {
void stasis_buffer_manager_mem_array_open () {
releasePageImpl = paReleasePage;
loadPageImpl = paLoadPage;
writeBackPage = paWriteBackPage;
forcePages = paForcePages;
stasis_buffer_manager_close = paBufDeinit;
stasis_buffer_manager_simulate_crash = paBufDeinit;
stasis_buffer_manager_close = paBufDeinit;
stasis_buffer_manager_simulate_crash = paBufDeinit;
pageCount = 0;
pageMap = 0;

151
src/stasis/dirtyPageTable.c Normal file
View file

@ -0,0 +1,151 @@
/*
* dirtyPageTable.c
*
* Created on: May 18, 2009
* Author: sears
*/
#include <pbl/pbl.h>
#include <stasis/common.h>
#include <stasis/dirtyPageTable.h>
#include <stasis/page.h>
#include <stasis/bufferManager.h>
/**
 * In-memory dirty page table: maps dirty page ids to the LSN the page had
 * when it was first dirtied (its recLSN).  All accesses are serialized by
 * the embedded mutex.
 */
struct stasis_dirty_page_table_t {
pblHashTable_t * table;  // key: pageid_t (page id), value: malloc'd lsn_t* (recLSN)
pthread_mutex_t mutex;   // guards table and the Page.dirty flags it mirrors
};
/**
 * Mark page p as dirty and record its current LSN as the page's recLSN.
 * No-op if the page is already marked dirty.  The recLSN is heap-allocated
 * and owned by the table; it is freed by set_clean / deinit.
 */
void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) {
  pthread_mutex_lock(&dirtyPages->mutex);
  if(!p->dirty) {
    p->dirty = 1;
    // A clean page must not already have a table entry.
    void * existing = pblHtLookup(dirtyPages->table, &(p->id), sizeof(p->id));
    assert(!existing);
    (void)existing;  // silence unused warning under NDEBUG
    lsn_t * recLSN = malloc(sizeof(*recLSN));
    *recLSN = p->LSN;
    pblHtInsert(dirtyPages->table, &(p->id), sizeof(p->id), recLSN);
  }
  pthread_mutex_unlock(&dirtyPages->mutex);
}
/**
 * Mark page p as clean, dropping (and freeing) its recLSN entry, if any.
 * Safe to call on a page that is not in the table.
 */
void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p) {
  pthread_mutex_lock(&dirtyPages->mutex);
  p->dirty = 0;
  lsn_t * recLSN = pblHtLookup(dirtyPages->table, &(p->id), sizeof(p->id));
  pblHtRemove(dirtyPages->table, &(p->id), sizeof(p->id));
  // free(NULL) is a no-op, so no guard is needed when the page was absent.
  // NOTE: due to a bug in the PBL compatibility mode, pblHtRemove's return
  // value cannot distinguish "missing key" from "stored NULL", so we cannot
  // assert on it here.
  free(recLSN);
  pthread_mutex_unlock(&dirtyPages->mutex);
}
/** Return non-zero iff page p is currently marked dirty (reads p->dirty under the table mutex). */
int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) {
  pthread_mutex_lock(&dirtyPages->mutex);
  int dirty = p->dirty;
  pthread_mutex_unlock(&dirtyPages->mutex);
  return dirty;
}
/**
 * Return the smallest recLSN in the dirty page table, or LSN_T_MAX when the
 * table is empty.  Used by log truncation to find the earliest log entry
 * that may still be needed for redo.
 */
lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t * dirtyPages) {
  lsn_t min = LSN_T_MAX;
  pthread_mutex_lock(&dirtyPages->mutex);
  void * cursor;
  for(cursor = pblHtFirst(dirtyPages->table); cursor; cursor = pblHtNext(dirtyPages->table)) {
    lsn_t cur = *(lsn_t*)pblHtCurrent(dirtyPages->table);
    if(cur < min) {
      min = cur;
    }
  }
  pthread_mutex_unlock(&dirtyPages->mutex);
  return min;
}
/**
 * Write every dirty page back to the page file.
 *
 * Snapshots the set of dirty page ids under the mutex, then writes the pages
 * back without holding the mutex (writeBackPage marks them clean, which
 * re-acquires it).  Pages dirtied concurrently after the snapshot may be
 * missed; that matches the original best-effort semantics.
 *
 * Fix: the original wrote staleDirtyPages[i] for every dirty page and only
 * checked assert(i < MAX_BUFFER_SIZE) AFTER the loop, so with more than
 * MAX_BUFFER_SIZE dirty pages the heap buffer was overflowed before the
 * assert could fire.  The bound is now checked before each write.
 */
void stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
  pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE));
  int i;
  // Pre-fill with -1 sentinels; the write-back loop stops at the first -1.
  for(i = 0; i < MAX_BUFFER_SIZE; i++) {
    staleDirtyPages[i] = -1;
  }
  Page* p = 0;
  pthread_mutex_lock(&dirtyPages->mutex);
  void* tmp;
  i = 0;
  for(tmp = pblHtFirst(dirtyPages->table); tmp; tmp = pblHtNext(dirtyPages->table)) {
    assert(i < MAX_BUFFER_SIZE);  // check BEFORE writing, not after the loop
    staleDirtyPages[i] = *((pageid_t*) pblHtCurrentKey(dirtyPages->table));
    i++;
  }
  pthread_mutex_unlock(&dirtyPages->mutex);
  for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) {
    p = loadPage(-1, staleDirtyPages[i]);
    writeBackPage(p);
    releasePage(p);
  }
  free(staleDirtyPages);
}
/**
 * Write back all dirty pages whose id falls in [start, stop), then force the
 * corresponding byte range of the page file to disk.
 *
 * Fixes:
 *  - The range predicate was `num <= start && num < stop`, which selected
 *    pages at or BELOW start instead of pages inside [start, stop); it is
 *    now `num >= start && num < stop`.
 *  - The snapshot loop had no bounds check, and the trailing -1 sentinel
 *    could be written at index MAX_BUFFER_SIZE, one past the end of the
 *    buffer.  Both writes are now bounds-checked.
 */
void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop) {
  pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE));
  int i;
  Page * p = 0;
  pthread_mutex_lock(&dirtyPages->mutex);
  void *tmp;
  i = 0;
  for(tmp = pblHtFirst(dirtyPages->table); tmp; tmp = pblHtNext(dirtyPages->table)) {
    pageid_t num = *((pageid_t*) pblHtCurrentKey(dirtyPages->table));
    if(num >= start && num < stop) {   // was `num <= start`: flushed the wrong pages
      assert(i < MAX_BUFFER_SIZE);
      staleDirtyPages[i] = num;
      i++;
    }
  }
  if(i < MAX_BUFFER_SIZE) {
    staleDirtyPages[i] = -1;  // sentinel; the loop below is also bounded by i
  }
  pthread_mutex_unlock(&dirtyPages->mutex);
  for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) {
    p = loadPage(-1, staleDirtyPages[i]);
    writeBackPage(p);
    releasePage(p);
  }
  free(staleDirtyPages);
  forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE);
}
/**
 * Allocate and initialize an empty dirty page table.
 * The caller owns the result and must release it with
 * stasis_dirty_page_table_deinit().
 */
stasis_dirty_page_table_t * stasis_dirty_page_table_init() {
  stasis_dirty_page_table_t * dpt = malloc(sizeof(*dpt));
  dpt->table = pblHtCreate();
  pthread_mutex_init(&dpt->mutex, 0);
  return dpt;
}
/**
 * Tear down the dirty page table, freeing all recLSN entries, the hash
 * table, the mutex, and the table struct itself.  If any entries remain
 * (i.e. dirty pages were never written back), a warning is printed once —
 * unless stasis_suppress_unclean_shutdown_warnings is set (used when a
 * crash is being simulated deliberately).
 */
void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages) {
  int warned = 0;
  void * cursor;
  for(cursor = pblHtFirst(dirtyPages->table); cursor; cursor = pblHtNext(dirtyPages->table)) {
    free(pblHtCurrent(dirtyPages->table));
    if(!warned && !stasis_suppress_unclean_shutdown_warnings) {
      printf("Warning: dirtyPagesDeinit detected dirty, unwritten pages. "
             "Updates lost?\n");
      warned = 1;
    }
  }
  pblHtDelete(dirtyPages->table);
  pthread_mutex_destroy(&dirtyPages->mutex);
  free(dirtyPages);
}

View file

@ -58,7 +58,7 @@ void LinearHashNTAInit() {
/* private methods... */
compensated_function static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * lhh);
/** @todo Remove #defined HASH_INIT_ARRAY_LIST_COUNT */
/** @todo Remove defined HASH_INIT_ARRAY_LIST_COUNT */
#define HASH_INIT_ARRAY_LIST_COUNT (stasis_util_two_to_the(HASH_INIT_BITS))
#define HASH_INIT_ARRAY_LIST_MULT 2
@ -89,7 +89,7 @@ compensated_function recordid ThashCreate(int xid, int keySize, int valueSize) {
Tset(xid, bucket, &rid);
// printf("paged list alloced at rid {%d %d %d}\n", rid.page, rid.slot, rid.size);
} end_ret(NULLRID);
}
} else {
@ -112,7 +112,7 @@ compensated_function recordid ThashCreate(int xid, int keySize, int valueSize) {
} end_ret(NULLRID);
return hashHeader;
}
compensated_function void ThashDelete(int xid, recordid hash) {
abort();
}
@ -137,7 +137,7 @@ compensated_function static int op_linear_hash_insert(const LogEntry* e, Page* p
recordid hashHeader = args->hashHeader;
int keySize = args->keySize;
int valueSize = args->valueSize;
assert(valueSize >= 0);
byte * key = (byte*)(args+1);
@ -146,24 +146,24 @@ compensated_function static int op_linear_hash_insert(const LogEntry* e, Page* p
pthread_mutex_lock(&linear_hash_mutex);
__ThashInsert(e->xid, hashHeader, key, keySize, value, valueSize);
} compensate_ret(compensation_error());
return 0;
return 0;
}
compensated_function static int op_linear_hash_remove(const LogEntry* e, Page* p) {
const linearHash_insert_arg * args = (const linearHash_insert_arg*) getUpdateArgs(e);
recordid hashHeader = args->hashHeader;
int keySize = args->keySize;
int keySize = args->keySize;
byte * key = (byte*)(args + 1);
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
pthread_mutex_lock(&linear_hash_mutex);
__ThashRemove(e->xid, hashHeader, key, keySize);
} compensate_ret(compensation_error());
return 0;
}
stasis_operation_impl stasis_op_impl_linear_hash_insert() {
stasis_operation_impl o = {
OPERATION_LINEAR_HASH_INSERT,
OPERATION_LINEAR_HASH_INSERT,
OPERATION_NOOP,
OPERATION_LINEAR_HASH_REMOVE,
&op_linear_hash_insert
@ -172,7 +172,7 @@ stasis_operation_impl stasis_op_impl_linear_hash_insert() {
}
stasis_operation_impl stasis_op_impl_linear_hash_remove() {
stasis_operation_impl o = {
OPERATION_LINEAR_HASH_REMOVE,
OPERATION_LINEAR_HASH_REMOVE,
OPERATION_NOOP,
OPERATION_LINEAR_HASH_INSERT,
&op_linear_hash_remove
@ -196,7 +196,7 @@ compensated_function int ThashInsert(int xid, recordid hashHeader, const byte* k
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
handle = TbeginNestedTopAction(xid, OPERATION_LINEAR_HASH_INSERT, (byte*)arg, argSize);
free(arg);
ret = __ThashInsert(xid, hashHeader, key, keySize, value, valueSize);
ret = __ThashInsert(xid, hashHeader, key, keySize, value, valueSize);
} end_action_ret(compensation_error());
TendNestedTopAction(xid, handle);
@ -216,7 +216,7 @@ compensated_function static int __ThashInsert(int xid, recordid hashHeader, cons
if(lhh.numEntries > (int)((double)(lhh.nextSplit
+ stasis_util_two_to_the(lhh.bits-1)) * (HASH_FILL_FACTOR))) {
ThashSplitBucket(xid, hashHeader, &lhh);
}
}
} else {
if(lhh.numEntries > (int)((double)(lhh.nextSplit
+ stasis_util_two_to_the(lhh.bits-1)) * HASH_FILL_FACTOR)) {
@ -229,18 +229,18 @@ compensated_function static int __ThashInsert(int xid, recordid hashHeader, cons
bucket.slot = stasis_linear_hash(key, keySize, lhh.bits, lhh.nextSplit);
int ret;
try_ret(compensation_error()) {
try_ret(compensation_error()) {
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &bucketList);
// int before = TpagedListSpansPages(xid, bucketList);
ret = TpagedListRemove(xid, bucketList, key, keySize);
TpagedListInsert(xid, bucketList, key, keySize, value, valueSize);
// int after = TpagedListSpansPages(xid, bucketList);
// if(before != after) { // Page overflowed...
// T hashSplitBucket(xid, hashHeader, &lhh);
@ -255,15 +255,15 @@ compensated_function static int __ThashInsert(int xid, recordid hashHeader, cons
if(ret) { lhh.numEntries--; }
Tset(xid, hashHeader, &lhh);
} end_ret(compensation_error());
} end_ret(compensation_error());
return ret;
}
compensated_function int ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize) {
hashHeader.size = sizeof(lladd_hash_header);
byte * value;
int valueSize;
int ret;
int ret;
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
pthread_mutex_lock(&linear_hash_mutex);
valueSize = ThashLookup(xid, hashHeader, key, keySize, &value);
@ -271,11 +271,11 @@ compensated_function int ThashRemove(int xid, recordid hashHeader, const byte *
if(valueSize == -1) {
pthread_mutex_unlock(&linear_hash_mutex);
return 0;
return 0;
}
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
int argSize = sizeof(linearHash_remove_arg) + keySize + valueSize;
linearHash_remove_arg * arg = calloc(1,argSize);
arg->hashHeader = hashHeader;
@ -305,10 +305,10 @@ compensated_function static int __ThashRemove(int xid, recordid hashHeader, cons
Tread(xid, hashHeader, &lhh);
lhh.numEntries--;
Tset(xid, hashHeader, &lhh);
recordid bucket = lhh.buckets;
bucket.slot = stasis_linear_hash(key, keySize, lhh.bits, lhh.nextSplit);
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &bucketList);
@ -319,7 +319,7 @@ compensated_function static int __ThashRemove(int xid, recordid hashHeader, cons
ret = TlinkedListRemove(xid, bucket, key, keySize);
}
} end_ret(compensation_error());
return ret;
}
@ -328,15 +328,15 @@ compensated_function int ThashLookup(int xid, recordid hashHeader, const byte *
hashHeader.size = sizeof(lladd_hash_header);
int ret;
// This whole thing is safe since the callee's do not modify global state...
// This whole thing is safe since the callee's do not modify global state...
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
pthread_mutex_lock(&linear_hash_mutex);
Tread(xid, hashHeader, &lhh);
recordid bucket = lhh.buckets;
bucket.slot = stasis_linear_hash(key, keySize, lhh.bits, lhh.nextSplit);
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &bucketList);
@ -346,7 +346,7 @@ compensated_function int ThashLookup(int xid, recordid hashHeader, const byte *
ret = TlinkedListFind(xid, bucket, key, keySize, value);
}
} compensate_ret(compensation_error());
return ret;
}
compensated_function static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * lhh) {
@ -367,22 +367,22 @@ compensated_function static void ThashSplitBucket(int xid, recordid hashHeader,
} else {
byte * entry = calloc(1, lhh->buckets.size);
Tset(xid, new_bucket_rid, entry);
free(entry);
free(entry);
}
if(lhh->nextSplit < stasis_util_two_to_the(lhh->bits-1)-1) {
lhh->nextSplit++;
} else {
lhh->nextSplit = 0;
lhh->bits++;
lhh->bits++;
}
/** @todo linearHashNTA's split bucket should use the 'move' function call. */
if(lhh->keySize == VARIABLE_LENGTH || lhh->valueSize == VARIABLE_LENGTH) {
recordid old_bucket_list;
Tread(xid, old_bucket_rid, &old_bucket_list);
lladd_pagedList_iterator * pit = TpagedListIterator(xid, old_bucket_list);
byte *key, *value;
int keySize, valueSize;
while(TpagedListNext(xid, pit, &key, &keySize, &value, &valueSize)) {
@ -445,9 +445,9 @@ lladd_hash_iterator * ThashIterator(int xid, recordid hashHeader, int keySize, i
} end_action_ret(NULL);
return it;
}
int ThashNext(int xid, lladd_hash_iterator * it, byte ** key, int * keySize, byte** value, int * valueSize) {
try_ret(0) {
try_ret(0) {
if(it->it) {
assert(!it->pit);
while(!TlinkedListNext(xid, it->it, key, keySize, value, valueSize)) {
@ -455,7 +455,7 @@ int ThashNext(int xid, lladd_hash_iterator * it, byte ** key, int * keySize, byt
it->bucket.slot++;
if(it->bucket.slot < it->numBuckets) {
TlinkedListClose(xid, it->it);
it->it = TlinkedListIterator(xid, it->bucket, it->keySize, it->valueSize);
it->it = TlinkedListIterator(xid, it->bucket, it->keySize, it->valueSize);
} else {
TlinkedListClose(xid, it->it);
it->it = 0;
@ -530,7 +530,7 @@ static void linearHashNTAIterator_close(int xid, void * impl) {
static int linearHashNTAIterator_next (int xid, void * impl) {
lladd_linearHashNTA_generic_it * it = impl;
if(it->lastKey) {
free(it->lastKey);
it->lastKey = NULL;
@ -561,13 +561,13 @@ static int linearHashNTAIterator_value(int xid, void * impl, byte ** value) {
//--------------------------------- async hash operations happen below here
typedef struct {
typedef struct {
int value_len;
int key_len;
} asyncHashInsert_t;
void ThashInsertAsync(int xid, lladdConsumer_t * cons, recordid hash, byte * value, int value_len, byte * key, int key_len) {
Tconsumer_push(xid, cons, key, key_len, value, value_len);
}
@ -575,20 +575,20 @@ void ThashInsertAsync(int xid, lladdConsumer_t * cons, recordid hash, byte * val
void ThashInsertConsume(int xid, recordid hash, lladdIterator_t * it) {
while(Titerator_next(xid, it)) {
byte * key;
byte * value;
int key_len = Titerator_key(xid, it, &key);
int value_len = Titerator_value(xid, it, &value);
ThashInsert(xid, hash, key, key_len, value, value_len);
Titerator_tupleDone(xid, it);
}
}
typedef struct {
typedef struct {
lladdIterator_t * it;
recordid hash;
int xid;
@ -610,11 +610,11 @@ void * ThashAsyncWorker(void * argp) {
}
/*lladdMultiplexer_t **/
lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads,
int mainFifoLen, int numFifos,
lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads,
int mainFifoLen, int numFifos,
int subFifoLen, int dirtyFifoLen,
lladdIterator_t ** dirtyIterator) {
lladdFifo_t * mainFifo = logMemoryFifo(mainFifoLen, 0);
lladdFifo_t * dirtyFifos = logMemoryFifo(dirtyFifoLen, 0);
lladdFifoPool_t * fifoPool = lladdFifoPool_ringBufferInit(numFifos, subFifoLen, NULL, dirtyFifos);
@ -627,7 +627,7 @@ lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads,
lladdMultiplexer_start(mux, &attr);
int i = 0;
int i = 0;
for(i = 0; i < numWorkerThreads; i++) {
@ -635,8 +635,8 @@ lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads,
pthread_create(&thread, &attr, ThashAsyncWorker, mux->fifoPool->dirtyPoolFifo->iterator);
pthread_detach(thread);
}
if(dirtyIterator) {
if(dirtyIterator) {
*dirtyIterator = mux->fifoPool->dirtyPoolFifo->iterator;
}

View file

@ -56,7 +56,7 @@ void TlsmRegionForceRid(int xid, void *conf) {
a.regionList.slot = i;
pageid_t pid;
Tread(xid,a.regionList,&pid);
dirtyPages_flushRange(pid, pid+a.regionSize);
stasis_dirty_page_table_flush_range(stasis_dirty_page_table, pid, pid+a.regionSize);
// TregionDealloc(xid,pid);
}
}
@ -403,7 +403,7 @@ static recordid appendInternalNode(int xid, Page *p,
pageid_t val_page, pageid_t lastLeaf,
lsm_page_allocator_t allocator,
void *allocator_state) {
assert(*stasis_page_type_ptr(p) == LSM_ROOT_PAGE ||
assert(*stasis_page_type_ptr(p) == LSM_ROOT_PAGE ||
*stasis_page_type_ptr(p) == FIXED_PAGE);
if(!depth) {
// leaf node.

View file

@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
@ -49,7 +49,7 @@ terms specified in this license.
page types. This interface's primary purpose is to wrap common
functionality together, and to delegate responsibility for page
handling to other modules.
Latching summary:
Each page has an associated read/write lock. This lock only
@ -61,10 +61,10 @@ terms specified in this license.
Read LSN Read lock
Record write *READ LOCK*
Write LSN Write lock
Any circumstance where these locks are held during an I/O operation
is a bug.
*/
@ -93,6 +93,7 @@ terms specified in this license.
#include <stasis/truncation.h>
static page_impl page_impls[MAX_PAGE_TYPE];
static stasis_dirty_page_table_t * dirtyPages;
void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) {
assertlocked(page->rwlatch);
@ -100,7 +101,9 @@ void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) {
if(page->LSN < lsn) {
page->LSN = lsn;
}
dirtyPages_add(page);
// XXX probably should be handled by releasePage or something...
stasis_dirty_page_table_set_dirty(dirtyPages, page);
return;
}
@ -115,7 +118,8 @@ lsn_t stasis_page_lsn_read(const Page * page) {
* initializes all the important variables needed in
* all the functions dealing with pages.
*/
void stasis_page_init() {
void stasis_page_init(stasis_dirty_page_table_t * dpt) {
dirtyPages = dpt;
slottedPageInit();
fixedPageInit();
@ -216,7 +220,7 @@ void stasis_record_write_done(int xid, Page *p, recordid rid, byte *b) {
}
int stasis_record_type_read(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
if(page_impls[*stasis_page_type_ptr(p)].recordGetType)
if(page_impls[*stasis_page_type_ptr(p)].recordGetType)
return page_impls[*stasis_page_type_ptr(p)].recordGetType(xid, p, rid);
else
return INVALID_SLOT;
@ -300,12 +304,12 @@ void stasis_page_flushed(Page * p){
*stasis_page_lsn_ptr(p) = p->LSN;
}
}
void stasis_page_cleanup(Page * p) {
void stasis_page_cleanup(Page * p) {
short type = *stasis_page_type_ptr(p);
if(type) {
if(type) {
assert(page_impls[type].page_type == type);
page_impls[type].pageCleanup(p);
}
}
}
/// Generic block implementations

View file

@ -1,6 +1,7 @@
#include <stasis/page/raw.h>
#include <stasis/logger/logger2.h>
#include <stasis/truncation.h>
/**
@todo Should rawPageInferMetadata set a page type in the Page
struct?
@ -23,7 +24,8 @@ void rawPageSetData(int xid, lsn_t lsn, Page * p) {
assertlocked(p->rwlatch);
// writelock(p->rwlatch, 255);
rawPageWriteLSN(xid, p, lsn);
dirtyPages_add(p);
// XXX should be handled in releasePage.
stasis_dirty_page_table_set_dirty(stasis_dirty_page_table, p);
// unlock(p->rwlatch);
return;
}

View file

@ -1,13 +1,8 @@
#include <stasis/pageHandle.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <stasis/io/handle.h>
#include <stasis/pageHandle.h>
#include <stasis/bufferPool.h>
#include <stasis/logger/logger2.h>
#include <stasis/truncation.h>
#include <stasis/page.h>
/**
@todo Make sure this doesn't need to be atomic. (It isn't!) Can
@ -28,7 +23,7 @@ static void phWrite(stasis_page_handle_t * ph, Page * ret) {
fflush(stdout);
abort();
}
dirtyPages_remove(ret);
stasis_dirty_page_table_set_clean(ph->dirtyPages, ret);
unlock(ret->rwlatch);
}
static void phRead(stasis_page_handle_t * ph, Page * ret) {
@ -67,7 +62,7 @@ static void phClose(stasis_page_handle_t * ph) {
free(ph);
}
stasis_page_handle_t * stasis_page_handle_open(stasis_handle_t * handle,
stasis_log_t * log) {
stasis_log_t * log, stasis_dirty_page_table_t * dpt) {
DEBUG("Using pageHandle implementation\n");
stasis_page_handle_t * ret = malloc(sizeof(*ret));
ret->write = phWrite;
@ -76,6 +71,7 @@ stasis_page_handle_t * stasis_page_handle_open(stasis_handle_t * handle,
ret->force_range = phForceRange;
ret->close = phClose;
ret->log = log;
ret->dirtyPages = dpt;
ret->impl = handle;
return ret;
}

View file

@ -32,7 +32,9 @@ static int stasis_transaction_table_num_active = 0;
static int stasis_transaction_table_xid_count = 0;
static stasis_log_t* stasis_log_file = 0;
// XXX should be static!
stasis_dirty_page_table_t * stasis_dirty_page_table = 0;
static stasis_truncation_t * stasis_truncation = 0;
/**
This mutex protects stasis_transaction_table, numActiveXactions and
xidCount.
@ -53,62 +55,62 @@ void stasis_transaction_table_init() {
}
int Tinit() {
pthread_mutex_init(&stasis_transaction_table_mutex, NULL);
stasis_initted = 1;
stasis_transaction_table_num_active = 0;
pthread_mutex_init(&stasis_transaction_table_mutex, NULL);
stasis_initted = 1;
stasis_transaction_table_num_active = 0;
compensations_init();
compensations_init();
stasis_transaction_table_init();
stasis_operation_table_init();
dirtyPagesInit();
stasis_transaction_table_init();
stasis_operation_table_init();
stasis_dirty_page_table_init();
stasis_log_file = 0;
stasis_log_file = 0;
if(LOG_TO_FILE == stasis_log_type) {
stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name,
stasis_log_file_mode,
stasis_log_file_permissions);
stasis_log_file->group_force =
stasis_log_group_force_init(stasis_log_file, 10 * 1000 * 1000); // timeout in nsec; want 10msec.
} else if(LOG_TO_MEMORY == stasis_log_type) {
stasis_log_file = stasis_log_impl_in_memory_open();
stasis_log_file->group_force = 0;
} else {
assert(stasis_log_file != NULL);
}
if(LOG_TO_FILE == stasis_log_type) {
stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name,
stasis_log_file_mode,
stasis_log_file_permissions);
stasis_log_file->group_force =
stasis_log_group_force_init(stasis_log_file, 10 * 1000 * 1000); // timeout in nsec; want 10msec.
} else if(LOG_TO_MEMORY == stasis_log_type) {
stasis_log_file = stasis_log_impl_in_memory_open();
stasis_log_file->group_force = 0;
} else {
assert(stasis_log_file != NULL);
}
stasis_page_init();
stasis_page_handle_t * page_handle;
if(bufferManagerFileHandleType == BUFFER_MANAGER_FILE_HANDLE_DEPRECATED) {
printf("\nWarning: Using old I/O routines (with known bugs).\n");
page_handle = openPageFile(stasis_log_file);
} else {
stasis_handle_t * h = stasis_handle_open(stasis_store_file_name);
// XXX should not be global.
page_handle = stasis_page_handle_open(h, stasis_log_file);
}
stasis_dirty_page_table = stasis_dirty_page_table_init();
stasis_page_init(stasis_dirty_page_table);
stasis_page_handle_t * page_handle;
if(bufferManagerFileHandleType == BUFFER_MANAGER_FILE_HANDLE_DEPRECATED) {
printf("\nWarning: Using old I/O routines (with known bugs).\n");
page_handle = openPageFile(stasis_log_file, stasis_dirty_page_table);
} else {
stasis_handle_t * h = stasis_handle_open(stasis_store_file_name);
// XXX should not be global.
page_handle = stasis_page_handle_open(h, stasis_log_file, stasis_dirty_page_table);
}
stasis_buffer_manager_open(bufferManagerType, page_handle);
DEBUG("Buffer manager type = %d\n", bufferManagerType);
pageOperationsInit();
TallocInit();
TnaiveHashInit();
LinearHashNTAInit();
TlinkedListNTAInit();
iterator_init();
consumer_init();
setupLockManagerCallbacksNil();
//setupLockManagerCallbacksPage();
stasis_buffer_manager_open(bufferManagerType, page_handle);
DEBUG("Buffer manager type = %d\n", bufferManagerType);
pageOperationsInit();
TallocInit();
TnaiveHashInit();
LinearHashNTAInit();
TlinkedListNTAInit();
iterator_init();
consumer_init();
setupLockManagerCallbacksNil();
//setupLockManagerCallbacksPage();
stasis_recovery_initiate(stasis_log_file);
stasis_truncation_init();
if(stasis_truncation_automatic) {
// should this be before InitiateRecovery?
stasis_truncation_thread_start(stasis_log_file);
}
return 0;
stasis_recovery_initiate(stasis_log_file);
stasis_truncation = stasis_truncation_init(stasis_dirty_page_table, stasis_log_file);
if(stasis_truncation_automatic) {
// should this be before InitiateRecovery?
stasis_truncation_thread_start(stasis_truncation);
}
return 0;
}
@ -352,7 +354,7 @@ int Tdeinit() {
}
}
assert( stasis_transaction_table_num_active == 0 );
stasis_truncation_deinit();
stasis_truncation_deinit(stasis_truncation);
TnaiveHashDeinit();
TallocDeinit();
stasis_buffer_manager_close();
@ -361,7 +363,7 @@ int Tdeinit() {
stasis_log_group_force_t * group_force = stasis_log_file->group_force;
stasis_log_file->close(stasis_log_file);
if(group_force) { stasis_log_group_force_deinit(group_force); }
dirtyPagesDeinit();
stasis_dirty_page_table_deinit(stasis_dirty_page_table);
stasis_initted = 0;
@ -372,14 +374,14 @@ int TuncleanShutdown() {
// We're simulating a crash; don't complain when writes get lost,
// and active transactions get rolled back.
stasis_suppress_unclean_shutdown_warnings = 1;
stasis_truncation_deinit();
stasis_truncation_deinit(stasis_truncation);
TnaiveHashDeinit();
stasis_buffer_manager_simulate_crash();
// XXX: close_file?
stasis_page_deinit();
stasis_log_file->close(stasis_log_file);
stasis_transaction_table_num_active = 0;
dirtyPagesDeinit();
stasis_dirty_page_table_deinit(stasis_dirty_page_table);
// Reset it here so the warnings will appear if a new stasis
// instance encounters problems during a clean shutdown.
@ -491,7 +493,7 @@ int TdurabilityLevel() {
}
void TtruncateLog() {
stasis_truncation_truncate(stasis_log_file, 1);
stasis_truncation_truncate(stasis_truncation, 1);
}
typedef struct {
lsn_t prev_lsn;

View file

@ -1,183 +1,59 @@
#include <limits.h>
#include <stasis/truncation.h>
#include <pbl/pbl.h>
#include <stasis/logger/logger2.h>
#include <stasis/page.h>
#include <assert.h>
static int initialized = 0;
static int automaticallyTuncating = 0;
static pthread_t truncationThread;
static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t shutdown_cond = PTHREAD_COND_INITIALIZER;
static pblHashTable_t * dirtyPages = 0;
static pthread_mutex_t dirtyPages_mutex = PTHREAD_MUTEX_INITIALIZER;
struct stasis_truncation_t {
char initialized;
char automaticallyTruncating;
pthread_t truncationThread;
pthread_mutex_t shutdown_mutex;
pthread_cond_t shutdown_cond;
stasis_dirty_page_table_t * dirty_pages;
stasis_log_t * log;
};
#ifdef LONG_TEST
#define TARGET_LOG_SIZE (1024 * 1024 * 5)
#define TRUNCATE_INTERVAL 1
#define MIN_INCREMENTAL_TRUNCATION (1024 * 1024 * 1)
#else
#else
#define TARGET_LOG_SIZE (1024 * 1024 * 50)
#define TRUNCATE_INTERVAL 1
#define MIN_INCREMENTAL_TRUNCATION (1024 * 1024 * 25)
#endif
void dirtyPages_add(Page * p) {
pthread_mutex_lock(&dirtyPages_mutex);
if(!p->dirty) {
p->dirty = 1;
//assert(p->LSN);
void* ret = pblHtLookup(dirtyPages, &(p->id), sizeof(p->id));
assert(!ret);
lsn_t * insert = malloc(sizeof(lsn_t));
*insert = p->LSN;
pblHtInsert(dirtyPages, &(p->id), sizeof(p->id), insert); //(void*)p->LSN);
}
pthread_mutex_unlock(&dirtyPages_mutex);
}
/**
 * Clear p's dirty bit and drop its entry (including the heap-allocated
 * recovery LSN) from the dirty page table.  Safe to call on a page that
 * is not in the table.  NOTE: pbl's compatibility mode cannot distinguish
 * a missing key from a stored NULL value, so we cannot assert that the
 * entry existed before removal.
 */
void dirtyPages_remove(Page * p) {
  pthread_mutex_lock(&dirtyPages_mutex);
  p->dirty = 0;
  lsn_t * rec_lsn = pblHtLookup(dirtyPages, &(p->id), sizeof(p->id));
  pblHtRemove(dirtyPages, &(p->id), sizeof(p->id));
  free(rec_lsn); // free(NULL) is a no-op, so no guard is needed
  pthread_mutex_unlock(&dirtyPages_mutex);
}
int dirtyPages_isDirty(Page * p) {
int ret;
pthread_mutex_lock(&dirtyPages_mutex);
ret = p->dirty;
pthread_mutex_unlock(&dirtyPages_mutex);
/**
 * Construct a truncation manager bound to the given dirty page table and
 * log.  The background truncation thread is NOT started here; call
 * stasis_truncation_thread_start() for that.  The returned handle is
 * freed by stasis_truncation_deinit().
 */
stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_log_t * log) {
  stasis_truncation_t * trunc = malloc(sizeof(stasis_truncation_t));
  trunc->initialized = 1;
  trunc->automaticallyTruncating = 0;
  trunc->dirty_pages = dpt;
  trunc->log = log;
  pthread_mutex_init(&trunc->shutdown_mutex, 0);
  pthread_cond_init(&trunc->shutdown_cond, 0);
  return trunc;
}
static lsn_t dirtyPages_minRecLSN() {
lsn_t lsn = LSN_T_MAX; // LogFlushedLSN ();
pageid_t* pageid;
pthread_mutex_lock(&dirtyPages_mutex);
for( pageid = (pageid_t*)pblHtFirst (dirtyPages); pageid; pageid = (pageid_t*)pblHtNext(dirtyPages)) {
lsn_t * thisLSN = (lsn_t*) pblHtCurrent(dirtyPages);
// printf("lsn = %d\n", thisLSN);
if(*thisLSN < lsn) {
lsn = *thisLSN;
}
}
pthread_mutex_unlock(&dirtyPages_mutex);
return lsn;
}
/**
 * Write back every page currently in the dirty page table.
 *
 * Snapshots the set of dirty page ids while holding dirtyPages_mutex,
 * then performs the write-backs without the mutex (writeBackPage may
 * block on I/O).  Pages dirtied after the snapshot are not flushed.
 */
static void dirtyPages_flush() {
  pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE));
  int i;
  // -1 terminates the id list; pre-fill so the write-back loop stops early.
  for(i = 0; i < MAX_BUFFER_SIZE; i++) {
    staleDirtyPages[i] = -1;
  }
  Page* p = 0;
  pthread_mutex_lock(&dirtyPages_mutex);
  void* tmp;
  i = 0;
  for(tmp = pblHtFirst(dirtyPages); tmp; tmp = pblHtNext(dirtyPages)) {
    // Bug fix: bound-check *before* each write.  The old code asserted
    // i < MAX_BUFFER_SIZE only after the loop, so a table with more than
    // MAX_BUFFER_SIZE entries overflowed the heap buffer before the
    // assert could fire.
    assert(i < MAX_BUFFER_SIZE);
    staleDirtyPages[i] = *((pageid_t*) pblHtCurrentKey(dirtyPages));
    i++;
  }
  pthread_mutex_unlock(&dirtyPages_mutex);
  for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) {
    p = loadPage(-1, staleDirtyPages[i]);
    writeBackPage(p);
    releasePage(p);
  }
  free(staleDirtyPages);
}
/**
 * Write back the dirty pages whose ids fall in [start, stop), then force
 * that byte range of the page file to disk.
 *
 * Bug fix: the range filter read "num <= start && num < stop", which
 * (for start < stop) reduces to "num <= start" and selects the pages
 * *before* the start of the range instead of the pages inside it.  It
 * now keeps start <= num < stop, matching the forcePageRange() call
 * below.  Also bound-checks the snapshot buffer, which the old code
 * could overflow (including the -1 terminator write).
 */
void dirtyPages_flushRange(pageid_t start, pageid_t stop) {
  pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE));
  int i;
  Page * p = 0;
  pthread_mutex_lock(&dirtyPages_mutex);
  void *tmp;
  i = 0;
  for(tmp = pblHtFirst(dirtyPages); tmp; tmp = pblHtNext(dirtyPages)) {
    pageid_t num = *((pageid_t*) pblHtCurrentKey(dirtyPages));
    if(num >= start && num < stop) {
      assert(i < MAX_BUFFER_SIZE - 1); // leave room for the -1 terminator
      staleDirtyPages[i] = num;
      i++;
    }
  }
  staleDirtyPages[i] = -1;
  pthread_mutex_unlock(&dirtyPages_mutex);
  for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) {
    p = loadPage(-1, staleDirtyPages[i]);
    writeBackPage(p);
    releasePage(p);
  }
  free(staleDirtyPages);
  forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE);
}
/**
 * Allocate the global dirty page hash table.  Must be called before any
 * other dirtyPages_* function; paired with dirtyPagesDeinit().
 */
void dirtyPagesInit() {
  dirtyPages = pblHtCreate();
}
/**
 * Tear down the global dirty page table, freeing each stored recovery
 * LSN and then the table itself.  If any entries remain at shutdown
 * (i.e. dirty pages were never written back), print a one-time warning
 * unless stasis_suppress_unclean_shutdown_warnings is set.
 */
void dirtyPagesDeinit() {
  int warned = 0;
  void * it;
  for(it = pblHtFirst(dirtyPages); it; it = pblHtNext(dirtyPages)) {
    if(!warned && !stasis_suppress_unclean_shutdown_warnings) {
      printf("Warning: dirtyPagesDeinit detected dirty, unwritten pages. "
             "Updates lost?\n");
      warned = 1;
    }
    free(pblHtCurrent(dirtyPages));
  }
  pblHtDelete(dirtyPages);
  dirtyPages = 0;
}
void stasis_truncation_init() {
initialized = 1;
}
void stasis_truncation_deinit() {
pthread_mutex_lock(&shutdown_mutex);
initialized = 0;
if(automaticallyTuncating) {
void stasis_truncation_deinit(stasis_truncation_t * trunc) {
pthread_mutex_lock(&trunc->shutdown_mutex);
trunc->initialized = 0;
if(trunc->automaticallyTruncating) {
void * ret = 0;
pthread_mutex_unlock(&shutdown_mutex);
pthread_cond_broadcast(&shutdown_cond);
pthread_join(truncationThread, &ret);
} else {
pthread_mutex_unlock(&shutdown_mutex);
pthread_mutex_unlock(&trunc->shutdown_mutex);
pthread_cond_broadcast(&trunc->shutdown_cond);
pthread_join(trunc->truncationThread, &ret);
} else {
pthread_mutex_unlock(&trunc->shutdown_mutex);
}
automaticallyTuncating = 0;
trunc->automaticallyTruncating = 0;
pthread_mutex_destroy(&trunc->shutdown_mutex);
pthread_cond_destroy(&trunc->shutdown_cond);
free(trunc);
}
static void* stasis_truncation_thread_worker(void* logp) {
stasis_log_t * log = logp;
pthread_mutex_lock(&shutdown_mutex);
while(initialized) {
if(log->first_unstable_lsn(log, LOG_FORCE_WAL) - log->truncation_point(log)
static void* stasis_truncation_thread_worker(void* truncp) {
stasis_truncation_t * trunc = truncp;
pthread_mutex_lock(&trunc->shutdown_mutex);
while(trunc->initialized) {
if(trunc->log->first_unstable_lsn(trunc->log, LOG_FORCE_WAL) - trunc->log->truncation_point(trunc->log)
> TARGET_LOG_SIZE) {
stasis_truncation_truncate(log, 0);
stasis_truncation_truncate(trunc, 0);
}
struct timeval now;
struct timespec timeout;
@ -188,20 +64,20 @@ static void* stasis_truncation_thread_worker(void* logp) {
timeout.tv_nsec = now.tv_usec;
timeout.tv_sec += TRUNCATE_INTERVAL;
pthread_cond_timedwait(&shutdown_cond, &shutdown_mutex, &timeout);
pthread_cond_timedwait(&trunc->shutdown_cond, &trunc->shutdown_mutex, &timeout);
}
pthread_mutex_unlock(&shutdown_mutex);
pthread_mutex_unlock(&trunc->shutdown_mutex);
return (void*)0;
}
void stasis_truncation_thread_start(stasis_log_t* log) {
assert(!automaticallyTuncating);
automaticallyTuncating = 1;
pthread_create(&truncationThread, 0, &stasis_truncation_thread_worker, log);
void stasis_truncation_thread_start(stasis_truncation_t* trunc) {
assert(!trunc->automaticallyTruncating);
trunc->automaticallyTruncating = 1;
pthread_create(&trunc->truncationThread, 0, &stasis_truncation_thread_worker, trunc);
}
int stasis_truncation_truncate(stasis_log_t* log, int force) {
int stasis_truncation_truncate(stasis_truncation_t* trunc, int force) {
// *_minRecLSN() used to return the same value as flushed if
//there were no outstanding transactions, but flushed might
@ -209,36 +85,36 @@ int stasis_truncation_truncate(stasis_log_t* log, int force) {
//LSN_T_MAX if there are no outstanding transactions / no
//dirty pages.
lsn_t page_rec_lsn = dirtyPages_minRecLSN();
lsn_t page_rec_lsn = stasis_dirty_page_table_minRecLSN(trunc->dirty_pages);
lsn_t xact_rec_lsn = stasis_transaction_table_minRecLSN();
lsn_t flushed_lsn = log->first_unstable_lsn(log, LOG_FORCE_WAL);
lsn_t flushed_lsn = trunc->log->first_unstable_lsn(trunc->log, LOG_FORCE_WAL);
lsn_t rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn;
rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn;
lsn_t log_trunc = log->truncation_point(log);
lsn_t log_trunc = trunc->log->truncation_point(trunc->log);
if(force || (xact_rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) {
//fprintf(stderr, "xact = %ld \t log = %ld\n", xact_rec_lsn, log_trunc);
if((rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) {
// fprintf(stderr, "Truncating now. rec_lsn = %ld, log_trunc = %ld\n", rec_lsn, log_trunc);
// fprintf(stderr, "Truncating to rec_lsn = %ld\n", rec_lsn);
forcePages();
log->truncate(log, rec_lsn);
trunc->log->truncate(trunc->log, rec_lsn);
return 1;
} else {
lsn_t flushed = log->first_unstable_lsn(log, LOG_FORCE_WAL);
lsn_t flushed = trunc->log->first_unstable_lsn(trunc->log, LOG_FORCE_WAL);
if(force || flushed - log_trunc > 2 * TARGET_LOG_SIZE) {
//fprintf(stderr, "Flushing dirty buffers: rec_lsn = %ld log_trunc = %ld flushed = %ld\n", rec_lsn, log_trunc, flushed);
dirtyPages_flush();
stasis_dirty_page_table_flush(trunc->dirty_pages);
page_rec_lsn = dirtyPages_minRecLSN();
page_rec_lsn = stasis_dirty_page_table_minRecLSN(trunc->dirty_pages);
rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn;
rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn;
//fprintf(stderr, "Flushed Dirty Buffers. Truncating to rec_lsn = %ld\n", rec_lsn);
forcePages();
log->truncate(log, rec_lsn);
trunc->log->truncate(trunc->log, rec_lsn);
return 1;
} else {
return 0;

View file

@ -5,6 +5,6 @@
#include <stasis/pageHandle.h>
#include <stasis/logger/logger2.h>
stasis_page_handle_t* openPageFile(stasis_log_t * log);
stasis_page_handle_t* openPageFile(stasis_log_t * log, stasis_dirty_page_table_t * dirtyPages);
#endif /* __PAGE_FILE_H */

32
stasis/dirtyPageTable.h Normal file
View file

@ -0,0 +1,32 @@
/*
* dirtyPageTable.h
*
* Created on: May 18, 2009
* Author: sears
*/
#ifndef DIRTYPAGETABLE_H_
#define DIRTYPAGETABLE_H_
BEGIN_C_DECLS
/** Opaque handle; the struct is defined in dirtyPageTable.c. */
typedef struct stasis_dirty_page_table_t stasis_dirty_page_table_t;
/** Allocate an empty dirty page table. */
stasis_dirty_page_table_t * stasis_dirty_page_table_init();
/** Free dirtyPages and all state it holds. */
void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages);
/** Mark p dirty, recording its recovery LSN; no-op if already dirty. */
void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p);
/** Mark p clean and drop it from the table (typically after write-back). */
void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p);
/** @return non-zero iff p is currently marked dirty. */
int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p);
/** Write back every page currently in the table. */
void stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages);
/** @return the smallest recovery LSN in the table (bounds log truncation). */
lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t* dirtyPages);
/**
@todo flushRange's API sucks. It should be two functions, "startRangeFlush" and "waitRangeFlushes" or something.
*/
void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop);
END_C_DECLS
#endif /* DIRTYPAGETABLE_H_ */

View file

@ -297,6 +297,9 @@ stasis_handle_t * stasis_handle(open_pfile)
open_file), while a single truly concurrent
handle (@see open_pfile) should suffice.
@param slow_factory_close A callback that will be called when this
Stasis handle is closed.
@param slow_factory_arg A pointer to data that will be passed into
slow_factory.

View file

@ -93,7 +93,7 @@ namespace rose {
rose::slot_index_t ret = mc->append(xid, *i);
if(ret == rose::NOSPACE) {
dirtyPages_add(p);
stasis_dirty_page_table_set_dirty(stasis_dirty_page_table, p);
mc->pack();
unlock(p->rwlatch);
releasePage(p);
@ -108,7 +108,7 @@ namespace rose {
}
(*inserted)++;
}
dirtyPages_add(p);
stasis_dirty_page_table_set_dirty(stasis_dirty_page_table, p);
mc->pack();
unlock(p->rwlatch);
releasePage(p);
@ -305,7 +305,7 @@ namespace rose {
double target_R = sqrt(((double)(*a->out_tree_size+*a->my_tree_size))
/ ((C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio)));
printf("R_C2-C1 = %6.1f R_C1-C0 = %6.1f target = %6.1f\n",
printf("R_C2-C1 = %6.1f R_C1-C0 = %6.1f target = %6.1f\n",
((double)(*a->out_tree_size/*+*a->my_tree_size*/)) / ((double)*a->my_tree_size),
((double)*a->my_tree_size) / ((double)(C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio)),
target_R);
@ -370,7 +370,7 @@ namespace rose {
TlsmFree(xid,
(**(typename ITERA::handle**)a->in_tree)->r_,
TlsmRegionDeallocRid,a->in_tree_allocer);
DEBUG("%d freed C?: (in_tree) %lld\n", PAGELAYOUT::FMT::TUP::NN,
DEBUG("%d freed C?: (in_tree) %lld\n", PAGELAYOUT::FMT::TUP::NN,
(**(typename ITERA::handle**)a->in_tree)->r_.page);
} else {
@ -692,7 +692,7 @@ namespace rose {
pthread_mutex_unlock(h->mut);
if( (handleBytes > memSizeThresh / 2)
if( (handleBytes > memSizeThresh / 2)
&& ( go || handleBytes > memSizeThresh ) ) { // XXX ok?
printf("Handle mbytes %lld (%lld) Input size: %lld input size thresh: %lld mbytes mem size thresh: %lld\n",
(long long) handleBytes / (1024*1024), (long long) h->scratch_tree->size(), (long long) *h->input_size,
@ -791,7 +791,7 @@ namespace rose {
M_LSM_LSM_LSM_M_RB_RB m12345(m123,m45,m123end,m45end);
M_LSM_LSM_LSM_M_RB_RB m12345end(m123,m45,m123end,m45end);
m12345end.seekEnd();
m12345end.seekEnd();
while(m12345 != m12345end) {
*m12345;

View file

@ -104,6 +104,7 @@ terms specified in this license.
#include <stasis/common.h>
#include <stasis/constants.h>
#include <stasis/latches.h>
#include <stasis/dirtyPageTable.h>
#include <assert.h>
@ -433,7 +434,7 @@ stasis_page_pageid_t_cptr_from_end(const Page *p, int count) {
*
* @todo documentation group for page init and deinit?
*/
void stasis_page_init();
void stasis_page_init(stasis_dirty_page_table_t * dirtyPages);
/**
* releases all resources held by the page sub-system.
*/

View file

@ -73,19 +73,27 @@ struct stasis_page_handle_t {
stasis_log_force on this to maintain the write-ahead invariant.
*/
stasis_log_t * log;
/**
The dirty page table associated with this page handle.
If this is non-null, stasis_page_handle will keep the dirty page table up-to-date.
*/
stasis_dirty_page_table_t * dirtyPages;
/**
* Pointer to implementation-specific state.
*/
void * impl;
};
/**
Open a stasis page handle.
Open a Stasis page handle.
@param handle A stasis_handle_t that will perform I/O to the page file.
@param log A stasis_log_t that will be used to maintain the write ahead invariant.
If null, then write ahead will not be maintained.
@param dirtyPages A stasis_dirty_page_table that will be updated as pages are written back.
@return a handle that performs high-level (page based, write-ahead) page file I/O.
*/
stasis_page_handle_t * stasis_page_handle_open(struct stasis_handle_t * handle, stasis_log_t * log);
stasis_page_handle_t * stasis_page_handle_open(struct stasis_handle_t * handle,
stasis_log_t * log, stasis_dirty_page_table_t * dirtyPages);
#endif //STASIS_PAGEHANDLE_H

View file

@ -42,50 +42,43 @@ terms specified in this license.
/**
* @file
*
* Implementation of log truncation for lladd.
*
* Implementation of log truncation for Stasis
*
* @todo TRUNCATE_INTERVAL should be dynamically set...
* @todo log truncation policy should be set of the percentage of the log that can be truncated
* (instead of by absolute logfile size)
* @todo avoid copying the non-truncated tail of the log each time truncation occurs.
*
*
* $Id$
*
*
*/
#include <stasis/transactional.h>
BEGIN_C_DECLS
#ifndef __LLADD_TRUNCATION_H
#define __LLADD_TRUNCATION_H 1
#ifndef STASIS_TRUNCATION_H
#define STASIS_TRUNCATION_H
typedef struct stasis_truncation_t stasis_truncation_t;
#include <stasis/logger/logger2.h>
#include <stasis/dirtyPageTable.h>
void dirtyPagesInit();
void dirtyPagesDeinit();
void dirtyPages_add(Page * p);
void dirtyPages_remove(Page * p);
int dirtyPages_isDirty(Page * p);
/**
@todo flushRange's API sucks. It should be two functions, "startRangeFlush" and "waitRangeFlushes" or something.
@todo flushRange has nothing to do with the dirty pages api, or truncation.
*/
void dirtyPages_flushRange(pageid_t start, pageid_t stop);
void stasis_truncation_init();
void stasis_truncation_deinit();
stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_log_t * log);
void stasis_truncation_deinit(stasis_truncation_t * trunc);
/**
Spawn a periodic, demand-based log truncation thread.
*/
void stasis_truncation_thread_start();
void stasis_truncation_thread_start(stasis_truncation_t* trunc);
/**
Initiate a round of log truncation.
*/
int stasis_truncation_truncate(stasis_log_t* log, int force);
int stasis_truncation_truncate(stasis_truncation_t* trunc, int force);
/**
* XXX if releasePage kept the dirty page table up to date, it would greatly reduce the number of places where the dirty page table is updated.
*/
extern stasis_dirty_page_table_t * stasis_dirty_page_table;
END_C_DECLS
#endif

View file

@ -287,7 +287,7 @@ void * blindRandomWorker(void * v) {
pages[j] = loadPage(-1, pageids[j]);
assert(pages[j]->id == pageids[j]);
} else {
// dirtyPages_add(pages[j]); // Shouldn't affect buffermanager too much...
// stasis_dirty_page_table_set_dirty(pages[j]); // Shouldn't affect buffermanager too much...
assert(pages[j]->id == pageids[j]);
releasePage(pages[j]);
pageids[j] = -1;