refactored/created stasis_buffer_manager_open and stasis_handle_open
This commit is contained in:
parent
b5b414490a
commit
7b4cf40221
18 changed files with 272 additions and 248 deletions
|
@ -29,7 +29,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c
|
|||
operations/pageOrientedListNTA.c
|
||||
operations/regions.c operations/lsmTree.c
|
||||
io/rangeTracker.c io/memory.c io/file.c io/pfile.c
|
||||
io/non_blocking.c io/debug.c
|
||||
io/non_blocking.c io/debug.c io/handle.c
|
||||
bufferManager/pageArray.c
|
||||
bufferManager/bufferHash.c replacementPolicy/lru.c
|
||||
replacementPolicy/lruFast.c)
|
||||
|
|
|
@ -22,7 +22,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common
|
|||
operations/regions.c operations/lsmTree.c \
|
||||
operations/lsnFreeSet.c \
|
||||
io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c \
|
||||
io/debug.c \
|
||||
io/debug.c io/handle.c \
|
||||
bufferManager.c \
|
||||
bufferManager/pageArray.c \
|
||||
bufferManager/bufferHash.c \
|
||||
|
|
|
@ -169,8 +169,8 @@ void (*releasePageImpl)(Page * p) = 0;
|
|||
void (*writeBackPage)(Page * p) = 0;
|
||||
void (*forcePages)() = 0;
|
||||
void (*forcePageRange)(pageid_t start, pageid_t stop) = 0;
|
||||
void (*bufDeinit)() = 0;
|
||||
void (*simulateBufferManagerCrash)() = 0;
|
||||
void (*stasis_buffer_manager_close)() = 0;
|
||||
void (*stasis_buffer_manager_simulate_crash)() = 0;
|
||||
|
||||
Page * loadPage(int xid, pageid_t pageid) {
|
||||
try_ret(NULL) {
|
||||
|
@ -195,7 +195,7 @@ void releasePage(Page * p) {
|
|||
releasePageImpl(p);
|
||||
}
|
||||
|
||||
int bufInit(int type) {
|
||||
int stasis_buffer_manager_open(int type) {
|
||||
bufferManagerType = type;
|
||||
static int lastType = 0;
|
||||
if(type == BUFFER_MANAGER_REOPEN) {
|
||||
|
@ -203,13 +203,13 @@ int bufInit(int type) {
|
|||
}
|
||||
lastType = type;
|
||||
if(type == BUFFER_MANAGER_DEPRECATED_HASH) {
|
||||
bufManBufInit();
|
||||
stasis_buffer_manager_deprecated_open();
|
||||
return 0;
|
||||
} else if (type == BUFFER_MANAGER_MEM_ARRAY) {
|
||||
paBufInit();
|
||||
stasis_buffer_manager_mem_array_open();
|
||||
return 0;
|
||||
} else if (type == BUFFER_MANAGER_HASH) {
|
||||
bhBufInit();
|
||||
stasis_buffer_manager_hash_open();
|
||||
return 0;
|
||||
} else {
|
||||
// XXX error handling
|
||||
|
|
|
@ -376,7 +376,7 @@ static void bhSimulateBufferManagerCrash() {
|
|||
stasis_buffer_pool_deinit(stasis_buffer_pool);
|
||||
}
|
||||
|
||||
void bhBufInit() {
|
||||
void stasis_buffer_manager_hash_open() {
|
||||
|
||||
assert(!running);
|
||||
|
||||
|
@ -390,8 +390,8 @@ void bhBufInit() {
|
|||
writeBackPage = bhWriteBackPage;
|
||||
forcePages = bhForcePages;
|
||||
forcePageRange = bhForcePageRange;
|
||||
bufDeinit = bhBufDeinit;
|
||||
simulateBufferManagerCrash = bhSimulateBufferManagerCrash;
|
||||
stasis_buffer_manager_close = bhBufDeinit;
|
||||
stasis_buffer_manager_simulate_crash = bhSimulateBufferManagerCrash;
|
||||
|
||||
stasis_buffer_pool = stasis_buffer_pool_init();
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ static void bufManSimulateBufferManagerCrash();
|
|||
|
||||
static stasis_buffer_pool_t * stasis_buffer_pool;
|
||||
|
||||
int bufManBufInit() {
|
||||
int stasis_buffer_manager_deprecated_open() {
|
||||
|
||||
releasePageImpl = bufManReleasePage;
|
||||
loadPageImpl = bufManLoadPage;
|
||||
|
@ -35,8 +35,8 @@ int bufManBufInit() {
|
|||
writeBackPage = pageWrite;
|
||||
forcePages = forcePageFile;
|
||||
forcePageRange = forceRangePageFile;
|
||||
bufDeinit = bufManBufDeinit;
|
||||
simulateBufferManagerCrash = bufManSimulateBufferManagerCrash;
|
||||
stasis_buffer_manager_close = bufManBufDeinit;
|
||||
stasis_buffer_manager_simulate_crash = bufManSimulateBufferManagerCrash;
|
||||
|
||||
stasis_buffer_pool = stasis_buffer_pool_init();
|
||||
|
||||
|
|
|
@ -57,14 +57,14 @@ static void paBufDeinit() {
|
|||
}
|
||||
}
|
||||
|
||||
void paBufInit () {
|
||||
void stasis_buffer_manager_mem_array_open () {
|
||||
|
||||
releasePageImpl = paReleasePage;
|
||||
loadPageImpl = paLoadPage;
|
||||
writeBackPage = paWriteBackPage;
|
||||
forcePages = paForcePages;
|
||||
bufDeinit = paBufDeinit;
|
||||
simulateBufferManagerCrash = paBufDeinit;
|
||||
stasis_buffer_manager_close = paBufDeinit;
|
||||
stasis_buffer_manager_simulate_crash = paBufDeinit;
|
||||
|
||||
pageCount = 0;
|
||||
pageMap = 0;
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
|
||||
/** @file */
|
||||
|
||||
typedef struct file_impl {
|
||||
typedef struct file_impl {
|
||||
pthread_mutex_t mut;
|
||||
lsn_t start_pos;
|
||||
lsn_t end_pos;
|
||||
|
@ -31,7 +31,7 @@ typedef struct file_impl {
|
|||
static int updateEOF(stasis_handle_t * h) {
|
||||
file_impl * impl = h->impl;
|
||||
off_t pos = lseek(impl->fd, 0, SEEK_END);
|
||||
if(pos == (off_t)-1) {
|
||||
if(pos == (off_t)-1) {
|
||||
return errno;
|
||||
} else {
|
||||
impl->end_pos = impl->start_pos + pos;
|
||||
|
@ -64,77 +64,77 @@ static lsn_t file_start_position(stasis_handle_t *h) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
static lsn_t file_end_position(stasis_handle_t *h) {
|
||||
static lsn_t file_end_position(stasis_handle_t *h) {
|
||||
file_impl * impl = (file_impl*)h->impl;
|
||||
pthread_mutex_lock(&(impl->mut));
|
||||
int error = updateEOF(h);
|
||||
int ret;
|
||||
if(error) {
|
||||
if(error) {
|
||||
h->error = error;
|
||||
ret = -1;
|
||||
} else {
|
||||
} else {
|
||||
ret = impl->end_pos;
|
||||
}
|
||||
pthread_mutex_unlock(&(impl->mut));
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off,
|
||||
const byte * dat, lsn_t len) {
|
||||
inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off,
|
||||
const byte * dat, lsn_t len) {
|
||||
file_impl * impl = (file_impl*)h->impl;
|
||||
int error = 0;
|
||||
|
||||
// These should have been checked by the caller.
|
||||
assert(impl->start_pos <= off);
|
||||
assert(impl->start_pos <= off);
|
||||
assert(impl->end_pos >= off+len);
|
||||
|
||||
// @todo need a test harness that gets read(), write() and lseek() to misbehave.
|
||||
|
||||
off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET);
|
||||
|
||||
if(lseek_offset == (off_t)-1) {
|
||||
if(lseek_offset == (off_t)-1) {
|
||||
error = errno;
|
||||
if(error == EBADF || error == ESPIPE) {
|
||||
h->error = error;
|
||||
error = EBADF;
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
ssize_t bytes_written = 0;
|
||||
|
||||
|
||||
// seek succeeded, so attempt to write.
|
||||
while(bytes_written < len) {
|
||||
while(bytes_written < len) {
|
||||
ssize_t ret = write(impl->fd,
|
||||
dat+bytes_written,
|
||||
dat+bytes_written,
|
||||
len-bytes_written);
|
||||
if(ret == -1) {
|
||||
if(errno == EAGAIN || errno == EINTR) {
|
||||
// EAGAIN could be returned if the file handle was opened in non-blocking mode.
|
||||
// we should probably warn instead of spinning.
|
||||
|
||||
|
||||
// EINTR is returned if the write is interrupted by a signal.
|
||||
// On Linux, it must have been interrupted before any bytes were written.
|
||||
// On SVr4 it may have been interrupted at any point.
|
||||
|
||||
|
||||
// Try again.
|
||||
|
||||
lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET);
|
||||
if(lseek_offset == (off_t)-1) {
|
||||
if(lseek_offset == (off_t)-1) {
|
||||
error = errno;
|
||||
if(error == EBADF || error == ESPIPE) {
|
||||
if(error == EBADF || error == ESPIPE) {
|
||||
h->error = error;
|
||||
error = EBADF;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
|
||||
} else {
|
||||
|
||||
} else {
|
||||
// Need to set h->error if an unrecoverable error occured.
|
||||
// The only unrecoverable error is EBADF. (EINVAL could be
|
||||
// the caller's fault if O_DIRECT is being used. Otherwise,
|
||||
// it is unrecoverable.
|
||||
if(errno == EBADF) {
|
||||
if(errno == EBADF) {
|
||||
h->error = EBADF;
|
||||
}
|
||||
error = errno;
|
||||
|
@ -148,13 +148,13 @@ inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off,
|
|||
return error;
|
||||
}
|
||||
|
||||
inline static void print_eof_error(char * file, int line) {
|
||||
inline static void print_eof_error(char * file, int line) {
|
||||
fprintf(stderr, "%s:%d Internal error: attempt to access negative offset, or beyond EOF.\n", file, line);
|
||||
fflush(stderr);
|
||||
}
|
||||
|
||||
static int file_read(stasis_handle_t * h,
|
||||
lsn_t off, byte * buf, lsn_t len) {
|
||||
static int file_read(stasis_handle_t * h,
|
||||
lsn_t off, byte * buf, lsn_t len) {
|
||||
file_impl * impl = (file_impl*)(h->impl);
|
||||
pthread_mutex_lock(&(impl->mut));
|
||||
int error = 0;
|
||||
|
@ -167,61 +167,61 @@ static int file_read(stasis_handle_t * h,
|
|||
}
|
||||
}
|
||||
|
||||
if(!error) {
|
||||
if(!error) {
|
||||
off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET);
|
||||
|
||||
if(lseek_offset == (off_t)-1) {
|
||||
|
||||
if(lseek_offset == (off_t)-1) {
|
||||
error = errno;
|
||||
if(error == EBADF || error == ESPIPE) {
|
||||
h->error = error;
|
||||
error = EBADF;
|
||||
} else if(error == EINVAL) {
|
||||
} else if(error == EINVAL) {
|
||||
print_eof_error(__FILE__, __LINE__);
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
ssize_t bytes_written = 0;
|
||||
// seek succeeded, so attempt to read.
|
||||
while(bytes_written < len) {
|
||||
while(bytes_written < len) {
|
||||
ssize_t ret = read(impl->fd,
|
||||
buf+bytes_written,
|
||||
buf+bytes_written,
|
||||
len-bytes_written);
|
||||
if(ret == -1) {
|
||||
if(errno == EAGAIN || errno == EINTR) {
|
||||
// EAGAIN could be returned if the file handle was opened in non-blocking mode.
|
||||
// we should probably warn instead of spinning.
|
||||
|
||||
|
||||
// EINTR is returned if the write is interrupted by a signal.
|
||||
// On Linux, it must have been interrupted before any bytes were written.
|
||||
// On SVr4 it may have been interrupted at any point.
|
||||
|
||||
|
||||
// Try again.
|
||||
|
||||
lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET);
|
||||
if(lseek_offset == (off_t)-1) {
|
||||
if(lseek_offset == (off_t)-1) {
|
||||
error = errno;
|
||||
if(error == EBADF || error == ESPIPE) {
|
||||
if(error == EBADF || error == ESPIPE) {
|
||||
h->error = error;
|
||||
error = EBADF;
|
||||
} else if(error == EINVAL) {
|
||||
} else if(error == EINVAL) {
|
||||
print_eof_error(__FILE__, __LINE__);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
|
||||
} else {
|
||||
|
||||
} else {
|
||||
// Need to set h->error if an unrecoverable error occured.
|
||||
// The only unrecoverable error is EBADF. (EINVAL could be
|
||||
// the caller's fault if O_DIRECT is being used. Otherwise,
|
||||
// it is unrecoverable.
|
||||
if(errno == EBADF) {
|
||||
if(errno == EBADF) {
|
||||
h->error = EBADF;
|
||||
}
|
||||
error = errno;
|
||||
break;
|
||||
}
|
||||
} else if (ret == 0) {
|
||||
} else if (ret == 0) {
|
||||
// EOF (!)
|
||||
print_eof_error(__FILE__, __LINE__);
|
||||
error = EINVAL;
|
||||
|
@ -240,12 +240,12 @@ static int file_write(stasis_handle_t *h, lsn_t off, const byte * dat, lsn_t len
|
|||
file_impl * impl = (file_impl*)(h->impl);
|
||||
pthread_mutex_lock(&(impl->mut));
|
||||
int error = 0;
|
||||
if(impl->start_pos > off) {
|
||||
error = EDOM;
|
||||
if(impl->start_pos > off) {
|
||||
error = EDOM;
|
||||
}
|
||||
|
||||
if(!error) {
|
||||
if(impl->end_pos < off+len){
|
||||
if(!error) {
|
||||
if(impl->end_pos < off+len){
|
||||
impl->end_pos = off+len;
|
||||
}
|
||||
error = file_write_unlocked(h, off, dat, len);
|
||||
|
@ -254,7 +254,7 @@ static int file_write(stasis_handle_t *h, lsn_t off, const byte * dat, lsn_t len
|
|||
return error;
|
||||
}
|
||||
|
||||
static int file_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) {
|
||||
static int file_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) {
|
||||
file_impl * impl = (file_impl*)(h->impl);
|
||||
pthread_mutex_lock(&impl->mut);
|
||||
updateEOF(h);
|
||||
|
@ -267,35 +267,35 @@ static int file_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t
|
|||
|
||||
|
||||
|
||||
static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h,
|
||||
lsn_t off, lsn_t len) {
|
||||
static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h,
|
||||
lsn_t off, lsn_t len) {
|
||||
// Allocate the handle
|
||||
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
|
||||
if(!ret) { return NULL; }
|
||||
|
||||
|
||||
file_impl * impl = (file_impl*)h->impl;
|
||||
int error = 0;
|
||||
|
||||
pthread_mutex_lock(&(impl->mut));
|
||||
|
||||
if(impl->start_pos > off) {
|
||||
if(impl->start_pos > off) {
|
||||
error = EDOM;
|
||||
}
|
||||
if(off + len > impl->end_pos) {
|
||||
if(off + len > impl->end_pos) {
|
||||
impl->end_pos = off+len;
|
||||
}
|
||||
pthread_mutex_unlock(&(impl->mut));
|
||||
|
||||
|
||||
byte * buf;
|
||||
if(!error) {
|
||||
if(!error) {
|
||||
// Allocate the buffer
|
||||
buf = malloc(len);
|
||||
if(!buf) {
|
||||
if(!buf) {
|
||||
error = ENOMEM;
|
||||
}
|
||||
}
|
||||
|
||||
if(error) {
|
||||
if(error) {
|
||||
ret->h = h;
|
||||
ret->off = 0;
|
||||
ret->buf = 0;
|
||||
|
@ -314,7 +314,7 @@ static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h,
|
|||
return ret;
|
||||
}
|
||||
|
||||
static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h,
|
||||
static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h,
|
||||
lsn_t len) {
|
||||
// Allocate the handle
|
||||
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
|
||||
|
@ -328,15 +328,15 @@ static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h,
|
|||
off_t off = impl->end_pos;
|
||||
impl->end_pos += len;
|
||||
pthread_mutex_unlock(&(impl->mut));
|
||||
|
||||
|
||||
// Allocate the buffer
|
||||
byte * buf = malloc(len);
|
||||
int error = 0;
|
||||
if(!buf) {
|
||||
if(!buf) {
|
||||
error = ENOMEM;
|
||||
}
|
||||
|
||||
if(error) {
|
||||
if(error) {
|
||||
ret->h = h;
|
||||
ret->off = 0;
|
||||
ret->buf = 0;
|
||||
|
@ -354,18 +354,18 @@ static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h,
|
|||
return ret;
|
||||
|
||||
}
|
||||
static int file_release_write_buffer(stasis_write_buffer_t * w) {
|
||||
static int file_release_write_buffer(stasis_write_buffer_t * w) {
|
||||
|
||||
file_impl * impl = (file_impl*)(w->h->impl);
|
||||
|
||||
pthread_mutex_lock(&(impl->mut));
|
||||
int error = 0;
|
||||
if(impl->end_pos < w->off + w->len ||
|
||||
impl->start_pos > w->off) {
|
||||
if(impl->end_pos < w->off + w->len ||
|
||||
impl->start_pos > w->off) {
|
||||
error = EDOM;
|
||||
}
|
||||
|
||||
if(!error) {
|
||||
|
||||
if(!error) {
|
||||
// Call write().
|
||||
error = file_write_unlocked(w->h, w->off, w->buf, w->len);
|
||||
}
|
||||
|
@ -376,20 +376,20 @@ static int file_release_write_buffer(stasis_write_buffer_t * w) {
|
|||
}
|
||||
|
||||
static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h,
|
||||
lsn_t off, lsn_t len) {
|
||||
lsn_t off, lsn_t len) {
|
||||
stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t));
|
||||
if(!ret) { return NULL; }
|
||||
|
||||
|
||||
byte * buf = malloc(len);
|
||||
int error = 0;
|
||||
|
||||
if(!buf) { error = ENOMEM; }
|
||||
|
||||
if(!error) {
|
||||
if(!error) {
|
||||
error = file_read(h, off, buf, len);
|
||||
}
|
||||
|
||||
if(error) {
|
||||
if(error) {
|
||||
ret->h = h;
|
||||
ret->buf = 0;
|
||||
ret->off = 0;
|
||||
|
@ -397,7 +397,7 @@ static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h,
|
|||
ret->impl = 0;
|
||||
ret->error = error;
|
||||
if(buf) { free(buf); }
|
||||
} else {
|
||||
} else {
|
||||
ret->h = h;
|
||||
ret->buf = buf;
|
||||
ret->off = off;
|
||||
|
@ -405,9 +405,9 @@ static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h,
|
|||
ret->impl = 0;
|
||||
ret->error = 0;
|
||||
}
|
||||
return ret;
|
||||
return ret;
|
||||
}
|
||||
static int file_release_read_buffer(stasis_read_buffer_t * r) {
|
||||
static int file_release_read_buffer(stasis_read_buffer_t * r) {
|
||||
free((void*)r->buf);
|
||||
free(r);
|
||||
return 0;
|
||||
|
@ -450,7 +450,7 @@ static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
|
|||
pthread_mutex_lock(&impl->mut);
|
||||
int fd = impl->fd;
|
||||
lsn_t off = impl->start_pos;
|
||||
(void)off;
|
||||
(void)off;
|
||||
pthread_mutex_unlock(&impl->mut);
|
||||
{
|
||||
static int warned = 0;
|
||||
|
@ -489,40 +489,40 @@ static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
|
|||
}
|
||||
return ret;
|
||||
}
|
||||
static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) {
|
||||
static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) {
|
||||
file_impl * impl = h->impl;
|
||||
pthread_mutex_lock(&impl->mut);
|
||||
int error = 0;
|
||||
|
||||
if(new_start > impl->end_pos) {
|
||||
if(new_start > impl->end_pos) {
|
||||
updateEOF(h);
|
||||
if(!error && new_start > impl->end_pos) {
|
||||
if(!error && new_start > impl->end_pos) {
|
||||
error = EDOM;
|
||||
}
|
||||
}
|
||||
|
||||
if(!error && new_start > impl->start_pos) {
|
||||
if(!error && new_start > impl->start_pos) {
|
||||
char * tmpfile = malloc(strlen(impl->filename)+2);
|
||||
strcpy(tmpfile, impl->filename);
|
||||
tmpfile[strlen(tmpfile)+1] = '\0';
|
||||
tmpfile[strlen(tmpfile)] = '~';
|
||||
|
||||
int fd = open(tmpfile, impl->file_flags, impl->file_mode);
|
||||
|
||||
|
||||
lseek(fd, 0, SEEK_SET);
|
||||
lseek(impl->fd, new_start-impl->start_pos, SEEK_SET);
|
||||
int count;
|
||||
int buf_size = 1024 * 1024;
|
||||
char * buf = malloc(buf_size);
|
||||
while((count = read(impl->fd, buf, buf_size))) {
|
||||
if(count == -1) {
|
||||
while((count = read(impl->fd, buf, buf_size))) {
|
||||
if(count == -1) {
|
||||
error = errno;
|
||||
perror("truncate failed to read");
|
||||
}
|
||||
ssize_t bytes_written = 0;
|
||||
while(bytes_written < count) {
|
||||
while(bytes_written < count) {
|
||||
ssize_t write_ret = write(fd, buf+bytes_written, count-bytes_written);
|
||||
if(write_ret == -1) {
|
||||
if(write_ret == -1) {
|
||||
error = errno;
|
||||
perror("truncate failed to write");
|
||||
break;
|
||||
|
@ -532,23 +532,23 @@ static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) {
|
|||
if(error) break;
|
||||
}
|
||||
free(buf);
|
||||
if(!error) {
|
||||
if(-1 == close(impl->fd)) {
|
||||
if(!error) {
|
||||
if(-1 == close(impl->fd)) {
|
||||
error = errno;
|
||||
}
|
||||
impl->fd = fd;
|
||||
impl->start_pos = new_start;
|
||||
fsync(impl->fd);
|
||||
int rename_ret = rename(tmpfile, impl->filename);
|
||||
if(rename_ret) {
|
||||
if(rename_ret) {
|
||||
error = errno;
|
||||
}
|
||||
free(tmpfile);
|
||||
} else {
|
||||
} else {
|
||||
close(fd);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pthread_mutex_unlock(&impl->mut);
|
||||
return error;
|
||||
}
|
||||
|
@ -573,7 +573,7 @@ struct stasis_handle_t file_func = {
|
|||
.error = 0
|
||||
};
|
||||
|
||||
stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * filename, int flags, int mode) {
|
||||
stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, const char * filename, int flags, int mode) {
|
||||
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
|
||||
if(!ret) { return NULL; }
|
||||
*ret = file_func;
|
||||
|
@ -585,7 +585,7 @@ stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * filename,
|
|||
impl->end_pos = start_offset;
|
||||
assert(sizeof(off_t) >= (64/8));
|
||||
impl->fd = open(filename, flags, mode);
|
||||
if(impl->fd == -1) {
|
||||
if(impl->fd == -1) {
|
||||
ret->error = errno;
|
||||
}
|
||||
impl->filename = strdup(filename);
|
||||
|
|
115
src/stasis/io/handle.c
Normal file
115
src/stasis/io/handle.c
Normal file
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* handle.c
|
||||
*
|
||||
* Created on: May 7, 2009
|
||||
* Author: sears
|
||||
*/
|
||||
#include <config.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include <stasis/io/handle.h>
|
||||
|
||||
|
||||
// @todo this factory stuff doesn't really belong here...
|
||||
static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) {
|
||||
stasis_handle_t * h = stasis_handle(open_memory)(off);
|
||||
//h = stasis_handle(open_debug)(h);
|
||||
stasis_write_buffer_t * w = h->append_buffer(h, len);
|
||||
w->h->release_write_buffer(w);
|
||||
return h;
|
||||
}
|
||||
typedef struct sf_args {
|
||||
const char * filename;
|
||||
int openMode;
|
||||
int filePerm;
|
||||
} sf_args;
|
||||
static stasis_handle_t * slow_file_factory(void * argsP) {
|
||||
sf_args * args = (sf_args*) argsP;
|
||||
stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm);
|
||||
//h = stasis_handle(open_debug)(h);
|
||||
return h;
|
||||
}
|
||||
static void slow_file_factory_close(void * argsP) {
|
||||
// nop
|
||||
}
|
||||
static stasis_handle_t * slow_pfile_factory(void * argsP) {
|
||||
stasis_handle_t * h = argsP;
|
||||
return h;
|
||||
}
|
||||
static void slow_pfile_factory_close(void * argsP) {
|
||||
stasis_handle_t * h = argsP;
|
||||
h->close(h);
|
||||
}
|
||||
static int (*slow_close)(stasis_handle_t * h) = 0;
|
||||
static stasis_handle_t * slow_pfile = 0;
|
||||
static int nop_close(stasis_handle_t*h) { return 0; }
|
||||
|
||||
stasis_handle_t * stasis_handle(open)(const char * path) {
|
||||
#ifndef HAVE_O_DIRECT
|
||||
if(bufferManagerO_DIRECT) {
|
||||
printf("O_DIRECT not supported by this build; switching to conventional buffered I/O.\n");
|
||||
bufferManagerO_DIRECT = 0;
|
||||
}
|
||||
#endif
|
||||
int openMode;
|
||||
if(bufferManagerO_DIRECT) {
|
||||
#ifdef HAVE_O_DIRECT
|
||||
openMode = O_CREAT | O_RDWR | O_DIRECT;
|
||||
#else
|
||||
printf("Can't happen\n");
|
||||
abort();
|
||||
#endif
|
||||
} else {
|
||||
openMode = O_CREAT | O_RDWR;
|
||||
}
|
||||
stasis_handle_t * ret;
|
||||
/// @todo remove hardcoding of buffer manager implementations in transactional2.c
|
||||
|
||||
switch(bufferManagerFileHandleType) {
|
||||
case BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING: {
|
||||
struct sf_args * slow_arg = malloc(sizeof(sf_args));
|
||||
slow_arg->filename = path;
|
||||
|
||||
slow_arg->openMode = openMode;
|
||||
|
||||
slow_arg->filePerm = FILE_PERM;
|
||||
// Allow 4MB of outstanding writes.
|
||||
// @todo Where / how should we open storefile?
|
||||
int worker_thread_count = 4;
|
||||
if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_PFILE) {
|
||||
// printf("\nusing pread()/pwrite()\n");
|
||||
slow_pfile = stasis_handle_open_pfile(0, slow_arg->filename, slow_arg->openMode, slow_arg->filePerm);
|
||||
slow_close = slow_pfile->close;
|
||||
slow_pfile->close = nop_close;
|
||||
ret =
|
||||
stasis_handle(open_non_blocking)(slow_pfile_factory, slow_pfile_factory_close, slow_pfile, 1, fast_factory,
|
||||
NULL, worker_thread_count, PAGE_SIZE * 1024 , 1024);
|
||||
|
||||
} else if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_FILE) {
|
||||
ret =
|
||||
stasis_handle(open_non_blocking)(slow_file_factory, slow_file_factory_close, slow_arg, 0, fast_factory,
|
||||
NULL, worker_thread_count, PAGE_SIZE * 1024, 1024);
|
||||
} else {
|
||||
printf("Unknown value for config option bufferManagerNonBlockingSlowHandleType\n");
|
||||
abort();
|
||||
}
|
||||
} break;
|
||||
case BUFFER_MANAGER_FILE_HANDLE_FILE: {
|
||||
ret = stasis_handle_open_file(0, path, openMode, FILE_PERM);
|
||||
} break;
|
||||
case BUFFER_MANAGER_FILE_HANDLE_PFILE: {
|
||||
ret = stasis_handle_open_pfile(0, path, openMode, FILE_PERM);
|
||||
} break;
|
||||
case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: {
|
||||
assert(bufferManagerFileHandleType != BUFFER_MANAGER_FILE_HANDLE_DEPRECATED);
|
||||
} break;
|
||||
default: {
|
||||
printf("\nUnknown buffer manager filehandle type: %d\n",
|
||||
bufferManagerFileHandleType);
|
||||
abort();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
|
@ -145,6 +145,7 @@ typedef struct nbw_impl {
|
|||
|
||||
// Fields to manage slow handles
|
||||
stasis_handle_t * (*slow_factory)(void * arg);
|
||||
void (*slow_factory_close)(void * arg);
|
||||
void * slow_factory_arg;
|
||||
int slow_force_once;
|
||||
|
||||
|
@ -245,7 +246,7 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
|
|||
DEBUG("Blocking on write. %lld bytes (%lld max)\n",
|
||||
impl->used_buffer_size, impl->max_buffer_size);
|
||||
}
|
||||
|
||||
|
||||
pthread_mutex_unlock(&impl->mut);
|
||||
|
||||
np->dirty = INVALID_NODE;
|
||||
|
@ -362,6 +363,10 @@ static int nbw_close(stasis_handle_t * h) {
|
|||
free(impl->all_slow_handles);
|
||||
assert(impl->available_slow_handle_count == 0);
|
||||
|
||||
if(impl->slow_factory_close) {
|
||||
impl->slow_factory_close(impl->slow_factory_arg);
|
||||
}
|
||||
|
||||
free(h->impl);
|
||||
free(h);
|
||||
return 0;
|
||||
|
@ -777,7 +782,9 @@ static void * nbw_worker(void * handle) {
|
|||
}
|
||||
|
||||
stasis_handle_t * stasis_handle(open_non_blocking)
|
||||
(stasis_handle_t * (*slow_factory)(void * arg), void * slow_factory_arg,
|
||||
(stasis_handle_t * (*slow_factory)(void * arg),
|
||||
void (*slow_factory_close)(void * arg),
|
||||
void * slow_factory_arg,
|
||||
int slow_force_once,
|
||||
stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *),
|
||||
void * fast_factory_arg, int worker_thread_count, lsn_t buffer_size,
|
||||
|
@ -789,6 +796,7 @@ stasis_handle_t * stasis_handle(open_non_blocking)
|
|||
impl->end_pos = 0;
|
||||
|
||||
impl->slow_factory = slow_factory;
|
||||
impl->slow_factory_close = slow_factory_close;
|
||||
impl->slow_factory_arg = slow_factory_arg;
|
||||
|
||||
impl->slow_force_once = slow_force_once;
|
||||
|
|
|
@ -167,7 +167,7 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) {
|
|||
int err = errno;
|
||||
// XXX Why is sys_errlist[] is unavailable here?
|
||||
perror("pfile_read encountered an unknown error code.");
|
||||
fprintf(stderr, "pread() returned -1; errno is %d\n",err);
|
||||
fprintf(stderr, "pread() returned -1; errno is %d\n",err);
|
||||
abort(); // XXX other errors?
|
||||
}
|
||||
error = errno;
|
||||
|
@ -442,7 +442,7 @@ struct stasis_handle_t pfile_func = {
|
|||
};
|
||||
|
||||
stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset,
|
||||
char *filename,
|
||||
const char *filename,
|
||||
int flags, int mode) {
|
||||
stasis_handle_t *ret = malloc(sizeof(stasis_handle_t));
|
||||
if (!ret) { return NULL; }
|
||||
|
|
|
@ -1,26 +1,26 @@
|
|||
#include "../page.h"
|
||||
#include "header.h"
|
||||
#include <stasis/page.h>
|
||||
#include <stasis/page/header.h>
|
||||
#include <assert.h>
|
||||
/**
|
||||
@file header.c is dead code(?)
|
||||
@file header.c is dead code(?)
|
||||
@todo Delete header.c
|
||||
*/
|
||||
int headerPageInitialize() {
|
||||
Page * p;
|
||||
try_ret(0) {
|
||||
try_ret(0) {
|
||||
p = loadPage(-1, 0);
|
||||
assert(!compensation_error());
|
||||
} end_ret(0);
|
||||
int freePage;
|
||||
if(*page_type_ptr(p) != LLADD_HEADER_PAGE) {
|
||||
if(*page_type_ptr(p) != LLADD_HEADER_PAGE) {
|
||||
assert(*page_type_ptr(p) == 0) ;
|
||||
memset(p->memAddr, 0, PAGE_SIZE);
|
||||
stasis_page_cleanup(p);
|
||||
*page_type_ptr(p) = LLADD_HEADER_PAGE;
|
||||
*headerFreepage_ptr(p) = 1;
|
||||
*headerFreepagelist_ptr(p) = 0;
|
||||
*headerFreepagelist_ptr(p) = 0;
|
||||
}
|
||||
|
||||
|
||||
freePage = *headerFreepage_ptr(p);
|
||||
releasePage(p);
|
||||
assert(freePage);
|
||||
|
|
|
@ -1,7 +1,4 @@
|
|||
#include <config.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include <stasis/common.h>
|
||||
#include <stasis/latches.h>
|
||||
|
@ -24,7 +21,7 @@
|
|||
#include <stasis/truncation.h>
|
||||
#include <stasis/io/handle.h>
|
||||
#include <stasis/blobManager.h> // XXX remove this, move Tread() to set.c
|
||||
#include <stdio.h>
|
||||
//#include <stdio.h>
|
||||
#include <assert.h>
|
||||
#include <limits.h>
|
||||
|
||||
|
@ -53,33 +50,6 @@ void stasis_transaction_table_init() {
|
|||
}
|
||||
}
|
||||
|
||||
// @todo this factory stuff doesn't really belong here...
|
||||
static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) {
|
||||
stasis_handle_t * h = stasis_handle(open_memory)(off);
|
||||
//h = stasis_handle(open_debug)(h);
|
||||
stasis_write_buffer_t * w = h->append_buffer(h, len);
|
||||
w->h->release_write_buffer(w);
|
||||
return h;
|
||||
}
|
||||
typedef struct sf_args {
|
||||
char * filename;
|
||||
int openMode;
|
||||
int filePerm;
|
||||
} sf_args;
|
||||
static stasis_handle_t * slow_file_factory(void * argsP) {
|
||||
sf_args * args = (sf_args*) argsP;
|
||||
stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm);
|
||||
//h = stasis_handle(open_debug)(h);
|
||||
return h;
|
||||
}
|
||||
static stasis_handle_t * slow_pfile_factory(void * argsP) {
|
||||
stasis_handle_t * h = argsP;
|
||||
return h;
|
||||
}
|
||||
static int (*slow_close)(stasis_handle_t * h) = 0;
|
||||
static stasis_handle_t * slow_pfile = 0;
|
||||
static int nop_close(stasis_handle_t*h) { return 0; }
|
||||
|
||||
int Tinit() {
|
||||
pthread_mutex_init(&stasis_transaction_table_mutex, NULL);
|
||||
stasis_initted = 1;
|
||||
|
@ -101,81 +71,15 @@ int Tinit() {
|
|||
}
|
||||
stasis_page_init();
|
||||
|
||||
#ifndef HAVE_O_DIRECT
|
||||
if(bufferManagerO_DIRECT) {
|
||||
printf("O_DIRECT not supported by this build; switching to conventional buffered I/O.\n");
|
||||
bufferManagerO_DIRECT = 0;
|
||||
}
|
||||
#endif
|
||||
int openMode;
|
||||
if(bufferManagerO_DIRECT) {
|
||||
#ifdef HAVE_O_DIRECT
|
||||
openMode = O_CREAT | O_RDWR | O_DIRECT;
|
||||
#else
|
||||
printf("Can't happen\n");
|
||||
abort();
|
||||
#endif
|
||||
if(bufferManagerFileHandleType == BUFFER_MANAGER_FILE_HANDLE_DEPRECATED) {
|
||||
printf("\nWarning: Using old I/O routines (with known bugs).\n");
|
||||
openPageFile();
|
||||
} else {
|
||||
openMode = O_CREAT | O_RDWR;
|
||||
stasis_handle_t * h = stasis_handle_open(stasis_store_file_name);
|
||||
// XXX should not be global.
|
||||
pageHandleOpen(h);
|
||||
}
|
||||
|
||||
/// @todo remove hardcoding of buffer manager implementations in transactional2.c
|
||||
|
||||
switch(bufferManagerFileHandleType) {
|
||||
case BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING: {
|
||||
struct sf_args * slow_arg = malloc(sizeof(sf_args));
|
||||
slow_arg->filename = stasis_store_file_name;
|
||||
|
||||
slow_arg->openMode = openMode;
|
||||
|
||||
slow_arg->filePerm = FILE_PERM;
|
||||
// Allow 4MB of outstanding writes.
|
||||
// @todo Where / how should we open storefile?
|
||||
stasis_handle_t * pageFile;
|
||||
int worker_thread_count = 4;
|
||||
if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_PFILE) {
|
||||
// printf("\nusing pread()/pwrite()\n");
|
||||
slow_pfile = stasis_handle_open_pfile(0, slow_arg->filename, slow_arg->openMode, slow_arg->filePerm);
|
||||
slow_close = slow_pfile->close;
|
||||
slow_pfile->close = nop_close;
|
||||
pageFile =
|
||||
stasis_handle(open_non_blocking)(slow_pfile_factory, slow_pfile, 1, fast_factory,
|
||||
NULL, worker_thread_count, PAGE_SIZE * 1024 , 1024);
|
||||
|
||||
} else if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_FILE) {
|
||||
pageFile =
|
||||
stasis_handle(open_non_blocking)(slow_file_factory, slow_arg, 0, fast_factory,
|
||||
NULL, worker_thread_count, PAGE_SIZE * 1024, 1024);
|
||||
} else {
|
||||
printf("Unknown value for config option bufferManagerNonBlockingSlowHandleType\n");
|
||||
abort();
|
||||
}
|
||||
//pageFile = stasis_handle(open_debug)(pageFile);
|
||||
pageHandleOpen(pageFile);
|
||||
} break;
|
||||
case BUFFER_MANAGER_FILE_HANDLE_FILE: {
|
||||
stasis_handle_t * pageFile =
|
||||
stasis_handle_open_file(0, stasis_store_file_name,
|
||||
openMode, FILE_PERM);
|
||||
pageHandleOpen(pageFile);
|
||||
} break;
|
||||
case BUFFER_MANAGER_FILE_HANDLE_PFILE: {
|
||||
stasis_handle_t * pageFile =
|
||||
stasis_handle_open_pfile(0, stasis_store_file_name,
|
||||
openMode, FILE_PERM);
|
||||
pageHandleOpen(pageFile);
|
||||
} break;
|
||||
case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: {
|
||||
printf("\nWarning: Using old I/O routines (with known bugs).\n");
|
||||
openPageFile();
|
||||
} break;
|
||||
default: {
|
||||
printf("\nUnknown buffer manager filehandle type: %d\n",
|
||||
bufferManagerFileHandleType);
|
||||
abort();
|
||||
}
|
||||
}
|
||||
bufInit(bufferManagerType);
|
||||
stasis_buffer_manager_open(bufferManagerType);
|
||||
DEBUG("Buffer manager type = %d\n", bufferManagerType);
|
||||
pageOperationsInit();
|
||||
TallocInit();
|
||||
|
@ -441,14 +345,9 @@ int Tdeinit() {
|
|||
stasis_truncation_deinit();
|
||||
TnaiveHashDeinit();
|
||||
TallocDeinit();
|
||||
bufDeinit();
|
||||
stasis_buffer_manager_close();
|
||||
DEBUG("Closing page file tdeinit\n");
|
||||
closePageFile();
|
||||
if(slow_pfile) {
|
||||
slow_close(slow_pfile);
|
||||
slow_pfile = 0;
|
||||
slow_close = 0;
|
||||
}
|
||||
stasis_page_deinit();
|
||||
stasis_log_file->close(stasis_log_file);
|
||||
dirtyPagesDeinit();
|
||||
|
@ -464,12 +363,8 @@ int TuncleanShutdown() {
|
|||
stasis_suppress_unclean_shutdown_warnings = 1;
|
||||
stasis_truncation_deinit();
|
||||
TnaiveHashDeinit();
|
||||
simulateBufferManagerCrash();
|
||||
if(slow_pfile) {
|
||||
slow_close(slow_pfile);
|
||||
slow_pfile = 0;
|
||||
slow_close = 0;
|
||||
}
|
||||
stasis_buffer_manager_simulate_crash();
|
||||
// XXX: closePageFile?
|
||||
stasis_page_deinit();
|
||||
stasis_log_file->close(stasis_log_file);
|
||||
stasis_transaction_table_num_active = 0;
|
||||
|
|
|
@ -99,7 +99,7 @@ Page * loadPage(int xid, pageid_t pageid);
|
|||
Page * loadUninitializedPage(int xid, pageid_t pageid);
|
||||
|
||||
/**
|
||||
This is the function pointer that bufInit sets in order to
|
||||
This is the function pointer that stasis_buffer_manager_open sets in order to
|
||||
override loadPage.
|
||||
*/
|
||||
extern Page * (*loadPageImpl)(int xid, pageid_t pageid);
|
||||
|
@ -111,7 +111,7 @@ extern Page * (*loadUninitPageImpl)(int xid, pageid_t pageid);
|
|||
void releasePage(Page *p);
|
||||
|
||||
/**
|
||||
This is the function pointer that bufInit sets in order to
|
||||
This is the function pointer that stasis_buffer_manager_open sets in order to
|
||||
override releasePage.
|
||||
*/
|
||||
extern void (*releasePageImpl)(Page * p);
|
||||
|
@ -146,14 +146,14 @@ extern void (*forcePages)();
|
|||
This does not force page that have not been written to with pageWrite().
|
||||
*/
|
||||
extern void (*forcePageRange)(pageid_t start, pageid_t stop);
|
||||
extern void (*simulateBufferManagerCrash)();
|
||||
extern void (*stasis_buffer_manager_simulate_crash)();
|
||||
|
||||
int bufInit(int type);
|
||||
int stasis_buffer_manager_open(int type);
|
||||
/**
|
||||
* will write out any dirty pages, assumes that there are no running
|
||||
* transactions
|
||||
*/
|
||||
extern void (*bufDeinit)();
|
||||
extern void (*stasis_buffer_manager_close)();
|
||||
|
||||
#ifdef PROFILE_LATCHES_WRITE_ONLY
|
||||
#define loadPage(x,y) __profile_loadPage((x), (y), __FILE__, __LINE__)
|
||||
|
|
|
@ -1 +1 @@
|
|||
void bhBufInit();
|
||||
void stasis_buffer_manager_hash_open();
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#ifndef __STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H
|
||||
#define __STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H
|
||||
int bufManBufInit();
|
||||
int stasis_buffer_manager_deprecated_open();
|
||||
#endif//__STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H
|
||||
|
|
|
@ -1 +1 @@
|
|||
void paBufInit();
|
||||
void stasis_buffer_manager_mem_array_open();
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
/**
|
||||
|
||||
@file
|
||||
@file
|
||||
|
||||
Interface for I/O handle implementations.
|
||||
|
||||
|
@ -263,7 +263,7 @@ stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset);
|
|||
@param perm The file permissions to be passed to open()
|
||||
*/
|
||||
stasis_handle_t * stasis_handle(open_file)
|
||||
(lsn_t start_offset, char * path, int flags, int perm);
|
||||
(lsn_t start_offset, const char * path, int flags, int perm);
|
||||
/**
|
||||
Open a handle that is backed by a file. This handle uses pread()
|
||||
and pwrite(). It never holds a mutex while perfoming I/O.
|
||||
|
@ -278,7 +278,7 @@ stasis_handle_t * stasis_handle(open_file)
|
|||
@param perm The file permissions to be passed to open()
|
||||
*/
|
||||
stasis_handle_t * stasis_handle(open_pfile)
|
||||
(lsn_t start_offset, char * path, int flags, int perm);
|
||||
(lsn_t start_offset, const char * path, int flags, int perm);
|
||||
/**
|
||||
Given a factory for creating "fast" and "slow" handles, provide a
|
||||
handle that never makes callers wait for write requests to
|
||||
|
@ -326,7 +326,9 @@ stasis_handle_t * stasis_handle(open_pfile)
|
|||
before blocking.
|
||||
*/
|
||||
stasis_handle_t * stasis_handle(open_non_blocking)
|
||||
(stasis_handle_t * (*slow_factory)(void * arg), void * slow_factory_arg,
|
||||
(stasis_handle_t * (*slow_factory)(void * arg),
|
||||
void (*slow_factory_close)(void * arg),
|
||||
void * slow_factory_arg,
|
||||
int slow_force_once,
|
||||
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg),
|
||||
void * fast_factory_arg, int worker_thread_count, lsn_t buffer_size,
|
||||
|
@ -341,5 +343,9 @@ stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h);
|
|||
@param h All handle operations will be forwarded to h.
|
||||
*/
|
||||
stasis_handle_t * stasis_handle(open_debug)(stasis_handle_t * h);
|
||||
/**
|
||||
* Open a Stasis file handle using default arguments.
|
||||
*/
|
||||
stasis_handle_t * stasis_handle(open)(const char * path);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -426,7 +426,7 @@ START_TEST(io_nonBlockingTest) {
|
|||
FILE_PERM
|
||||
};
|
||||
|
||||
h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, 0,
|
||||
h = stasis_handle(open_non_blocking)(slow_factory, 0, &slow_args, 0,
|
||||
fast_factory, 0,
|
||||
5, 1024*1024, 100);
|
||||
// h = stasis_handle(open_debug)(h);
|
||||
|
@ -435,7 +435,7 @@ START_TEST(io_nonBlockingTest) {
|
|||
|
||||
unlink("logfile.txt");
|
||||
|
||||
h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, 0,
|
||||
h = stasis_handle(open_non_blocking)(slow_factory, 0, &slow_args, 0,
|
||||
fast_factory, 0,
|
||||
5, 1024*1024, 100);
|
||||
//h = stasis_handle(open_debug)(h);
|
||||
|
@ -444,7 +444,7 @@ START_TEST(io_nonBlockingTest) {
|
|||
|
||||
unlink("logfile.txt");
|
||||
|
||||
h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, 0,
|
||||
h = stasis_handle(open_non_blocking)(slow_factory, 0, &slow_args, 0,
|
||||
fast_factory, 0,
|
||||
5, 1024 * 1024, 100);
|
||||
handle_concurrencytest(h);
|
||||
|
|
Loading…
Reference in a new issue