buffer manager implementation is now determined by an overridable factory method

This commit is contained in:
Sears Russell 2009-10-05 22:39:09 +00:00
parent 24650fa190
commit 6a02f2f159
13 changed files with 62 additions and 50 deletions

View file

@ -4,6 +4,7 @@
#include <stasis/transactional.h>
#include <stasis/bufferManager.h>
#include <stasis/bufferManager/legacy/legacyBufferManager.h>
#include <stasis/truncation.h>
/*static stasis_handle_t * memory_factory(lsn_t off, lsn_t len, void * ignored) {
@ -42,7 +43,7 @@ int main(int argc, char ** argv) {
direct = 1;
bufferManagerO_DIRECT = 1;
} else if(!strcmp(argv[i], "--deprecatedBM")) {
bufferManagerType = BUFFER_MANAGER_DEPRECATED_HASH;
stasis_buffer_manager_factory = stasis_buffer_manager_deprecated_factory;
legacyBM = 1;
} else if(!strcmp(argv[i], "--deprecatedFH")) {
bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_DEPRECATED;

View file

@ -35,6 +35,8 @@ typedef struct {
replacementPolicy *lru;
stasis_buffer_pool_t *buffer_pool;
stasis_page_handle_t *page_handle;
stasis_dirty_page_table_t *dpt;
stasis_log_t *log;
int flushing;
int running;
} stasis_buffer_hash_t;
@ -56,9 +58,9 @@ static inline struct Page_s ** pagePendingPtr(Page * p) {
static inline intptr_t* pagePinCountPtr(Page * p) {
return ((intptr_t*)(&((p)->queue)));
}
static inline int needFlush() {
// XXX need to remove call to stasis_runtime*
pageid_t count = stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table());
static inline int needFlush(stasis_buffer_manager_t * bm) {
stasis_buffer_hash_t *bh = bm->impl;
pageid_t count = stasis_dirty_page_table_dirty_count(bh->dpt);
const pageid_t needed = 1000; //MAX_BUFFER_SIZE / 5;
if(count > needed) {
DEBUG("Need flush? Dirty: %lld Total: %lld ret = %d\n", count, needed, count > needed);
@ -143,7 +145,7 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) {
DEBUG("Blocking app thread");
// We don't really care if this flush happens, so long as *something* is being written back, so ignore the EAGAIN it could return.
// (Besides, once this returns EAGAIN twice, we know that some other flush concurrently was initiated + returned, so we're good to go...)
stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table());
stasis_dirty_page_table_flush(bh->dpt);
pthread_mutex_lock(&bh->mut);
} else {
break;
@ -166,18 +168,18 @@ static void * writeBackWorker(void * bmp) {
stasis_buffer_hash_t * bh = bm->impl;
pthread_mutex_lock(&bh->mut);
while(1) {
while(bh->running && (!needFlush())) {
while(bh->running && (!needFlush(bm))) {
bh->flushing = 0;
DEBUG("Sleeping in write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()));
DEBUG("Sleeping in write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(bh->dpt));
pthread_cond_wait(&bh->needFree, &bh->mut);
DEBUG("Woke write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()));
DEBUG("Woke write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(bh->dpt));
bh->flushing = 1;
}
if(!bh->running) { break; }
pthread_mutex_unlock(&bh->mut);
DEBUG("Calling flush\n");
// ignore ret val; this flush is for performance, not correctness.
stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); // XXX no call to stasis_runtime_*
stasis_dirty_page_table_flush(bh->dpt);
pthread_mutex_lock(&bh->mut);
}
pthread_mutex_unlock(&bh->mut);
@ -323,7 +325,7 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const
pthread_cond_broadcast(&bh->readComplete);
// TODO Improve writeback policy
if((!bh->flushing) && needFlush()) {
if((!bh->flushing) && needFlush(bm)) {
pthread_cond_signal(&bh->needFree);
}
assert(ret->id == pageid);
@ -378,7 +380,7 @@ static void bhBufDeinit(stasis_buffer_manager_t * bm) {
pthread_join(bh->worker, 0);
// XXX flush range should return an error number, which we would check. (Right now, it aborts...)
int ret = stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table());
int ret = stasis_dirty_page_table_flush(bh->dpt);
assert(!ret); // currently the only return value that we'll see is EAGAIN, which means a concurrent thread is in writeback... That should never be the case!
struct LH_ENTRY(list) iter;
@ -389,7 +391,7 @@ static void bhBufDeinit(stasis_buffer_manager_t * bm) {
assertunlocked(p->rwlatch);
assert(0 == *pagePinCountPtr(p));
readlock(p->rwlatch,0);
assert(!stasis_dirty_page_table_is_dirty(stasis_runtime_dirty_page_table(), p));
assert(!stasis_dirty_page_table_is_dirty(bh->dpt, p));
unlock(p->rwlatch);
stasis_page_cleanup(p); // normally called by writeBackOnePage()
}
@ -437,7 +439,7 @@ static void bhSimulateBufferManagerCrash(stasis_buffer_manager_t *bm) {
free(bh);
}
stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t * h) {
stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t * h, stasis_log_t * log, stasis_dirty_page_table_t * dpt) {
stasis_buffer_manager_t *bm = malloc(sizeof(*bm));
stasis_buffer_hash_t *bh = malloc(sizeof(*bh));
@ -454,6 +456,8 @@ stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t *
bm->impl = bh;
bh->page_handle = h;
bh->log = log;
bh->dpt = dpt;
bh->running = 0;
#ifdef LONG_RUN
@ -482,3 +486,8 @@ stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t *
return bm;
}
stasis_buffer_manager_t* stasis_buffer_manager_hash_factory(stasis_log_t *log, stasis_dirty_page_table_t *dpt) {
stasis_page_handle_t *ph = stasis_page_handle_default_factory(log, dpt);
return stasis_buffer_manager_hash_open(ph, log, dpt);
}

View file

@ -407,3 +407,7 @@ static compensated_function Page *bufManLoadUninitPage(stasis_buffer_manager_t *
return ret;
}
stasis_buffer_manager_t* stasis_buffer_manager_deprecated_factory(stasis_log_t *log, stasis_dirty_page_table_t *dpt) {
stasis_page_handle_t * ph = stasis_page_handle_default_factory(log, dpt);
return stasis_buffer_manager_deprecated_open(ph);
}

View file

@ -91,3 +91,6 @@ stasis_buffer_manager_t * stasis_buffer_manager_mem_array_open () {
pthread_mutex_init(&pa->mut,0);
return bm;
}
stasis_buffer_manager_t* stasis_buffer_manager_mem_array_factory(stasis_log_t * log, stasis_dirty_page_table_t *dpt) {
return stasis_buffer_manager_mem_array_open();
}

View file

@ -2,14 +2,18 @@
#include <stasis/flags.h>
#include <stasis/constants.h>
#include <stasis/bufferManager/bufferHash.h>
#include <stasis/bufferManager/pageArray.h>
#include <stasis/bufferManager/legacy/legacyBufferManager.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#ifdef BUFFER_MANAGER_TYPE
int bufferManagerType = BUFFER_MANAGER_TYPE;
#ifdef STASIS_BUFFER_MANAGER_FACTORY
int stasis_buffer_manager_factory = STASIS_BUFFER_MANAGER_FACTORY;
#else
int bufferManagerType = BUFFER_MANAGER_HASH;
stasis_buffer_manager_t * (*stasis_buffer_manager_factory)(stasis_log_t*, stasis_dirty_page_table_t*) = stasis_buffer_manager_hash_factory;
#endif
#ifdef BUFFER_MANAGER_O_DIRECT

View file

@ -68,25 +68,16 @@ void * stasis_runtime_alloc_state() {
return stasis_alloc;
}
static stasis_buffer_manager_t* stasis_runtime_buffer_manager_open(int type, stasis_page_handle_t * ph) {
bufferManagerType = type;
static int lastType = 0;
if(type == BUFFER_MANAGER_REOPEN) {
type = lastType;
}
lastType = type;
if(type == BUFFER_MANAGER_DEPRECATED_HASH) {
return stasis_buffer_manager_deprecated_open(ph);
} else if (type == BUFFER_MANAGER_MEM_ARRAY) {
stasis_buffer_manager_t *ret = stasis_buffer_manager_mem_array_open();
ph->close(ph); // XXX should never have been opened in the first place!
return ret;
} else if (type == BUFFER_MANAGER_HASH) {
return stasis_buffer_manager_hash_open(ph);
stasis_page_handle_t* stasis_page_handle_default_factory(stasis_log_t *log, stasis_dirty_page_table_t *dpt) {
stasis_page_handle_t * ret;
if(bufferManagerFileHandleType == BUFFER_MANAGER_FILE_HANDLE_DEPRECATED) {
printf("\nWarning: Using old I/O routines (with known bugs).\n");
ret = openPageFile(log, dpt);
} else {
// XXX error handling
abort();
stasis_handle_t * h = stasis_handle_open(stasis_store_file_name);
ret = stasis_page_handle_open(h, log, dpt);
}
return ret;
}
int Tinit() {
@ -117,22 +108,14 @@ int Tinit() {
stasis_dirty_page_table = stasis_dirty_page_table_init();
stasis_page_init(stasis_dirty_page_table);
stasis_page_handle_t * page_handle;
if(bufferManagerFileHandleType == BUFFER_MANAGER_FILE_HANDLE_DEPRECATED) {
printf("\nWarning: Using old I/O routines (with known bugs).\n");
page_handle = openPageFile(stasis_log_file, stasis_dirty_page_table);
} else {
stasis_handle_t * h = stasis_handle_open(stasis_store_file_name);
// XXX should not be global.
page_handle = stasis_page_handle_open(h, stasis_log_file, stasis_dirty_page_table);
}
stasis_buffer_manager = stasis_runtime_buffer_manager_open(bufferManagerType, page_handle);
DEBUG("Buffer manager type = %d\n", bufferManagerType);
stasis_buffer_manager = stasis_buffer_manager_factory(stasis_log_file, stasis_dirty_page_table);
stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table, stasis_buffer_manager); // xxx circular dependency.
pageOperationsInit();
stasis_allocation_policy = stasis_allocation_policy_init();
stasis_alloc = stasis_alloc_init(stasis_allocation_policy);
TnaiveHashInit();
LinearHashNTAInit();
BtreeInit();
@ -523,7 +506,7 @@ int stasis_transaction_table_forget(int xid) {
}
int TdurabilityLevel() {
if(bufferManagerType == BUFFER_MANAGER_MEM_ARRAY) {
if(stasis_buffer_manager_factory == BUFFER_MANAGER_MEM_ARRAY) {
return VOLATILE;
} else if(stasis_log_type == LOG_TO_MEMORY) {
return PERSISTENT;

View file

@ -2,5 +2,6 @@
#define STASIS_BUFFERMANAGER_BUFFERHASH_H
#include <stasis/bufferManager.h>
#include <stasis/pageHandle.h>
stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t* ph);
stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t *ph, stasis_log_t *log, stasis_dirty_page_table_t *dpt);
stasis_buffer_manager_t* stasis_buffer_manager_hash_factory(stasis_log_t *log, stasis_dirty_page_table_t *dpt);
#endif //STASIS_BUFFERMANAGER_BUFFERHASH_H

View file

@ -2,4 +2,5 @@
#define STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H
#include <stasis/pageHandle.h>
stasis_buffer_manager_t* stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph);
stasis_buffer_manager_t* stasis_buffer_manager_deprecated_factory(stasis_log_t *log, stasis_dirty_page_table_t *dpt);
#endif//STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H

View file

@ -1,2 +1,3 @@
#include <stasis/bufferManager.h>
stasis_buffer_manager_t* stasis_buffer_manager_mem_array_open();
stasis_buffer_manager_t* stasis_buffer_manager_mem_array_factory(stasis_log_t * log, stasis_dirty_page_table_t *dpt);

View file

@ -91,7 +91,6 @@ terms specified in this license.
/*#define MAX_BUFFER_SIZE 7 */
#endif
#define BUFFER_MANAGER_REOPEN 0
#define BUFFER_MANAGER_HASH 1
#define BUFFER_MANAGER_MEM_ARRAY 2
#define BUFFER_MANAGER_DEPRECATED_HASH 3

View file

@ -1,5 +1,8 @@
#ifndef _STASIS_FLAGS_H__
#define _STASIS_FLAGS_H__
#include <stasis/bufferManager.h>
#include <stasis/logger/logger2.h>
#include <stasis/dirtyPageTable.h>
/**
This is the type of buffer manager that is being used.
@ -13,7 +16,7 @@
(The constants are named BUFFER_MANAGER_*)
*/
extern int bufferManagerType;
extern stasis_buffer_manager_t* (*stasis_buffer_manager_factory)(stasis_log_t*, stasis_dirty_page_table_t*);
/**
This determines which type of file handle the buffer manager will use.

View file

@ -100,4 +100,5 @@ struct stasis_page_handle_t {
stasis_page_handle_t * stasis_page_handle_open(struct stasis_handle_t * handle,
stasis_log_t * log, stasis_dirty_page_table_t * dirtyPages);
stasis_page_handle_t* stasis_page_handle_default_factory(stasis_log_t *log, stasis_dirty_page_table_t *dpt);
#endif //STASIS_PAGEHANDLE_H

View file

@ -50,6 +50,8 @@ terms specified in this license.
#include <stasis/transactional.h>
#include <stasis/bufferManager.h>
#include <stasis/bufferManager/pageArray.h>
#include <stasis/dirtyPageTable.h>
#include <sys/time.h>
@ -110,13 +112,13 @@ void * worker(void*arg) {
return 0;
}
START_TEST(dirtyPageTable_randomTest) {
bufferManagerType = BUFFER_MANAGER_MEM_ARRAY;
stasis_buffer_manager_factory = stasis_buffer_manager_mem_array_factory;
Tinit();
worker(0);
Tdeinit();
} END_TEST
START_TEST(dirtyPageTable_threadTest) {
bufferManagerType = BUFFER_MANAGER_MEM_ARRAY;
stasis_buffer_manager_factory = stasis_buffer_manager_mem_array_factory;
Tinit();
pthread_t thread[NUM_WORKERS];
for(int i = 0; i < NUM_WORKERS; i++) {