New access method for the buffer manager; write coalescing; runtime bufferManager selection and a few bufferManager bugfixes.

Sears Russell 2007-08-20 16:23:57 +00:00
parent 8963d6d381
commit 90f011b049
9 changed files with 338 additions and 90 deletions

View file

@ -1,6 +1,6 @@
nobase_pkginclude_HEADERS=$(wildcard *.h) $(wildcard */*.h)
lib_LTLIBRARIES=libstasis.la
libstasis_la_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \
libstasis_la_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c flags.c stats.c io.c bufferManager.c linkedlist.c operations.c \
pageHandle.c pageFile.c pageCache.c page.c bufferPool.c blobManager.c recovery2.c truncation.c \
transactional2.c allocationPolicy.c \
lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\
@ -15,7 +15,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c st
operations/linearHashNTA.c operations/linkedListNTA.c \
operations/pageOrientedListNTA.c operations/bTree.c \
operations/regions.c operations/lsmTree.c \
io/rangeTracker.c io/memory.c io/file.c io/non_blocking.c io/debug.c \
io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c io/debug.c \
bufferManager/pageArray.c bufferManager/bufferHash.c \
replacementPolicy/lru.c replacementPolicy/lruFast.c
AM_CFLAGS=${GLOBAL_CFLAGS}

View file

@ -102,15 +102,7 @@ pthread_mutex_t pinCount_mutex = PTHREAD_MUTEX_INITIALIZER;
int pinCount = 0;
#endif
#ifdef USE_BUFFER_MANAGER
int bufferManagerType = USE_BUFFER_MANAGER;
#else
int bufferManagerType = BUFFER_MANAGER_HASH;
#endif
static struct LH_ENTRY(table) *activePages; /* page lookup */
/*static Page * activePagePtrs[MAX_BUFFER_SIZE];*/
static pthread_mutex_t loadPagePtr_mutex;

View file

@ -5,11 +5,26 @@
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <linkedlist.h>
/**
@file
File handle that avoids blocking on writes. It attempts to ensure
that the code calling it never waits for a "slow handle" to perform
a write. Instead, when a write request is received, it is
temporarily stored in a "fast handle". The caller provides factory
methods that instantiate fast and slow handles.
For efficiency, this file handle imposes a special restriction upon
its callers. It implicitly partitions the underlying file into
blocks based upon the read and write requests it receives. Future
reads and writes must access complete blocks, and may not span
multiple blocks. This works well for page files (where each page is
a block), and log files, where each log entry is a block, as is the
header that Stasis appends to the log entry.
Design:
data structures: A rb tree holds a set of "fast" handles that manage
@ -35,8 +50,20 @@
*/
#define INVALID_NODE 2
/* If defined, merge writes immediately (not recommended, as doing so
decreases the granularity of the "dirty" bit, causing clean data to
be written back). Whether or not this is defined, writes will be
merged by nbw_worker at flush time.
*/
//#define EAGER_MERGE
/* If EAGER_MERGE is defined, this limits the number of pages it will
coalesce into a single write.
*/
//#define MAX_MERGE 4
static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t * h, int error) {
/** @return a read buffer indicating an error has occurred */
static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t *h,
int error) {
assert(error);
stasis_read_buffer_t * r = malloc(sizeof(stasis_read_buffer_t));
r->h = h;
@ -46,8 +73,9 @@ static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t * h
r->error = error;
return r;
}
static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t * h, int error) {
/** @return a write buffer indicating an error has occurred */
static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t *h,
int error) {
assert(error);
stasis_write_buffer_t * w = malloc(sizeof(stasis_write_buffer_t));
w->h = h;
@ -63,6 +91,8 @@ typedef struct tree_node {
lsn_t start_pos;
lsn_t end_pos;
stasis_handle_t * h;
/** The number of I/O requests this node corresponds to. */
int write_count;
/** The number of threads accessing this handle. The handle cannot
be deallocated unless this is zero. */
int pin_count;
@ -122,6 +152,10 @@ typedef struct nbw_impl {
LinkedList * slow_handles;
int slow_handle_count;
// These two track statistics on write coalescing.
lsn_t requested_bytes_written;
lsn_t total_bytes_written;
// Fields to manage fast handles
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg);
void * fast_factory_arg;
@ -129,6 +163,7 @@ typedef struct nbw_impl {
struct RB_ENTRY(tree) * fast_handles;
int fast_handle_count;
int max_fast_handles;
int min_fast_handles;
lsn_t used_buffer_size;
lsn_t max_buffer_size;
@ -139,6 +174,10 @@ typedef struct nbw_impl {
int still_open;
} nbw_impl;
static inline void freeFastHandle(nbw_impl * impl, const tree_node * n);
/** Obtain a slow handle from the pool of existing ones, or create a new one
by calling impl->slow_factory. */
static stasis_handle_t * getSlowHandle(nbw_impl * impl) {
pthread_mutex_lock(&impl->mut);
stasis_handle_t * slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles);
@ -152,6 +191,7 @@ static stasis_handle_t * getSlowHandle(nbw_impl * impl) {
}
return slow;
}
/** Release a file handle back into the pool of slow handles. */
static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) {
assert(slow);
pthread_mutex_lock(&impl->mut);
@ -163,36 +203,94 @@ static tree_node * allocTreeNode(lsn_t off, lsn_t len) {
tree_node * ret = malloc(sizeof(tree_node));
ret->start_pos = off;
ret->end_pos = off + len;
ret->write_count = 1;
ret->dirty = 0;
ret->pin_count = 1;
return ret;
}
static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) {
static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
lsn_t len) {
tree_node * np = allocTreeNode(off, len);
pthread_mutex_lock(&impl->mut);
const tree_node * n = RB_ENTRY(search)(np, impl->fast_handles);
if(n == np) { // not found
if(impl->fast_handle_count > impl->max_fast_handles ||
impl->used_buffer_size > impl->max_buffer_size) {
RB_ENTRY(delete)(np, impl->fast_handles);
np->dirty = INVALID_NODE;
pthread_mutex_unlock(&impl->mut);
DEBUG("allocFastHandle(%lld)\n", off/PAGE_SIZE);
const tree_node * n = RB_ENTRY(lookup)(RB_LULTEQ, np, impl->fast_handles);
// This code only works when writes and reads are aligned to immutable
// block boundaries and never cross them.
if((!n) ||
!(n->start_pos <= off &&
n->end_pos >= off + len)) {
// no completely overlapping range found; allocate space in np.
if(0 && (impl->fast_handle_count >= impl->max_fast_handles ||
impl->used_buffer_size + len > impl->max_buffer_size)) {
assert(n->end_pos <= off);
if(impl->fast_handle_count >= impl->max_fast_handles) {
printf("Blocking on write. %d handles (%d max)\n",
impl->fast_handle_count, impl->max_fast_handles);
}
if(impl->used_buffer_size + len > impl->max_buffer_size) {
printf("Blocking on write. %lld bytes (%lld max)\n",
impl->used_buffer_size, impl->max_buffer_size);
}
np->dirty = INVALID_NODE;
np->h = getSlowHandle(impl);
} else {
impl->fast_handle_count++;
impl->used_buffer_size += len;
pthread_mutex_unlock(&impl->mut);
np->h = impl->fast_factory(off,len,impl->fast_factory_arg);
}
#ifdef EAGER_MERGE
if(n && n->end_pos == off && n->write_count + 1 < MAX_MERGE) {
DEBUG("Did merge.\n");
((tree_node*)n)->pin_count++;
((tree_node*)n)->write_count++;
((tree_node*)n)->end_pos += len;
} else {
#endif
RB_ENTRY(search)(np, impl->fast_handles);
np->h = impl->fast_factory(off,len,impl->fast_factory_arg);
n = np;
#ifdef EAGER_MERGE
}
#endif
}
} else {
((tree_node*)n)->pin_count++;
pthread_mutex_unlock(&impl->mut);
free(np);
}
#ifdef EAGER_MERGE
// check for a mergeable range immediately after the point we're interested in.
tree_node dummy;
dummy.start_pos = n->end_pos;
dummy.end_pos = n->end_pos+1;
while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles)) && np->dirty && !np->pin_count && np->write_count + n->write_count < MAX_MERGE) {
DEBUG("Did post-merge of page %lld-%lld (%d) and %lld-%lld (%d) outstanding = %d\n", n->start_pos/PAGE_SIZE, -1+n->end_pos/PAGE_SIZE, n->write_count, np->start_pos/PAGE_SIZE, -1+np->end_pos/PAGE_SIZE, np->write_count, impl->fast_handle_count);
lsn_t appendLen = np->end_pos - np->start_pos;
stasis_read_buffer_t * r= np->h->read_buffer(np->h,np->start_pos, appendLen);
int ret = n->h->write(n->h,np->start_pos,r->buf, appendLen);
assert(!ret);
ret = r->h->release_read_buffer(r);
assert(!ret);
np->dirty = 0;
((tree_node*)n)->write_count += np->write_count;
freeFastHandle(impl,np);
RB_ENTRY(delete)(n,impl->fast_handles);
((tree_node*)n)->end_pos += appendLen;
RB_ENTRY(search)(n,impl->fast_handles);
}
#endif
pthread_mutex_unlock(&impl->mut);
return n;
}
static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) {
@ -210,8 +308,6 @@ static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t
should hold the mutex when calling freeFastHandle. */
static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) {
RB_ENTRY(delete)(n, impl->fast_handles);
impl->fast_handle_count--;
impl->used_buffer_size -= (n->end_pos - n->start_pos);
n->h->close(n->h);
free((void*)n);
}
@ -231,7 +327,9 @@ static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n,
((tree_node*)n)->dirty = setDirty;
}
pthread_mutex_unlock(&impl->mut);
pthread_cond_signal(&impl->pending_writes_cond);
if(impl->fast_handle_count > impl->min_fast_handles) {
pthread_cond_signal(&impl->pending_writes_cond);
}
return 0;
}
}
@ -256,11 +354,15 @@ static int nbw_close(stasis_handle_t * h) {
}
// No longer need latch; this is the only thread allowed to touch the handle.
free(impl->workers);
// printf("nbw had %d slow handles\n", impl->slow_handle_count);
// fflush(stdout);
DEBUG("nbw had %d slow handles\n", impl->slow_handle_count);
DEBUG("fast handles = %d, used buffer = %lld\n", impl->fast_handle_count, impl->used_buffer_size);
if(impl->requested_bytes_written < impl->total_bytes_written) {
printf("nbw: Problem with write coalescing detected.\n"
"Client wrote %lld bytes, handle wrote %lld.\n",
impl->requested_bytes_written, impl->total_bytes_written);
}
assert(impl->fast_handle_count == 0);
assert(impl->used_buffer_size == 0);
@ -322,6 +424,7 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
} else if(off + len > impl->end_pos) {
impl->end_pos = off+len;
}
impl->requested_bytes_written += len;
pthread_mutex_unlock(&impl->mut);
}
@ -334,6 +437,7 @@ static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h,
pthread_mutex_lock(&impl->mut);
lsn_t off = impl->end_pos;
impl->end_pos += len;
impl->requested_bytes_written += len;
pthread_mutex_unlock(&impl->mut);
return nbw_write_buffer(h, off, len);
@ -344,7 +448,6 @@ static int nbw_release_write_buffer(stasis_write_buffer_t * w) {
const tree_node * n = w_impl->n;
w_impl->w->h->release_write_buffer(w_impl->w);
releaseFastHandle(impl, n, 1);
// pthread_cond_signal(&impl->pending_writes_cond);
free(w_impl);
free(w);
return 0;
@ -361,7 +464,7 @@ static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h,
r_impl->n = n;
r_impl->r = r;
stasis_read_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t));
ret->h = h;
ret->off = r->off;
ret->len = r->len;
@ -402,6 +505,7 @@ static int nbw_write(stasis_handle_t * h, lsn_t off,
} else if(off + len > impl->end_pos) {
impl->end_pos = off+len;
}
impl->requested_bytes_written += len;
pthread_mutex_unlock(&impl->mut);
}
return ret;
@ -413,6 +517,7 @@ static int nbw_append(stasis_handle_t * h, lsn_t * off,
pthread_mutex_lock(&impl->mut);
*off = impl->end_pos;
impl->end_pos+= len;
impl->requested_bytes_written += len;
pthread_mutex_unlock(&impl->mut);
int ret = nbw_write(h, *off, dat, len);
@ -468,6 +573,19 @@ struct stasis_handle_t nbw_func = {
.error = 0
};
/**
This worker thread simulates asynchronous I/O by handling writeback
on behalf of the application. Multiple workers may be spawned for
a non-blocking handle.
This function walks the list of fast handles, writing back dirty
ones, and freeing clean ones. It (almost) never performs a write
while holding the mutex.
@todo Non-blocking handle should not memcpy() buffers while holding
the mutex.
*/
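/*
  Worked example (hypothetical page numbers, for illustration only):
  suppose the application dirtied pages 7, 8 and 9, so three fast
  handles with adjacent [start_pos, end_pos) ranges sit in the tree,
  all dirty and unpinned.  When a worker picks up the node for page 7,
  the loop below repeatedly probes the tree for a node that starts
  exactly at the current end_pos, copies that node's contents into one
  contiguous buffer, and marks it clean.  The slow handle then sees a
  single 3 * PAGE_SIZE write() instead of three separate PAGE_SIZE
  writes; the byte totals are unchanged, which is what the
  requested_bytes_written / total_bytes_written check in nbw_close()
  verifies.
*/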
static void * nbw_worker(void * handle) {
stasis_handle_t * h = handle;
nbw_impl * impl = h->impl;
@ -488,15 +606,65 @@ static void * nbw_worker(void * handle) {
lsn_t off = fast->start_position(fast);
lsn_t len = fast->end_position(fast) - off;
stasis_read_buffer_t * r = fast->read_buffer(fast, off, len);
slow->write(slow, off, r->buf, len);
// cast strips const
byte *buf = (byte*)r->buf;
pthread_mutex_lock(&impl->mut);
int first = 1;
off_t buf_off = 0;
tree_node dummy;
dummy.start_pos = node->end_pos;
dummy.end_pos = node->end_pos+1;
tree_node * np;
while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles))
&& np->dirty && !np->pin_count) {
lsn_t np_len = np->end_pos - np->start_pos;
len += np_len;
if(first) {
buf = malloc(r->len + len);
memcpy(buf, r->buf, r->len);
buf_off += r->len;
first = 0;
} else {
buf = realloc(buf, len);
}
stasis_handle_t * fast2 = np->h;
stasis_read_buffer_t * r2 = fast2->read_buffer(fast2,np->start_pos, np_len);
memcpy(buf + buf_off, r2->buf, np_len);
buf_off += np_len;
r2->h->release_read_buffer(r2);
np->dirty = 0;
dummy.start_pos = np->end_pos;
dummy.end_pos = np->end_pos+1;
}
impl->total_bytes_written += len;
pthread_mutex_unlock(&impl->mut);
if(len != PAGE_SIZE) {
DEBUG("merged %lld pages at %lld into single write\n", len/PAGE_SIZE, off/PAGE_SIZE);
}
slow->write(slow, off, buf, len);
if(!first) {
free(buf);
}
r->h->release_read_buffer(r);
releaseSlowHandle(impl, slow);
pthread_mutex_lock(&impl->mut);
node->pin_count--;
}
tree_node * new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, impl->fast_handles);
tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, impl->fast_handles);
if(!node->dirty && !node->pin_count) {
freeFastHandle(impl, node);
impl->fast_handle_count -= node->write_count;
impl->used_buffer_size -= (node->end_pos - node->start_pos);
freeFastHandle(impl, node);
}
node = new_node;
}
@ -530,13 +698,16 @@ stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_fact
impl->slow_handles = 0;
impl->slow_handle_count = 0;
impl->requested_bytes_written = 0;
impl->total_bytes_written = 0;
impl->fast_factory = fast_factory;
impl->fast_factory_arg = fast_factory_arg;
impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0);
impl->fast_handle_count = 0;
impl->max_fast_handles = max_fast_handles;
impl->min_fast_handles = max_fast_handles / 2;
impl->max_buffer_size = buffer_size;
impl->used_buffer_size = 0;
@ -557,5 +728,9 @@ stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_fact
perror("Coudln't spawn worker thread for non_blocking io");
}
}
DEBUG("Opened non blocking I/O handle; buffer size = %lldmb max outstanding writes = %d\n",
impl->max_buffer_size / (1024 * 1024), impl->max_fast_handles);
return h;
}
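For context, here is a minimal usage sketch of the non-blocking handle (not part of this commit). The handle members, buffer types, and factory signatures are taken from this diff; the parameter order of open_non_blocking after the two factories mirrors the call in Tinit() later in this commit, so treat it as an assumption. "storefile.txt" and the page number are placeholders, and the necessary Stasis headers are assumed to be included.

static stasis_handle_t * slow_factory(void * ignored) {
  // Each writeback worker can ask this factory for its own file handle.
  return stasis_handle(open_file)(0, "storefile.txt", O_CREAT | O_RDWR, FILE_PERM);
}
static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) {
  // In-memory handle that buffers one block until a worker writes it back.
  return stasis_handle(open_memory)(off);
}

void example(void) {
  stasis_handle_t * h =
      stasis_handle(open_non_blocking)(slow_factory, NULL,   // slow handle factory + arg
                                       fast_factory, NULL,   // fast handle factory + arg
                                       4,                     // writeback worker threads
                                       PAGE_SIZE * 1024,      // max buffered bytes (4MB)
                                       1024);                 // max outstanding fast handles
  // Per the header comment, each request must cover a whole block; here, one page.
  byte page_bytes[PAGE_SIZE];                    // placeholder for the caller's data
  memset(page_bytes, 0, PAGE_SIZE);
  lsn_t off = 42 * PAGE_SIZE;                    // hypothetical page number
  int rc = h->write(h, off, page_bytes, PAGE_SIZE);
  assert(!rc);
  stasis_read_buffer_t * r = h->read_buffer(h, off, PAGE_SIZE);
  // ... use r->buf ...
  rc = r->h->release_read_buffer(r);
  assert(!rc);
  h->close(h);   // per the asserts in nbw_close(), all fast handles are flushed by this point
}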

View file

@ -11,9 +11,7 @@
#include <stasis/consumer.h>
#include <stasis/lockManager.h>
#include <stasis/compensations.h>
#ifdef USE_PAGEFILE
#include "pageFile.h"
#endif
#include <stasis/pageHandle.h>
#include "page.h"
#include <stasis/logger/logger2.h>
@ -113,12 +111,20 @@ typedef struct sf_args {
int openMode;
int filePerm;
} sf_args;
static stasis_handle_t * slow_factory(void * argsP) {
static stasis_handle_t * slow_file_factory(void * argsP) {
sf_args * args = (sf_args*) argsP;
stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm);
//h = stasis_handle(open_debug)(h);
return h;
}
static stasis_handle_t * slow_pfile_factory(void * argsP) {
stasis_handle_t * h = argsP;
return h;
}
static int (*slow_close)(stasis_handle_t * h) = 0;
static stasis_handle_t * slow_pfile = 0;
static int nop_close(stasis_handle_t*h) { return 0; }
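/* Note: with IO_HANDLE_PFILE, all writeback workers appear to share the one
   pre-opened pfile handle -- slow_pfile_factory just hands it back, which is
   presumably safe because pread()/pwrite() take explicit offsets instead of
   sharing a file position.  Its close() is swapped for nop_close so the
   slow-handle pool cannot close it prematurely; Tdeinit() and
   TuncleanShutdown() call the saved slow_close instead. */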
int Tinit() {
pthread_mutex_init(&transactional_2_mutex, NULL);
numActiveXactions = 0;
@ -129,25 +135,66 @@ int Tinit() {
dirtyPagesInit();
LogInit(loggerType);
pageInit();
#ifndef USE_PAGEFILE
struct sf_args * slow_arg = malloc(sizeof(sf_args));
slow_arg->filename = STORE_FILE;
#ifdef PAGE_FILE_O_DIRECT
slow_arg->openMode = O_CREAT | O_RDWR | O_DIRECT;
#else
slow_arg->openMode = O_CREAT | O_RDWR;
switch(bufferManagerFileHandleType) {
case BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING: {
struct sf_args * slow_arg = malloc(sizeof(sf_args));
slow_arg->filename = STORE_FILE;
#ifndef HAVE_O_DIRECT
if(bufferManagerO_DIRECT) {
printf("O_DIRECT not supported by this build; switching to conventional buffered I/O.\n");
bufferManagerO_DIRECT = 0;
}
#endif
slow_arg->filePerm = FILE_PERM;
// Allow 4MB of outstanding writes.
// @todo Where / how should we open storefile?
stasis_handle_t * pageFile =
stasis_handle(open_non_blocking)(slow_factory, slow_arg, fast_factory,
NULL, 20, PAGE_SIZE * 1024, 1024);
pageHandleOpen(pageFile);
if(bufferManagerO_DIRECT) {
#ifdef HAVE_O_DIRECT
slow_arg->openMode = O_CREAT | O_RDWR | O_DIRECT;
#else
printf("\nWarning: Using old I/O routines.\n");
openPageFile();
#endif // USE_PAGEFILE
printf("Can't happen\n");
abort();
#endif
} else {
slow_arg->openMode = O_CREAT | O_RDWR;
}
slow_arg->filePerm = FILE_PERM;
// Allow 4MB of outstanding writes.
// @todo Where / how should we open storefile?
stasis_handle_t * pageFile;
int worker_thread_count = 4;
if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_PFILE) {
// printf("\nusing pread()/pwrite()\n");
slow_pfile = stasis_handle_open_pfile(0, slow_arg->filename, slow_arg->openMode, slow_arg->filePerm);
slow_close = slow_pfile->close;
slow_pfile->close = nop_close;
pageFile =
stasis_handle(open_non_blocking)(slow_pfile_factory, slow_pfile, fast_factory,
NULL, worker_thread_count, PAGE_SIZE * 1024 , 1024);
} else if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_FILE) {
pageFile =
stasis_handle(open_non_blocking)(slow_file_factory, slow_arg, fast_factory,
NULL, worker_thread_count, PAGE_SIZE * 1024, 1024);
} else {
printf("Unknown value for config option bufferManagerNonBlockingSlowHandleType\n");
abort();
}
//pageFile = stasis_handle(open_debug)(pageFile);
pageHandleOpen(pageFile);
} break;
case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: {
printf("\nWarning: Using old I/O routines (with known bugs).\n");
openPageFile();
} break;
default: {
printf("\nUnknown buffer manager filehandle type: %d\n",
bufferManagerFileHandleType);
abort();
}
}
//#else
//#endif // USE_PAGEFILE
bufInit(bufferManagerType);
DEBUG("Buffer manager type = %d\n", bufferManagerType);
pageOperationsInit();
@ -364,6 +411,11 @@ int Tdeinit() {
bufDeinit();
DEBUG("Closing page file tdeinit\n");
closePageFile();
if(slow_pfile) {
slow_close(slow_pfile);
slow_pfile = 0;
slow_close = 0;
}
pageDeinit();
LogDeinit();
dirtyPagesDeinit();
@ -374,6 +426,11 @@ int TuncleanShutdown() {
truncationDeinit();
ThashDeinit();
simulateBufferManagerCrash();
if(slow_pfile) {
slow_close(slow_pfile);
slow_pfile = 0;
slow_close = 0;
}
pageDeinit();
LogDeinit();
numActiveXactions = 0;
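Taken together, the new constants and flags make the I/O stack a runtime choice. A minimal sketch of selecting it before Tinit() (not part of this commit; it assumes the globals referenced in Tinit() above are declared in the new stasis/flags.h, and the values shown are illustrative, not defaults):

#include <stasis/transactional.h>

int main(int argc, char ** argv) {
  // These globals are assumed to be declared in stasis/flags.h (added to the
  // build by this commit) and consumed by Tinit() / bufInit().
  bufferManagerType = BUFFER_MANAGER_HASH;
  bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING;
  bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;   // pread()/pwrite() slow handle
  bufferManagerO_DIRECT = 0;                                   // leave O_DIRECT off
  Tinit();
  /* ... run transactions ... */
  Tdeinit();
  return 0;
}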

View file

@ -96,20 +96,6 @@ BEGIN_C_DECLS
typedef struct Page_s Page_s;
typedef struct Page_s Page;
/**
This is the type of buffer manager that is being used.
Before Stasis is initialized it will be set to a default value.
It may be changed before Tinit() is called, or overridden at
compile time by defining USE_BUFFER_MANAGER.
(eg: gcc ... -DUSE_BUFFER_MANAGER=BUFFER_MANAGER_FOO)
@see constants.h for a list of recognized buffer manager implementations. (The constants are named BUFFER_MANAGER_*)
*/
extern int bufferManagerType;
/**
* @param xid The transaction that is pinning the page (used by page-level locking implementations.)
* @param pageid ID of the page you want to load

View file

@ -96,6 +96,12 @@ terms specified in this license.
#define BUFFER_MANAGER_MEM_ARRAY 2
#define BUFFER_MANAGER_DEPRECATED_HASH 3
#define BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING 0
#define BUFFER_MANAGER_FILE_HANDLE_DEPRECATED 1
#define IO_HANDLE_FILE 1
#define IO_HANDLE_PFILE 2
#define LOG_TO_FILE 0
#define LOG_TO_MEMORY 1

View file

@ -91,8 +91,8 @@ typedef struct stasis_write_buffer_t {
typedef struct stasis_read_buffer_t {
stasis_handle_t * h;
const byte * buf;
lsn_t off;
const byte * buf;
lsn_t len;
void * impl;
int error;
@ -100,6 +100,7 @@ typedef struct stasis_read_buffer_t {
stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset);
stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * path, int flags, int mode);
stasis_handle_t * stasis_handle(open_pfile)(lsn_t start_offset, char * path, int flags, int mode);
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg),
void * slow_factory_arg,
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg),

View file

@ -489,6 +489,7 @@ terms specified in this license.
#define __TRANSACTIONAL_H__
#include "common.h"
#include "flags.h"
BEGIN_C_DECLS

View file

@ -132,6 +132,8 @@ typedef struct {
lsn_t trunc_val;
pthread_mutex_t trunc_mut = PTHREAD_MUTEX_INITIALIZER;
int load_handle_truncate_is_supported = 1;
void load_handle(thread_arg* t) {
lsn_t * offsets = malloc(t->count * sizeof(lsn_t));
@ -249,7 +251,7 @@ void load_handle(thread_arg* t) {
}
// Truncate 1% of the time.
if(!myrandom(100)) {
if(!myrandom(100) && load_handle_truncate_is_supported) {
lsn_t pre_start = h->start_position(h);
pthread_mutex_lock(&trunc_mut);
@ -355,7 +357,7 @@ START_TEST(io_fileTest) {
unlink("logfile.txt");
h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
// handle_concurrencytest(h);
//handle_concurrencytest(h);
h->close(h);
unlink("logfile.txt");
@ -372,6 +374,33 @@ static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) {
}
START_TEST(io_pfileTest) {
load_handle_truncate_is_supported = 0;
stasis_handle_t * h;
h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
// h = stasis_handle(open_debug)(h);
handle_smoketest(h);
h->close(h);
unlink("logfile.txt");
h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
//h = stasis_handle(open_debug)(h);
handle_sequentialtest(h);
h->close(h);
unlink("logfile.txt");
h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
//handle_concurrencytest(h);
h->close(h);
unlink("logfile.txt");
load_handle_truncate_is_supported = 1;
} END_TEST
typedef struct sf_args {
char * filename;
int openMode;
@ -431,6 +460,7 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */
tcase_add_test(tc, io_memoryTest);
tcase_add_test(tc, io_fileTest);
tcase_add_test(tc, io_pfileTest);
tcase_add_test(tc, io_nonBlockingTest);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);