Preliminary force_range implementation; lsmTable leaks significantly less space now.

This commit is contained in:
Sears Russell 2007-11-11 23:22:21 +00:00
parent 95d7d3cd5f
commit 87a70f29bc
18 changed files with 285 additions and 23 deletions

View file

@ -209,7 +209,7 @@ AC_FUNC_MEMCMP
AC_FUNC_REALLOC AC_FUNC_REALLOC
AC_FUNC_STAT AC_FUNC_STAT
AC_FUNC_VPRINTF AC_FUNC_VPRINTF
AC_CHECK_FUNCS([bzero fdatasync ftruncate getcwd gettimeofday inet_ntoa localtime_r memmove memset mkdir posix_memalign pow powl socket sqrt strchr strdup strerror strrchr strstr strtol strtoul tcase_set_timeout]) AC_CHECK_FUNCS([bzero fdatasync floor ftruncate getcwd gettimeofday inet_ntoa localtime_r memmove memset mkdir posix_memalign pow powl socket sqrt strchr strdup strerror strrchr strstr strtol strtoul sync_file_range tcase_set_timeout])
#AC_CONFIG_LIBMYSQLD #AC_CONFIG_LIBMYSQLD

View file

@ -120,6 +120,7 @@ static int bufManBufInit() {
loadPageImpl = bufManLoadPage; loadPageImpl = bufManLoadPage;
writeBackPage = pageWrite; writeBackPage = pageWrite;
forcePages = forcePageFile; forcePages = forcePageFile;
forcePageRange = forceRangePageFile;
bufDeinit = bufManBufDeinit; bufDeinit = bufManBufDeinit;
simulateBufferManagerCrash = bufManSimulateBufferManagerCrash; simulateBufferManagerCrash = bufManSimulateBufferManagerCrash;
@ -497,6 +498,7 @@ Page * (*loadPageImpl)(int xid, int pageid) = 0;
void (*releasePageImpl)(Page * p) = 0; void (*releasePageImpl)(Page * p) = 0;
void (*writeBackPage)(Page * p) = 0; void (*writeBackPage)(Page * p) = 0;
void (*forcePages)() = 0; void (*forcePages)() = 0;
void (*forcePageRange)() = 0;
void (*bufDeinit)() = 0; void (*bufDeinit)() = 0;
void (*simulateBufferManagerCrash)() = 0; void (*simulateBufferManagerCrash)() = 0;

View file

@ -267,6 +267,9 @@ static void bhWriteBackPage(Page * p) {
static void bhForcePages() { static void bhForcePages() {
forcePageFile(); forcePageFile();
} }
static void bhForcePageRange() {
forceRangePageFile();
}
static void bhBufDeinit() { static void bhBufDeinit() {
running = 0; running = 0;
@ -325,6 +328,7 @@ void bhBufInit() {
releasePageImpl = bhReleasePage; releasePageImpl = bhReleasePage;
writeBackPage = bhWriteBackPage; writeBackPage = bhWriteBackPage;
forcePages = bhForcePages; forcePages = bhForcePages;
forcePageRange = bhForcePageRange;
bufDeinit = bhBufDeinit; bufDeinit = bhBufDeinit;
simulateBufferManagerCrash = bhSimulateBufferManagerCrash; simulateBufferManagerCrash = bhSimulateBufferManagerCrash;

View file

@ -152,6 +152,13 @@ static int debug_force(stasis_handle_t *h) {
printf("tid=%9ld retn force(%lx) = %d\n", pthread_self(), (unsigned long)hh, ret); fflush(stdout); printf("tid=%9ld retn force(%lx) = %d\n", pthread_self(), (unsigned long)hh, ret); fflush(stdout);
return ret; return ret;
} }
static int debug_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
printf("tid=%9ld call force(%lx,%lld,%lld)\n", pthread_self(), (unsigned long)hh, start, stop); fflush(stdout);
int ret = hh->force_range(hh, start, stop);
printf("tid=%9ld retn force(%lx) = %d\n", pthread_self(), (unsigned long)hh, ret); fflush(stdout);
return ret;
}
static int debug_truncate_start(stasis_handle_t * h, lsn_t new_start) { static int debug_truncate_start(stasis_handle_t * h, lsn_t new_start) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h; stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
printf("tid=%9ld call truncate_start(%lx, %lld)\n", pthread_self(), (unsigned long)hh, new_start); fflush(stdout); printf("tid=%9ld call truncate_start(%lx, %lld)\n", pthread_self(), (unsigned long)hh, new_start); fflush(stdout);
@ -175,6 +182,7 @@ struct stasis_handle_t debug_func = {
.read_buffer = debug_read_buffer, .read_buffer = debug_read_buffer,
.release_read_buffer = debug_release_read_buffer, .release_read_buffer = debug_release_read_buffer,
.force = debug_force, .force = debug_force,
.force_range = debug_force_range,
.truncate_start = debug_truncate_start, .truncate_start = debug_truncate_start,
.error = 0 .error = 0
}; };

View file

@ -1,4 +1,8 @@
#include <config.h> #include <config.h>
#ifdef HAVE_SYNC_FILE_RANGE
#define _GNU_SOURCE
#endif
#include <fcntl.h>
#include <stasis/common.h> #include <stasis/common.h>
#include <stasis/io/handle.h> #include <stasis/io/handle.h>
#include <stdlib.h> #include <stdlib.h>
@ -11,7 +15,6 @@
#include <assert.h> #include <assert.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h>
/** @file */ /** @file */
@ -39,6 +42,7 @@ static int updateEOF(stasis_handle_t * h) {
static int file_num_copies(stasis_handle_t * h) { return 0; } static int file_num_copies(stasis_handle_t * h) { return 0; }
static int file_num_copies_buffer(stasis_handle_t * h) { return 0; } static int file_num_copies_buffer(stasis_handle_t * h) { return 0; }
static int file_force(stasis_handle_t *h); static int file_force(stasis_handle_t *h);
static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop);
static int file_close(stasis_handle_t * h) { static int file_close(stasis_handle_t * h) {
file_force(h); file_force(h);
@ -415,7 +419,17 @@ static int file_force(stasis_handle_t * h) {
pthread_mutex_lock(&impl->mut); // must latch because of truncate... :( pthread_mutex_lock(&impl->mut); // must latch because of truncate... :(
int fd = impl->fd; int fd = impl->fd;
pthread_mutex_unlock(&impl->mut); pthread_mutex_unlock(&impl->mut);
{
static int warned = 0;
if(!warned) {
printf("Warning: There is a race condition between force() and "
" truncate() in file.c (This shouldn't matter in practice, "
"as the logger hasn't moved over to use file.c yet.\n");
warned = 1;
}
}
// XXX there is a race here; the file handle could have been invalidated
// by truncate.
#ifdef HAVE_FDATASYNC #ifdef HAVE_FDATASYNC
DEBUG("file_force() is calling fdatasync()\n"); DEBUG("file_force() is calling fdatasync()\n");
fdatasync(fd); fdatasync(fd);
@ -428,7 +442,52 @@ static int file_force(stasis_handle_t * h) {
} }
return 0; return 0;
} }
static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
file_impl * impl = h->impl;
int ret = 0;
if(!impl->file_flags & O_SYNC) {
// not opened synchronously; we need to explicitly sync.
pthread_mutex_lock(&impl->mut);
int fd = impl->fd;
lsn_t off = impl->start_pos;
pthread_mutex_unlock(&impl->mut);
{
static int warned = 0;
if(!warned) {
printf("Warning: There is a race condition between force_range() and "
" truncate() in file.c (This shouldn't matter in practice, "
"as the logger hasn't moved over to use file.c yet.\n");
warned = 1;
}
}
//#ifdef HAVE_F_SYNC_RANGE
#ifdef HAVE_SYNC_FILE_RANGE
printf("Calling sync_file_range\n");
ret = sync_file_range(fd, start-off, (stop-start),
SYNC_FILE_RANGE_WAIT_BEFORE |
SYNC_FILE_RANGE_WRITE |
SYNC_FILE_RANGE_WAIT_AFTER);
if(ret) {
int error = errno;
assert(ret == -1);
// With the possible exceptions of ENOMEM and ENOSPACE, all of the sync
// errors are unrecoverable.
h->error = EBADF;
ret = error;
}
#else
#ifdef HAVE_FDATASYNC
printf("file_force_range() is calling fdatasync()\n");
fdatasync(fd);
#else
printf("file_force_range() is calling fsync()\n");
fsync(fd);
#endif
ret = 0;
#endif
}
return ret;
}
static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) { static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) {
file_impl * impl = h->impl; file_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
@ -508,6 +567,7 @@ struct stasis_handle_t file_func = {
.read_buffer = file_read_buffer, .read_buffer = file_read_buffer,
.release_read_buffer = file_release_read_buffer, .release_read_buffer = file_release_read_buffer,
.force = file_force, .force = file_force,
.force_range = file_force_range,
.truncate_start = file_truncate_start, .truncate_start = file_truncate_start,
.error = 0 .error = 0
}; };

View file

@ -215,6 +215,9 @@ static int mem_read(stasis_handle_t * h,
static int mem_force(stasis_handle_t *h) { static int mem_force(stasis_handle_t *h) {
return 0; return 0;
} }
static int mem_force_range(stasis_handle_t *h,lsn_t start, lsn_t stop) {
return 0;
}
static int mem_truncate_start(stasis_handle_t * h, lsn_t new_start) { static int mem_truncate_start(stasis_handle_t * h, lsn_t new_start) {
mem_impl* impl = (mem_impl*) h->impl; mem_impl* impl = (mem_impl*) h->impl;
pthread_mutex_lock(&(impl->mut)); pthread_mutex_lock(&(impl->mut));
@ -255,6 +258,7 @@ struct stasis_handle_t mem_func = {
.read_buffer = mem_read_buffer, .read_buffer = mem_read_buffer,
.release_read_buffer = mem_release_read_buffer, .release_read_buffer = mem_release_read_buffer,
.force = mem_force, .force = mem_force,
.force_range = mem_force_range,
.truncate_start = mem_truncate_start, .truncate_start = mem_truncate_start,
.error = 0 .error = 0
}; };

View file

@ -524,12 +524,16 @@ static int nbw_read(stasis_handle_t * h,
} }
return ret; return ret;
} }
static int nbw_force(stasis_handle_t * h) { static int nbw_force_range_impl(stasis_handle_t * h, lsn_t start, lsn_t stop) {
nbw_impl * impl = h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut); // pthread_mutex_lock(&impl->mut);
const tree_node * n = RB_ENTRY(min)(impl->fast_handles); tree_node scratch;
scratch.start_pos = start;
scratch.end_pos = start+1;
const tree_node * n = RB_ENTRY(lookup)(RB_LUGTEQ,&scratch,impl->fast_handles); // min)(impl->fast_handles);
int blocked = 0; int blocked = 0;
while(n) { while(n) {
if(n->start_pos >= stop) { break; }
if(n->dirty) { if(n->dirty) {
// cast strips const // cast strips const
((tree_node*)n)->dirty = NEEDS_FORCE; ((tree_node*)n)->dirty = NEEDS_FORCE;
@ -553,15 +557,31 @@ static int nbw_force(stasis_handle_t * h) {
if(impl->slow_force_once) { if(impl->slow_force_once) {
if(impl->all_slow_handle_count) { if(impl->all_slow_handle_count) {
stasis_handle_t * h = impl->all_slow_handles[0]; stasis_handle_t * h = impl->all_slow_handles[0];
ret = h->force(h); ret = h->force_range(h, start, stop);
} }
} else { } else {
for(int i = 0; i < impl->all_slow_handle_count; i++) { for(int i = 0; i < impl->all_slow_handle_count; i++) {
stasis_handle_t * h = impl->all_slow_handles[i]; stasis_handle_t * h = impl->all_slow_handles[i];
int tmpret = h->force(h); int tmpret = h->force_range(h, start, stop);
if(tmpret) { ret = tmpret; } if(tmpret) { ret = tmpret; }
} }
} }
// pthread_mutex_unlock(&impl->mut);
return ret;
}
static int nbw_force(stasis_handle_t * h) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
int ret = nbw_force_range_impl(h, impl->start_pos, impl->end_pos);
pthread_mutex_unlock(&impl->mut);
return ret;
}
static int nbw_force_range(stasis_handle_t * h,
off_t start,
off_t stop) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
int ret = nbw_force_range_impl(h, start, stop);
pthread_mutex_unlock(&impl->mut); pthread_mutex_unlock(&impl->mut);
return ret; return ret;
} }
@ -596,6 +616,7 @@ struct stasis_handle_t nbw_func = {
.read_buffer = nbw_read_buffer, .read_buffer = nbw_read_buffer,
.release_read_buffer = nbw_release_read_buffer, .release_read_buffer = nbw_release_read_buffer,
.force = nbw_force, .force = nbw_force,
.force_range = nbw_force_range,
.truncate_start = nbw_truncate_start, .truncate_start = nbw_truncate_start,
.error = 0 .error = 0
}; };

View file

@ -1,9 +1,13 @@
#include <config.h> #include <config.h>
#define _XOPEN_SOURCE 500 #define _XOPEN_SOURCE 500
#ifdef HAVE_SYNC_FILE_RANGE
#define _GNU_SOURCE
#endif
#include <fcntl.h>
#include <unistd.h> #include <unistd.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h> #include <stdio.h>
#include <errno.h> #include <errno.h>
#include <assert.h> #include <assert.h>
@ -378,6 +382,34 @@ static int pfile_force(stasis_handle_t *h) {
} }
return 0; return 0;
} }
static int pfile_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
pfile_impl * impl = h->impl;
#ifdef HAVE_SYNC_FILE_RANGE
printf("Calling sync_file_range\n");
int ret = sync_file_range(impl->fd, start-impl->start_pos, (stop-start),
SYNC_FILE_RANGE_WAIT_BEFORE |
SYNC_FILE_RANGE_WRITE |
SYNC_FILE_RANGE_WAIT_AFTER);
if(ret) {
int error = errno;
assert(ret == -1);
// With the possible exceptions of ENOMEM and ENOSPACE, all of the sync
// errors are unrecoverable.
h->error = EBADF;
ret = error;
}
#else
#ifdef HAVE_FDATASYNC
printf("file_force_range() is calling fdatasync()\n");
fdatasync(fd);
#else
printf("file_force_range() is calling fsync()\n");
fsync(fd);
#endif
int ret = 0;
#endif
return ret;
}
static int pfile_truncate_start(stasis_handle_t *h, lsn_t new_start) { static int pfile_truncate_start(stasis_handle_t *h, lsn_t new_start) {
static int truncate_warned = 0; static int truncate_warned = 0;
if (!truncate_warned) { if (!truncate_warned) {
@ -403,6 +435,7 @@ struct stasis_handle_t pfile_func = {
.read_buffer = pfile_read_buffer, .read_buffer = pfile_read_buffer,
.release_read_buffer = pfile_release_read_buffer, .release_read_buffer = pfile_release_read_buffer,
.force = pfile_force, .force = pfile_force,
.force_range = pfile_force_range,
.truncate_start = pfile_truncate_start, .truncate_start = pfile_truncate_start,
.error = 0 .error = 0
}; };

View file

@ -6,6 +6,7 @@
#include <set> #include <set>
#include "lsmIterators.h" #include "lsmIterators.h"
#include <stasis/truncation.h>
namespace rose { namespace rose {
/** /**
@ -24,6 +25,7 @@ namespace rose {
int worker_id; int worker_id;
pageid_t(*pageAlloc)(int,void*); pageid_t(*pageAlloc)(int,void*);
void *pageAllocState; void *pageAllocState;
void *oldAllocState;
pthread_mutex_t * block_ready_mut; pthread_mutex_t * block_ready_mut;
pthread_cond_t * in_block_needed_cond; pthread_cond_t * in_block_needed_cond;
pthread_cond_t * out_block_needed_cond; pthread_cond_t * out_block_needed_cond;
@ -91,6 +93,7 @@ namespace rose {
// experiments, but 450 bytes overhead per tuple is insane! // experiments, but 450 bytes overhead per tuple is insane!
static const int RB_TREE_OVERHEAD = 400; // = 450; static const int RB_TREE_OVERHEAD = 400; // = 450;
static const pageid_t MEM_SIZE = 1000 * 1000 * 1000; static const pageid_t MEM_SIZE = 1000 * 1000 * 1000;
// static const pageid_t MEM_SIZE = 100 * 1000;
// How many pages should we try to fill with the first C1 merge? // How many pages should we try to fill with the first C1 merge?
static const int R = 3; // XXX set this as low as possible (for dynamic setting. = sqrt(C2 size / C0 size)) static const int R = 3; // XXX set this as low as possible (for dynamic setting. = sqrt(C2 size / C0 size))
static const pageid_t START_SIZE = MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. static const pageid_t START_SIZE = MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead.
@ -115,6 +118,8 @@ namespace rose {
int xid = Tbegin(); int xid = Tbegin();
// Initialize tree with an empty tree. // Initialize tree with an empty tree.
// XXX hardcodes ITERA's type: // XXX hardcodes ITERA's type:
// We assume that the caller set pageAllocState for us; oldPageAllocState
// shouldn't be set (it should be NULLRID)
typename ITERA::handle tree typename ITERA::handle tree
= new typename ITERA::treeIteratorHandle( = new typename ITERA::treeIteratorHandle(
TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
@ -155,7 +160,17 @@ namespace rose {
pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
xid = Tbegin(); xid = Tbegin();
// XXX hardcodes allocator type.
if(((recordid*)a->oldAllocState)->size != -1) {
// free the tree that we merged against during the last round.
TlsmFree(xid,tree->r_,TlsmRegionDeallocRid,a->oldAllocState);
}
// we're merging against old alloc state this round.
*(recordid*)(a->oldAllocState) = *(recordid*)(a->pageAllocState);
// we're merging into pagealloc state.
*(recordid*)(a->pageAllocState) = Talloc(xid, sizeof(TlsmRegionAllocConf_t));
Tset(xid, *(recordid*)(a->pageAllocState),
&LSM_REGION_ALLOC_STATIC_INITIALIZER);
tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes());
@ -189,12 +204,8 @@ namespace rose {
delete taEnd; delete taEnd;
delete tbEnd; delete tbEnd;
gettimeofday(&stop_tv,0); gettimeofday(&stop_tv,0);
// TlsmFree(wait_queue[0]) /// XXX Need to implement (de)allocation!
// TlsmFree(wait_queue[1])
merge_count++; merge_count++;
double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv); double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv);
@ -252,6 +263,17 @@ namespace rose {
// We don't want to ever look at the one we just handed upstream... // We don't want to ever look at the one we just handed upstream...
// We could wait for an in tree to be ready, and then pass it directly // We could wait for an in tree to be ready, and then pass it directly
// to compress data (to avoid all those merging comparisons...) // to compress data (to avoid all those merging comparisons...)
// old alloc state contains the tree that we used as input for this merge... we can still free it
// XXX storage leak; upstream is going to have to free this somehow...
*(recordid*)(a->pageAllocState) = NULLRID;
// create a new allocator.
*(recordid*)(a->pageAllocState) = Talloc(xid, sizeof(TlsmRegionAllocConf_t));
Tset(xid, *(recordid*)(a->pageAllocState),
&LSM_REGION_ALLOC_STATIC_INITIALIZER);
tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes());
@ -272,8 +294,10 @@ namespace rose {
typedef struct { typedef struct {
recordid bigTree; recordid bigTree;
recordid bigTreeAllocState; // this is probably the head of an arraylist of regions used by the tree... recordid bigTreeAllocState; // this is probably the head of an arraylist of regions used by the tree...
recordid oldBigTreeAllocState; // this is probably the head of an arraylist of regions used by the tree...
recordid mediumTree; recordid mediumTree;
recordid mediumTreeAllocState; recordid mediumTreeAllocState;
recordid oldMediumTreeAllocState;
epoch_t beginning; epoch_t beginning;
epoch_t end; epoch_t end;
} lsmTableHeader_t; } lsmTableHeader_t;
@ -287,11 +311,13 @@ namespace rose {
recordid ret = Talloc(xid, sizeof(lsmTableHeader_t)); recordid ret = Talloc(xid, sizeof(lsmTableHeader_t));
lsmTableHeader_t h; lsmTableHeader_t h;
h.oldBigTreeAllocState = NULLRID;
h.bigTreeAllocState = Talloc(xid,sizeof(TlsmRegionAllocConf_t)); h.bigTreeAllocState = Talloc(xid,sizeof(TlsmRegionAllocConf_t));
Tset(xid,h.bigTreeAllocState,&LSM_REGION_ALLOC_STATIC_INITIALIZER); Tset(xid,h.bigTreeAllocState,&LSM_REGION_ALLOC_STATIC_INITIALIZER);
h.bigTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(), h.bigTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),
TlsmRegionAllocRid,&h.bigTreeAllocState, TlsmRegionAllocRid,&h.bigTreeAllocState,
PAGELAYOUT::FMT::TUP::sizeofBytes()); PAGELAYOUT::FMT::TUP::sizeofBytes());
h.oldMediumTreeAllocState = NULLRID;
h.mediumTreeAllocState = Talloc(xid,sizeof(TlsmRegionAllocConf_t)); h.mediumTreeAllocState = Talloc(xid,sizeof(TlsmRegionAllocConf_t));
Tset(xid,h.mediumTreeAllocState,&LSM_REGION_ALLOC_STATIC_INITIALIZER); Tset(xid,h.mediumTreeAllocState,&LSM_REGION_ALLOC_STATIC_INITIALIZER);
h.mediumTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(), h.mediumTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),
@ -401,6 +427,8 @@ namespace rose {
recordid * ridp = (recordid*)malloc(sizeof(recordid)); recordid * ridp = (recordid*)malloc(sizeof(recordid));
*ridp = h.bigTreeAllocState; *ridp = h.bigTreeAllocState;
recordid * oldridp = (recordid*)malloc(sizeof(recordid));
*oldridp = NULLRID;
ret->args1 = (merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>)); ret->args1 = (merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>));
merge_args<PAGELAYOUT, LSM_ITER, LSM_ITER> tmpargs1 = merge_args<PAGELAYOUT, LSM_ITER, LSM_ITER> tmpargs1 =
@ -408,6 +436,7 @@ namespace rose {
1, 1,
TlsmRegionAllocRid, TlsmRegionAllocRid,
ridp, ridp,
oldridp,
block_ready_mut, block_ready_mut,
block1_needed_cond, block1_needed_cond,
block2_needed_cond, block2_needed_cond,
@ -427,6 +456,8 @@ namespace rose {
ridp = (recordid*)malloc(sizeof(recordid)); ridp = (recordid*)malloc(sizeof(recordid));
*ridp = h.mediumTreeAllocState; *ridp = h.mediumTreeAllocState;
oldridp = (recordid*)malloc(sizeof(recordid));
*oldridp = NULLRID;
ret->args2 = (merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>)); ret->args2 = (merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>));
merge_args<PAGELAYOUT, LSM_ITER, RB_ITER> tmpargs2 = merge_args<PAGELAYOUT, LSM_ITER, RB_ITER> tmpargs2 =
@ -434,6 +465,7 @@ namespace rose {
2, 2,
TlsmRegionAllocRid, TlsmRegionAllocRid,
ridp, ridp,
oldridp,
block_ready_mut, block_ready_mut,
block0_needed_cond, block0_needed_cond,
block1_needed_cond, block1_needed_cond,
@ -557,16 +589,17 @@ namespace rose {
byte * arry = val.toByteArray(); byte * arry = val.toByteArray();
typename PAGELAYOUT::FMT::TUP * r = 0; typename PAGELAYOUT::FMT::TUP * r = 0;
if(h->args2->my_tree) {
r = getRecordHelper<PAGELAYOUT>(xid, h->args2->my_tree->r_, val, scratch, arry); r = getRecordHelper<PAGELAYOUT>(xid, h->args2->my_tree->r_, val, scratch, arry);
if(r) { pthread_mutex_unlock(h->mut); return r; } if(r) { pthread_mutex_unlock(h->mut); return r; }
}
DEBUG("Not in first my_tree {%lld}\n", h->args2->my_tree->r_.size); DEBUG("Not in first my_tree {%lld}\n", h->args2->my_tree->r_.size);
if(*h->args1->in_tree) { if(*h->args1->in_tree) {
r = getRecordHelper<PAGELAYOUT>(xid, (**h->args1->in_tree)->r_, val, scratch, arry); r = getRecordHelper<PAGELAYOUT>(xid, (**h->args1->in_tree)->r_, val, scratch, arry);
if(r) { pthread_mutex_unlock(h->mut); return r; } if(r) { pthread_mutex_unlock(h->mut); return r; }
} else { } else {
DEBUG("no tree"); DEBUG("no second in_tree");
} }
DEBUG("Not in second in_tree\n"); DEBUG("Not in second in_tree\n");

View file

@ -11,19 +11,46 @@ static lsm_comparator_t comparators[MAX_LSM_COMPARATORS];
static lsm_page_initializer_t initializers[MAX_LSM_PAGE_INITIALIZERS]; static lsm_page_initializer_t initializers[MAX_LSM_PAGE_INITIALIZERS];
TlsmRegionAllocConf_t LSM_REGION_ALLOC_STATIC_INITIALIZER = TlsmRegionAllocConf_t LSM_REGION_ALLOC_STATIC_INITIALIZER =
{ -1, -1, 1000 }; { {0,0,-1}, 0, -1, -1, 1000 };
pageid_t TlsmRegionAlloc(int xid, void *conf) { pageid_t TlsmRegionAlloc(int xid, void *conf) {
TlsmRegionAllocConf_t * a = (TlsmRegionAllocConf_t*)conf; TlsmRegionAllocConf_t* a = (TlsmRegionAllocConf_t*)conf;
if(a->nextPage == a->endOfRegion) { if(a->nextPage == a->endOfRegion) {
a->nextPage = TregionAlloc(xid, a->regionSize,0); if(a->regionList.size == -1) {
a->endOfRegion = a->nextPage + a->regionSize; a->regionList = TarrayListAlloc(xid, 1, 4, sizeof(pageid_t));
a->regionCount = 0;
} }
TarrayListExtend(xid,a->regionList,1);
a->regionList.slot = a->regionCount;
DEBUG("region lst slot %lld\n",a->regionList.slot);
a->regionCount++;
DEBUG("region count %lld\n",a->regionCount);
a->nextPage = TregionAlloc(xid, a->regionSize,0);
DEBUG("next page %lld\n",a->nextPage);
a->endOfRegion = a->nextPage + a->regionSize;
Tset(xid,a->regionList,&a->nextPage);
DEBUG("next page %lld\n",a->nextPage);
}
DEBUG("%lld ?= %lld\n", a->nextPage,a->endOfRegion);
pageid_t ret = a->nextPage; pageid_t ret = a->nextPage;
DEBUG("ret %lld\n",ret);
(a->nextPage)++; (a->nextPage)++;
return ret; return ret;
} }
void TlsmRegionDeallocRid(int xid, void *conf) {
recordid rid = *(recordid*)conf;
TlsmRegionAllocConf_t a;
Tread(xid,rid,&a);
// TlsmRegionAllocConf_t* a = (TlsmRegionAllocConf_t*)conf;
for(int i = 0; i < a.regionCount; i++) {
a.regionList.slot = i;
pageid_t pid;
Tread(xid,a.regionList,&pid);
TregionDealloc(xid,pid);
}
}
pageid_t TlsmRegionAllocRid(int xid, void * ridp) { pageid_t TlsmRegionAllocRid(int xid, void * ridp) {
recordid rid = *(recordid*)ridp; recordid rid = *(recordid*)ridp;
TlsmRegionAllocConf_t conf; TlsmRegionAllocConf_t conf;
@ -575,6 +602,13 @@ recordid TlsmAppendPage(int xid, recordid tree,
return ret; return ret;
} }
void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc,
void *allocator_state) {
// Tdealloc(xid,tree);
dealloc(xid,allocator_state);
// XXX fishy shouldn't caller do this?
Tdealloc(xid, *(recordid*)allocator_state);
}
static pageid_t lsmLookup(int xid, Page *node, int depth, static pageid_t lsmLookup(int xid, Page *node, int depth,
const byte *key, size_t keySize, lsm_comparator_t cmp) { const byte *key, size_t keySize, lsm_comparator_t cmp) {

View file

@ -124,7 +124,6 @@ For<TYPE>::recordFind(int xid, slot_index_t start, slot_index_t stop,
} }
} }
} }
assert(ret); //XXX
return ret; return ret;
} }
} // namespace rose } // namespace rose

View file

@ -28,6 +28,7 @@ static int stable = -1;
static pthread_mutex_t stable_mutex; static pthread_mutex_t stable_mutex;
static void pfForcePageFile(); static void pfForcePageFile();
static void pfClosePageFile(); static void pfClosePageFile();
static void pfForceRangePageFile(lsn_t start, lsn_t stop) ;
inline static pageid_t myLseekNoLock(int f, pageid_t offset, int whence); inline static pageid_t myLseekNoLock(int f, pageid_t offset, int whence);
static int oldOffset = -1; static int oldOffset = -1;
@ -134,6 +135,7 @@ void openPageFile() {
pageRead = pfPageRead; pageRead = pfPageRead;
pageWrite = pfPageWrite; pageWrite = pfPageWrite;
forcePageFile = pfForcePageFile; forcePageFile = pfForcePageFile;
forceRangePageFile = pfForceRangePageFile;
closePageFile = pfClosePageFile; closePageFile = pfClosePageFile;
DEBUG("Opening storefile.\n"); DEBUG("Opening storefile.\n");
@ -170,6 +172,23 @@ static void pfForcePageFile() {
} }
} }
static void pfForceRangePageFile(lsn_t start, lsn_t stop) {
if(pageFile_isDurable) {
#ifdef HAVE_SYNC_FILE_RANGE
int ret = sync_file_range(stable, start, stop,
SYNC_FILE_RANGE_WAIT_BEFORE |
SYNC_FILE_RANGE_WRITE |
SYNC_FILE_RANGE_WAIT_AFTER);
assert(!ret);
#else
#ifdef HAVE_FDATASYNC
fdatasync(fd);
#else
fsync(fd);
#endif
#endif
}
}
static void pfClosePageFile() { static void pfClosePageFile() {
assert(stable != -1); assert(stable != -1);
forcePageFile(); forcePageFile();

View file

@ -9,6 +9,7 @@
void (*pageWrite)(Page * dat); void (*pageWrite)(Page * dat);
void (*pageRead)(Page * ret); void (*pageRead)(Page * ret);
void (*forcePageFile)(); void (*forcePageFile)();
void (*forceRangePageFile)();
void (*closePageFile)(); void (*closePageFile)();
int printedForceWarning = 0; int printedForceWarning = 0;
@ -57,6 +58,10 @@ static void phForce() {
int err = h->force(h); int err = h->force(h);
assert(!err); assert(!err);
} }
static void phForceRange(lsn_t start, lsn_t stop) {
int err = h->force_range(h,start,stop);
assert(!err);
}
static void phClose() { static void phClose() {
int err = h->close(h); int err = h->close(h);
DEBUG("Closing pageHandle\n"); DEBUG("Closing pageHandle\n");
@ -72,6 +77,7 @@ void pageHandleOpen(stasis_handle_t * handle) {
pageWrite = phWrite; pageWrite = phWrite;
pageRead = phRead; pageRead = phRead;
forcePageFile = phForce; forcePageFile = phForce;
forceRangePageFile = phForceRange;
closePageFile = phClose; closePageFile = phClose;
h = handle; h = handle;
} }

View file

@ -106,7 +106,31 @@ static void dirtyPages_flush() {
} }
free(staleDirtyPages); free(staleDirtyPages);
} }
void dirtyPages_flushRange(pageid_t start, pageid_t stop) {
int * staleDirtyPages = malloc(sizeof(int) * (MAX_BUFFER_SIZE));
int i;
Page * p = 0;
pthread_mutex_lock(&dirtyPages_mutex);
void *tmp;
i = 0;
for(tmp = pblHtFirst(dirtyPages); tmp; tmp = pblHtNext(dirtyPages)) {
int num = *((int*) pblHtCurrentKey(dirtyPages));
if(num <= start && num < stop) {
staleDirtyPages[i] = num;
i++;
}
}
staleDirtyPages[i] = -1;
pthread_mutex_unlock(&dirtyPages_mutex);
for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) {
p = loadPage(-1, staleDirtyPages[i]);
writeBackPage(p);
releasePage(p);
}
free(staleDirtyPages);
forcePageRange(start,stop); // XXX
}
void dirtyPagesInit() { void dirtyPagesInit() {
dirtyPages = pblHtCreate(); dirtyPages = pblHtCreate();
} }

View file

@ -149,7 +149,12 @@ extern void (*writeBackPage)(Page * p);
a no-op. a no-op.
*/ */
extern void (*forcePages)(); extern void (*forcePages)();
/**
Force written back pages that fall within a particular range to disk.
This does not force page that have not been written to with pageWrite().
*/
extern void (*forcePageRange)(pageid_t start, pageid_t stop);
extern void (*simulateBufferManagerCrash)(); extern void (*simulateBufferManagerCrash)();
int bufInit(int type); int bufInit(int type);

View file

@ -199,6 +199,7 @@ typedef struct stasis_handle_t {
returned) may or may not be forced to disk. returned) may or may not be forced to disk.
*/ */
int (*force)(struct stasis_handle_t * h); int (*force)(struct stasis_handle_t * h);
int (*force_range)(struct stasis_handle_t * h, lsn_t start, lsn_t stop);
/** /**
Truncate bytes from the beginning of the file. This is needed by Truncate bytes from the beginning of the file. This is needed by
the log manager. the log manager.

View file

@ -25,6 +25,7 @@ typedef struct {
typedef int(*lsm_comparator_t)(const void* a, const void* b); typedef int(*lsm_comparator_t)(const void* a, const void* b);
typedef void*(*lsm_page_initializer_t)(Page *, void *); typedef void*(*lsm_page_initializer_t)(Page *, void *);
typedef pageid_t(*lsm_page_allocator_t)(int, void *); typedef pageid_t(*lsm_page_allocator_t)(int, void *);
typedef void(*lsm_page_deallocator_t)(int, void *);
void lsmTreeRegisterComparator(int id, lsm_comparator_t i); void lsmTreeRegisterComparator(int id, lsm_comparator_t i);
void lsmTreeRegisterPageInitializer(int id, lsm_page_initializer_t i); void lsmTreeRegisterPageInitializer(int id, lsm_page_initializer_t i);
@ -32,10 +33,15 @@ void lsmTreeRegisterPageInitializer(int id, lsm_page_initializer_t i);
pageid_t TlsmRegionAlloc(int xid, void *conf); pageid_t TlsmRegionAlloc(int xid, void *conf);
pageid_t TlsmRegionAllocRid(int xid, void *conf); pageid_t TlsmRegionAllocRid(int xid, void *conf);
typedef struct { typedef struct {
recordid regionList;
pageid_t regionCount;
pageid_t nextPage; pageid_t nextPage;
pageid_t endOfRegion; pageid_t endOfRegion;
pageid_t regionSize; pageid_t regionSize;
} TlsmRegionAllocConf_t; } TlsmRegionAllocConf_t;
void TlsmRegionDeallocRid(int xid, void *conf);
extern TlsmRegionAllocConf_t LSM_REGION_ALLOC_STATIC_INITIALIZER; extern TlsmRegionAllocConf_t LSM_REGION_ALLOC_STATIC_INITIALIZER;
/** /**
@ -66,6 +72,8 @@ recordid TlsmAppendPage(int xid, recordid tree,
const byte *key, const byte *key,
lsm_page_allocator_t allocator, void *allocator_state, lsm_page_allocator_t allocator, void *allocator_state,
long pageid); long pageid);
void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc,
void *allocator_state);
/** /**
Lookup a leaf page. Lookup a leaf page.

View file

@ -47,6 +47,7 @@ extern void (*pageRead)(Page * ret);
as well...) as well...)
*/ */
extern void (*forcePageFile)(); extern void (*forcePageFile)();
extern void (*forceRangePageFile)();
/** /**
Force the page file to disk, then close it. Force the page file to disk, then close it.
*/ */