Final commit from google.

This commit is contained in:
Sears Russell 2007-08-24 23:01:08 +00:00
parent 5bd2138a8b
commit 5f954eb239
8 changed files with 399 additions and 166 deletions

View file

@ -0,0 +1,78 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stasis/transactional.h>
#include <stasis/truncation.h>
/*static stasis_handle_t * memory_factory(lsn_t off, lsn_t len, void * ignored) {
stasis_handle_t * h = stasis_handle(open_memory)(off);
//h = stasis_handle(open_debug)(h);
stasis_write_buffer_t * w = h->append_buffer(h, len);
w->h->release_write_buffer(w);
return h;
}
typedef struct sf_args {
char * filename;
int openMode;
int filePerm;
} sf_args;
static stasis_handle_t * traditional_file_factory(void * argsP) {
sf_args * args = (sf_args*) argsP;
stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm);
//h = stasis_handle(open_debug)(h);
return h;
}
*/
static inline long mb_to_page(long mb) {
return (mb * 1024 * 1024) / PAGE_SIZE;
}
int main(int argc, char ** argv) {
int direct = 0;
int legacyBM = 0;
int legacyFH = 0;
long page_count = mb_to_page(100);
for(int i = 1; i < argc; i++) {
if(!strcmp(argv[i], "--direct")) {
direct = 1;
bufferManagerO_DIRECT = 1;
} else if(!strcmp(argv[i], "--deprecatedBM")) {
bufferManagerType = BUFFER_MANAGER_DEPRECATED_HASH;
legacyBM = 1;
} else if(!strcmp(argv[i], "--deprecatedFH")) {
bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_DEPRECATED;
legacyFH = 1;
} else if(!strcmp(argv[i], "--pfile")) {
bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
} else if(!strcmp(argv[i], "--file")) {
bufferManagerNonBlockingSlowHandleType = IO_HANDLE_FILE;
} else if(!strcmp(argv[i], "--mb")) {
i++;
page_count = mb_to_page(atoll(argv[i]));
} else {
printf("Unknown argument: %s\n", argv[i]);
return 1;
}
}
if(legacyFH && direct) {
printf("--direct and --deprecatedFH are incompatible with each other\n");
return 1;
}
Tinit();
for(long i =0; i < page_count; i++) {
Page * p = loadPage(-1, i);
dirtyPages_add(p);
releasePage(p);
}
Tdeinit();
return 0;
}

View file

@ -22,6 +22,17 @@
#include <unistd.h>
#ifndef __LIBDFA_RW_H
#define __LIBDFA_RW_H
#ifdef __cplusplus
# define BEGIN_C_DECLS extern "C" {
# define END_C_DECLS }
#else /* !__cplusplus */
# define BEGIN_C_DECLS
# define END_C_DECLS
#endif /* __cplusplus */
BEGIN_C_DECLS
typedef struct {
pthread_mutex_t *mut;
int writers;
@ -55,4 +66,7 @@ rwargs *newRWargs (rwl *l, int i, long d);
void *reader (void *args);
void *writer (void *args);
*/
END_C_DECLS
#endif /* rw.h */

View file

@ -106,9 +106,9 @@ inline static Page * writeBackOnePage() {
lru->remove(lru, victim);
Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(int));
assert(old == victim);
// printf("Write(%ld)\n", (long)victim->id);
pageWrite(victim);
pageWrite(victim); /// XXX pageCleanup and pageFlushed might be heavyweight.
pageCleanup(victim);
// Make sure that no one mistakenly thinks this is still a live copy.
victim->id = -1;

View file

@ -62,7 +62,7 @@
//#define MAX_MERGE 4
/** @return a read buffer indicating an error has occured */
static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t *h,
static inline stasis_read_buffer_t* alloc_read_buffer_error(stasis_handle_t *h,
int error) {
assert(error);
stasis_read_buffer_t * r = malloc(sizeof(stasis_read_buffer_t));
@ -74,8 +74,8 @@ static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t *h,
return r;
}
/** @return a read buffer indicating a write error has occured */
static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t *h,
int error) {
static inline stasis_write_buffer_t* alloc_write_buffer_error
(stasis_handle_t *h, int error) {
assert(error);
stasis_write_buffer_t * w = malloc(sizeof(stasis_write_buffer_t));
w->h = h;
@ -87,7 +87,7 @@ static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t *
return w;
}
/** Wraps stasis_handle_t so that it can be stored in an rbtree. */
typedef struct tree_node {
typedef struct tree_node {
lsn_t start_pos;
lsn_t end_pos;
stasis_handle_t * h;
@ -97,43 +97,43 @@ typedef struct tree_node {
be deallocated unless this is zero. */
int pin_count;
/** set to 1 when the handle is written to, 0 when the handle is
written back to disk, INVALID_NODE when the handle is not in
written back to disk, INVALID_NODE when the handle is not in
the tree. */
int dirty;
} tree_node;
/** Wrapper for write buffers */
typedef struct write_buffer_impl {
typedef struct write_buffer_impl {
/** The tree node that contains this buffer */
const tree_node * n;
/** The underlying buffer. */
stasis_write_buffer_t * w;
} write_buffer_impl;
typedef struct read_buffer_impl {
/** The tree node that contains this buffer, or NULL if the buffer
typedef struct read_buffer_impl {
/** The tree node that contains this buffer, or NULL if the buffer
is from a slow handle. */
const tree_node * n;
/** The underlying buffer. */
stasis_read_buffer_t * r;
} read_buffer_impl;
/**
/**
Compare two tree_node structs. Two tree nodes are equal if they
are zero length, and start at the same point, or if they overlap.
*/
static int cmp_handle(const void * ap, const void * bp, const void * ignored) {
tree_node * a = (tree_node*)ap;
tree_node * b = (tree_node*)bp;
if(a->start_pos == b->start_pos &&
if(a->start_pos == b->start_pos &&
a->start_pos == a->end_pos &&
b->start_pos == b->end_pos ) {
b->start_pos == b->end_pos ) {
return 0;
}
if(a->end_pos <= b->start_pos) {
if(a->end_pos <= b->start_pos) {
return -1;
} else if(a->start_pos >= b->end_pos) {
return 1;
} else {
} else {
return 0;
}
}
@ -143,7 +143,7 @@ typedef struct nbw_impl {
// Handle state
lsn_t start_pos;
lsn_t end_pos;
lsn_t end_pos;
// Fields to manage slow handles
stasis_handle_t * (*slow_factory)(void * arg);
@ -178,7 +178,7 @@ static inline void freeFastHandle(nbw_impl * impl, const tree_node * n);
/** Obtain a slow handle from the pool of existing ones, or obtain a new one
by calling impl->slow_factory.. */
static stasis_handle_t * getSlowHandle(nbw_impl * impl) {
static stasis_handle_t * getSlowHandle(nbw_impl * impl) {
pthread_mutex_lock(&impl->mut);
stasis_handle_t * slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles);
assert(slow);
@ -186,20 +186,20 @@ static stasis_handle_t * getSlowHandle(nbw_impl * impl) {
impl->slow_handle_count++;
pthread_mutex_unlock(&impl->mut);
slow = impl->slow_factory(impl->slow_factory_arg);
} else {
} else {
pthread_mutex_unlock(&impl->mut);
}
return slow;
}
/** Release a file handle back into the pool of slow handles. */
static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) {
static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) {
assert(slow);
pthread_mutex_lock(&impl->mut);
addVal(&impl->slow_handles, (long)slow);
pthread_mutex_unlock(&impl->mut);
}
static tree_node * allocTreeNode(lsn_t off, lsn_t len) {
static tree_node * allocTreeNode(lsn_t off, lsn_t len) {
tree_node * ret = malloc(sizeof(tree_node));
ret->start_pos = off;
ret->end_pos = off + len;
@ -235,7 +235,7 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
impl->fast_handle_count, impl->max_fast_handles);
}
if(impl->used_buffer_size + len > impl->max_buffer_size) {
printf("Blocking on write. %lld bytes (%lld max)\n",
printf("Blocking on write. %lld bytes (%lld max)\n",
impl->used_buffer_size, impl->max_buffer_size);
}
@ -267,15 +267,24 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
}
#ifdef EAGER_MERGE
// check for a mergable range immediately after the point we're interested in.
// check for a mergable range immediately after the point we're
// interested in. (DEAD CODE)
tree_node dummy;
dummy.start_pos = n->end_pos;
dummy.end_pos = n->end_pos+1;
while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles)) && np->dirty && !np->pin_count && np->write_count + n->write_count < MAX_MERGE) {
DEBUG("Did post-merge of page %lld-%lld (%d) and %lld-%lld (%d) outstanding = %d\n", n->start_pos/PAGE_SIZE, -1+n->end_pos/PAGE_SIZE, n->write_count, np->start_pos/PAGE_SIZE, -1+np->end_pos/PAGE_SIZE, np->write_count, impl->fast_handle_count);
while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles))
&& np->dirty
&& !np->pin_count
&& np->write_count + n->write_count < MAX_MERGE) {
DEBUG("Did post-merge of page %lld-%lld (%d) and %lld-%lld (%d) "
"outstanding = %d\n", n->start_pos/PAGE_SIZE,
-1+n->end_pos/PAGE_SIZE, n->write_count, np->start_pos/PAGE_SIZE,
-1+np->end_pos/PAGE_SIZE, np->write_count, impl->fast_handle_count);
lsn_t appendLen = np->end_pos - np->start_pos;
stasis_read_buffer_t * r= np->h->read_buffer(np->h,np->start_pos, appendLen);
stasis_read_buffer_t * r= np->h->read_buffer(np->h,np->start_pos,
appendLen);
int ret = n->h->write(n->h,np->start_pos,r->buf, appendLen);
assert(!ret);
ret = r->h->release_read_buffer(r);
@ -293,7 +302,8 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
return n;
}
static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) {
static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off,
lsn_t len) {
tree_node * np = allocTreeNode(off, len);
pthread_mutex_lock(&impl->mut);
@ -311,19 +321,19 @@ static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) {
n->h->close(n->h);
free((void*)n);
}
static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n,
static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n,
int setDirty) {
if(n->dirty == INVALID_NODE) {
if(n->dirty == INVALID_NODE) {
// Not in tree; cast removes "const"
releaseSlowHandle(impl, n->h);
free((void*)n);
return 0;
} else {
} else {
assert(setDirty == 0 || setDirty == 1);
assert(n->dirty == 0 || n->dirty == 1);
pthread_mutex_lock(&impl->mut);
((tree_node*)n)->pin_count--;
if(n->dirty == 0) {
if(n->dirty == 0) {
((tree_node*)n)->dirty = setDirty;
}
pthread_mutex_unlock(&impl->mut);
@ -334,14 +344,14 @@ static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n,
}
}
/** @todo nbw_num_copies is unimplemented. */
static int nbw_num_copies(stasis_handle_t * h) {
static int nbw_num_copies(stasis_handle_t * h) {
return 0;
}
/** @todo nbw_num_copies_buffer is unimplemented. */
static int nbw_num_copies_buffer(stasis_handle_t * h) {
static int nbw_num_copies_buffer(stasis_handle_t * h) {
return 0;
}
static int nbw_close(stasis_handle_t * h) {
static int nbw_close(stasis_handle_t * h) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
@ -357,7 +367,8 @@ static int nbw_close(stasis_handle_t * h) {
free(impl->workers);
DEBUG("nbw had %d slow handles\n", impl->slow_handle_count);
DEBUG("fast handles = %d, used buffer = %lld\n", impl->fast_handle_count, impl->used_buffer_size);
DEBUG("fast handles = %d, used buffer = %lld\n", impl->fast_handle_count,
impl->used_buffer_size);
if(impl->requested_bytes_written < impl->total_bytes_written) {
printf("nbw: Problem with write coalescing detected.\n"
"Client wrote %lld bytes, handle wrote %lld.\n",
@ -377,47 +388,47 @@ static int nbw_close(stasis_handle_t * h) {
destroyList(&impl->slow_handles);
assert(impl->slow_handle_count == 0);
free(h->impl);
free(h);
return 0;
}
static lsn_t nbw_start_position(stasis_handle_t *h) {
static lsn_t nbw_start_position(stasis_handle_t *h) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
lsn_t ret = impl->start_pos;
pthread_mutex_unlock(&impl->mut);
return ret;
}
static lsn_t nbw_end_position(stasis_handle_t *h) {
static lsn_t nbw_end_position(stasis_handle_t *h) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
lsn_t ret = impl->end_pos;
pthread_mutex_unlock(&impl->mut);
return ret;
}
static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) {
nbw_impl * impl = h->impl;
const tree_node * n = allocFastHandle(impl, off, len);
stasis_write_buffer_t * w = n->h->write_buffer(n->h, off, len);
write_buffer_impl * w_impl = malloc(sizeof(write_buffer_impl));
w_impl->n = n;
w_impl->n = n;
w_impl->w = w;
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
ret->h = h;
ret->off = w->off;
ret->len = w->len;
ret->buf = w->buf;
ret->h = h;
ret->off = w->off;
ret->len = w->len;
ret->buf = w->buf;
ret->error = w->error;
ret->impl = w_impl;
if(!ret->error) {
ret->impl = w_impl;
if(!ret->error) {
pthread_mutex_lock(&impl->mut);
assert(impl->start_pos <= impl->end_pos);
if(off < impl->start_pos) {
if(off < impl->start_pos) {
// Note: We're returning a valid write buffer to space before
// the handle's truncation point. Spooky.
ret->error = EDOM;
@ -430,8 +441,8 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
return ret;
}
static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h,
lsn_t len) {
static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h,
lsn_t len) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
@ -442,7 +453,7 @@ static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h,
return nbw_write_buffer(h, off, len);
}
static int nbw_release_write_buffer(stasis_write_buffer_t * w) {
static int nbw_release_write_buffer(stasis_write_buffer_t * w) {
nbw_impl * impl = w->h->impl;
write_buffer_impl * w_impl = w->impl;
const tree_node * n = w_impl->n;
@ -453,7 +464,7 @@ static int nbw_release_write_buffer(stasis_write_buffer_t * w) {
return 0;
}
static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) {
lsn_t off, lsn_t len) {
nbw_impl * impl = h->impl;
const tree_node * n = findFastHandle(impl, off, len);
stasis_read_buffer_t * r;
@ -474,16 +485,17 @@ static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h,
return ret;
}
static int nbw_release_read_buffer(stasis_read_buffer_t * r) {
static int nbw_release_read_buffer(stasis_read_buffer_t * r) {
nbw_impl * impl = r->h->impl;
read_buffer_impl * r_impl = r->impl;
const tree_node * n = r_impl->n;
stasis_handle_t * oldHandle = r_impl->r->h;
r_impl->r->h->release_read_buffer(r_impl->r);
// XXX shouldn't need to check for this here; getFastHandle does something similar...
if(n) {
// XXX shouldn't need to check for this here; getFastHandle does
// something similar...
if(n) {
releaseFastHandle(impl, n, 0);
} else {
} else {
assert(oldHandle);
releaseSlowHandle(impl, oldHandle);
}
@ -491,16 +503,16 @@ static int nbw_release_read_buffer(stasis_read_buffer_t * r) {
free(r);
return 0;
}
static int nbw_write(stasis_handle_t * h, lsn_t off,
const byte * dat, lsn_t len) {
static int nbw_write(stasis_handle_t * h, lsn_t off,
const byte * dat, lsn_t len) {
nbw_impl * impl = h->impl;
const tree_node * n = allocFastHandle(impl, off, len);
int ret = n->h->write(n->h, off, dat, len);
releaseFastHandle(impl, n, 1);
if(!ret) {
if(!ret) {
pthread_mutex_lock(&impl->mut);
assert(impl->start_pos <= impl->end_pos);
if(off < impl->start_pos) {
if(off < impl->start_pos) {
ret = EDOM;
} else if(off + len > impl->end_pos) {
impl->end_pos = off+len;
@ -510,8 +522,8 @@ static int nbw_write(stasis_handle_t * h, lsn_t off,
}
return ret;
}
static int nbw_append(stasis_handle_t * h, lsn_t * off,
const byte * dat, lsn_t len) {
static int nbw_append(stasis_handle_t * h, lsn_t * off,
const byte * dat, lsn_t len) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
@ -523,33 +535,33 @@ static int nbw_append(stasis_handle_t * h, lsn_t * off,
int ret = nbw_write(h, *off, dat, len);
return ret;
}
static int nbw_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) {
static int nbw_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) {
nbw_impl * impl = h->impl;
const tree_node * n = findFastHandle(impl, off, len);
int ret;
// XXX should be handled by releaseFastHandle.
if(n) {
if(n) {
ret = n->h->read(n->h, off, buf, len);
releaseFastHandle(impl, n, 0);
} else {
} else {
stasis_handle_t * slow = getSlowHandle(impl);
ret = slow->read(slow, off, buf, len);
releaseSlowHandle(impl, slow);
}
return ret;
}
static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) {
static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) {
nbw_impl * impl = h->impl;
int error = 0;
pthread_mutex_lock(&impl->mut);
if(new_start <= impl->end_pos && new_start > impl->start_pos) {
if(new_start <= impl->end_pos && new_start > impl->start_pos) {
impl->start_pos = new_start;
} else {
} else {
error = EDOM;
}
pthread_mutex_unlock(&impl->mut);
if(!error) {
if(!error) {
// XXX close all slow handles; truncate of them. (ie: implement truncate)
}
return error;
@ -586,12 +598,12 @@ struct stasis_handle_t nbw_func = {
the mutex.
*/
static void * nbw_worker(void * handle) {
static void * nbw_worker(void * handle) {
stasis_handle_t * h = handle;
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
while(1) {
while(1) {
// cast strips const.
tree_node * node = (tree_node*)RB_ENTRY(min)(impl->fast_handles);
int writes = 0;
@ -633,7 +645,8 @@ static void * nbw_worker(void * handle) {
}
stasis_handle_t * fast2 = np->h;
stasis_read_buffer_t * r2 = fast2->read_buffer(fast2,np->start_pos, np_len);
stasis_read_buffer_t * r2 = fast2->read_buffer(fast2,np->start_pos,
np_len);
memcpy(buf + buf_off, r2->buf, np_len);
buf_off += np_len;
r2->h->release_read_buffer(r2);
@ -647,7 +660,8 @@ static void * nbw_worker(void * handle) {
pthread_mutex_unlock(&impl->mut);
if(len != PAGE_SIZE) {
DEBUG("merged %lld pages at %lld into single write\n", len/PAGE_SIZE, off/PAGE_SIZE);
DEBUG("merged %lld pages at %lld into single write\n",
len/PAGE_SIZE, off/PAGE_SIZE);
}
slow->write(slow, off, buf, len);
@ -660,15 +674,16 @@ static void * nbw_worker(void * handle) {
pthread_mutex_lock(&impl->mut);
node->pin_count--;
}
tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, impl->fast_handles);
if(!node->dirty && !node->pin_count) {
tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node,
impl->fast_handles);
if(!node->dirty && !node->pin_count) {
impl->fast_handle_count -= node->write_count;
impl->used_buffer_size -= (node->end_pos - node->start_pos);
freeFastHandle(impl, node);
}
node = new_node;
}
if(!impl->fast_handle_count || !writes) {
if(!impl->fast_handle_count || !writes) {
if(impl->still_open) {
pthread_cond_wait(&impl->pending_writes_cond, &impl->mut);
} else {
@ -680,12 +695,11 @@ static void * nbw_worker(void * handle) {
return 0;
}
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg),
void * slow_factory_arg,
stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *),
void * fast_factory_arg,
int worker_thread_count,
lsn_t buffer_size, int max_fast_handles) {
stasis_handle_t * stasis_handle(open_non_blocking)
(stasis_handle_t * (*slow_factory)(void * arg), void * slow_factory_arg,
stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *),
void * fast_factory_arg, int worker_thread_count, lsn_t buffer_size,
int max_fast_handles) {
nbw_impl * impl = malloc(sizeof(nbw_impl));
pthread_mutex_init(&impl->mut, 0);
@ -715,16 +729,16 @@ stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_fact
impl->worker_count = worker_thread_count;
pthread_cond_init(&impl->pending_writes_cond, 0);
impl->still_open = 1;
stasis_handle_t *h = malloc(sizeof(stasis_handle_t));
*h = nbw_func;
h->impl = impl;
for(int i = 0; i < impl->worker_count; i++) {
for(int i = 0; i < impl->worker_count; i++) {
int err = pthread_create(&(impl->workers[i]), 0, nbw_worker, h);
if(err) {
if(err) {
perror("Coudln't spawn worker thread for non_blocking io");
}
}

View file

@ -5,8 +5,10 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <stasis/common.h>
#include <stasis/io/handle.h>
#include <pthread.h>
@ -22,62 +24,95 @@
@see handle.h
*/
/**
Per-handle information for pfile
*/
typedef struct pfile_impl {
/**
This should be held whenever end_pos is accessed.
*/
pthread_mutex_t mut;
/**
The logical offset of the file. Once the file is open, this will
never change, as pfile doesn't support truncation.
*/
lsn_t start_pos;
/**
The logical end of the file.
*/
lsn_t end_pos;
/**
File descriptor
*/
int fd;
/**
Flags passed into open
*/
int file_flags;
/**
File creation mode
*/
int file_mode;
char * filename;
/**
The name of the underlying file.
*/
char *filename;
} pfile_impl;
/**
We can pass the caller's buffer directly into pread()/pwrite()
without making any copies.
*/
static int pfile_num_copies(stasis_handle_t *h) { return 0; }
/**
We have to call malloc(), but not memcpy(). Maybe this should return 1.
*/
static int pfile_num_copies_buffer(stasis_handle_t *h) { return 0; }
static int pfile_close(stasis_handle_t *h) {
pfile_impl * impl = (pfile_impl*)h->impl;
static int pfile_close(stasis_handle_t *h) {
pfile_impl *impl = (pfile_impl*)h->impl;
DEBUG("Closing pfile: end = %lld\n", impl->end_pos);
int fd = impl->fd;
free((void*)impl->filename);
free(impl);
free(h);
int ret = close(fd);
if(!ret) return 0;
if (!ret) return 0;
else return errno;
}
static lsn_t pfile_start_position(stasis_handle_t *h) {
pfile_impl * impl = (pfile_impl*)h->impl;
pfile_impl *impl = (pfile_impl*)h->impl;
return impl->start_pos;
}
static lsn_t pfile_end_position(stasis_handle_t *h) {
pfile_impl * impl = (pfile_impl*)h->impl;
pfile_impl *impl = (pfile_impl*)h->impl;
pthread_mutex_lock(&impl->mut);
lsn_t ret = impl->end_pos;
pthread_mutex_unlock(&impl->mut);
return ret;
}
inline static int pfile_write_unlocked(int fd, lsn_t off, const byte * dat,
inline static int pfile_write_unlocked(int fd, lsn_t off, const byte *dat,
lsn_t len) {
int error = 0;
ssize_t bytes_written = 0;
while(bytes_written < len) {
while (bytes_written < len) {
ssize_t count = pwrite(fd,
dat + bytes_written,
len - bytes_written,
off + bytes_written);
if(count == -1) {
if(errno == EAGAIN || errno == EINTR) {
if (count == -1) {
if (errno == EAGAIN || errno == EINTR) {
// @see file.c for an explanation; basically; we ignore these,
// and try again.
count = 0;
} else {
if(errno == EBADF) {
if (errno == EBADF) {
error = EBADF;
} else {
error = errno;
@ -86,35 +121,49 @@ inline static int pfile_write_unlocked(int fd, lsn_t off, const byte * dat,
}
}
bytes_written += count;
if(bytes_written != len) {
if (bytes_written != len) {
DEBUG("pwrite spinning\n");
}
}
return error;
}
static int pfile_read(stasis_handle_t * h, lsn_t off, byte * buf, lsn_t len) {
pfile_impl * impl = (pfile_impl*)(h->impl);
static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) {
pfile_impl *impl = (pfile_impl*)(h->impl);
int error = 0;
if(off < impl->start_pos) {
// reading impl->end_pos is probably atomic on most hardware, but
// this is safer.
pthread_mutex_lock(&impl->mut);
lsn_t end_pos = impl->end_pos;
pthread_mutex_unlock(&impl->mut);
// checking end_pos is adequate, (we assume this is the only handle
// touching the file)
if (off < impl->start_pos) {
error = EDOM;
} else if(off + len > impl->end_pos) {
} else if (off + len > end_pos) {
error = EDOM;
} else {
ssize_t bytes_read = 0;
while(bytes_read < len) {
while (bytes_read < len) {
ssize_t count = pread(impl->fd,
buf + bytes_read,
len - bytes_read,
off + bytes_read - impl->start_pos);
if(count == -1) {
if(errno == EAGAIN || errno == EINTR) {
if (count == -1) {
if (errno == EAGAIN || errno == EINTR) {
count = 0;
} else {
if(errno == EBADF) {
if (errno == EBADF) {
h->error = EBADF;
} else {
int err = errno;
// XXX Why is sys_errlist[] is unavailable here?
perror("pfile_read encountered an unknown error code.");
fprintf(stderr, "pread() returned -1; errno is %d\n",err);
abort(); // XXX other errors?
}
error = errno;
@ -122,24 +171,28 @@ static int pfile_read(stasis_handle_t * h, lsn_t off, byte * buf, lsn_t len) {
}
}
bytes_read += count;
if(bytes_read != len) {
if (bytes_read != len) {
DEBUG("pread spinning\n");
}
}
assert(bytes_read == len);
}
return error;
}
static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat, lsn_t len) {
static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat,
lsn_t len) {
pfile_impl *impl = (pfile_impl*)(h->impl);
int error = 0;
lsn_t phys_off;
if(impl->start_pos > off) {
if (impl->start_pos > off) {
error = EDOM;
} else {
pthread_mutex_lock(&impl->mut);
if(impl->end_pos < off+len) {
if (impl->end_pos < off+len) {
// update end_pos now; the caller is not allowed to look at this
// part of the file until we return, so if they notice that the
// file hasn't been extended yet, it's a bug on their end.
impl->end_pos = off+len;
}
pthread_mutex_unlock(&impl->mut);
@ -148,47 +201,49 @@ static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat, lsn_t len
}
return error;
}
static int pfile_append(stasis_handle_t *h, lsn_t *off, const byte *dat, lsn_t len) {
static int pfile_append(stasis_handle_t *h, lsn_t *off, const byte *dat,
lsn_t len) {
pfile_impl *impl = (pfile_impl*)(h->impl);
pthread_mutex_lock(&impl->mut);
*off = impl->end_pos;
impl->end_pos += len;
pthread_mutex_unlock(&impl->mut);
lsn_t phys_off = *off - impl->start_pos;
int error = pfile_write_unlocked(impl->fd, phys_off, dat,len);
return error;
return pfile_write_unlocked(impl->fd, phys_off, dat,len);
}
static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t * h,
static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h,
lsn_t off, lsn_t len) {
stasis_write_buffer_t *ret = malloc(sizeof(stasis_write_buffer_t));
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
if(!ret) {
if (!ret) {
h->error = ENOMEM;
return NULL;
}
pfile_impl * impl = (pfile_impl*)h->impl;
pfile_impl *impl = (pfile_impl*)h->impl;
int error = 0;
if(impl->start_pos > off) {
if (impl->start_pos > off) {
error = EDOM;
}
pthread_mutex_lock(&impl->mut);
if(off + len > impl->end_pos) {
// @todo Come up with a reasonable way to avoid sparse files.
if (off + len > impl->end_pos) {
impl->end_pos = off+len;
}
pthread_mutex_unlock(&impl->mut);
byte * buf;
if(!error) {
byte *buf;
if (!error) {
buf = malloc(len);
if(!buf) { error = ENOMEM; }
if (!buf) { error = ENOMEM; }
}
if(error) {
if (error) {
ret->h = h;
ret->off = 0;
ret->buf = 0;
@ -205,14 +260,14 @@ static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t * h,
}
return ret;
}
static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t * h,
static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t *h,
lsn_t len) {
// Allocate the handle
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
if(!ret) { return NULL; }
stasis_write_buffer_t *ret = malloc(sizeof(stasis_write_buffer_t));
if (!ret) { return NULL; }
pfile_impl * impl = (pfile_impl*)h->impl;
pfile_impl *impl = (pfile_impl*)h->impl;
// Obtain an appropriate offset
pthread_mutex_lock(&(impl->mut));
@ -222,13 +277,13 @@ static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t * h,
pthread_mutex_unlock(&(impl->mut));
// Allocate the buffer
byte * buf = malloc(len);
byte *buf = malloc(len);
int error = 0;
if(!buf) {
if (!buf) {
error = ENOMEM;
}
if(error) {
if (error) {
ret->h = h;
ret->off = 0;
ret->buf = 0;
@ -244,50 +299,52 @@ static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t * h,
ret->error = 0;
}
return ret;
}
static int pfile_release_write_buffer(stasis_write_buffer_t * w) {
pfile_impl * impl = (pfile_impl*)(w->h->impl);
static int pfile_release_write_buffer(stasis_write_buffer_t *w) {
pfile_impl *impl = (pfile_impl*)(w->h->impl);
pthread_mutex_lock(&(impl->mut));
int error = 0;
if(impl->end_pos < w->off + w->len ||
if (impl->end_pos < w->off + w->len ||
impl->start_pos > w->off) {
error = EDOM;
}
pthread_mutex_unlock(&(impl->mut));
if(!error) {
error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf, w->len);
if (!error) {
error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf,
w->len);
}
if (w->buf) {
free(w->buf);
}
free(w->buf);
free(w);
return error;
}
static stasis_read_buffer_t * pfile_read_buffer(stasis_handle_t * h,
static stasis_read_buffer_t *pfile_read_buffer(stasis_handle_t *h,
lsn_t off, lsn_t len) {
stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t));
if(!ret) { return NULL; }
stasis_read_buffer_t *ret = malloc(sizeof(stasis_read_buffer_t));
if (!ret) { return NULL; }
byte * buf = malloc(len);
byte *buf = malloc(len);
int error = 0;
if(!buf) { error = ENOMEM; }
if (!buf) { error = ENOMEM; }
if(!error) {
if (!error) {
error = pfile_read(h, off, buf, len);
}
if(error) {
if (error) {
ret->h = h;
ret->buf = 0;
ret->off = 0;
ret->len = 0;
ret->impl = 0;
ret->error = error;
if(buf) { free(buf); }
if (buf) { free(buf); }
} else {
ret->h = h;
ret->buf = buf;
@ -298,19 +355,25 @@ static stasis_read_buffer_t * pfile_read_buffer(stasis_handle_t * h,
}
return ret;
}
static int pfile_release_read_buffer(stasis_read_buffer_t * r) {
free((void*)r->buf);
static int pfile_release_read_buffer(stasis_read_buffer_t *r) {
if (r->buf) {
free((void*)r->buf);
}
free(r);
return 0;
}
static int pfile_truncate_start(stasis_handle_t * h, lsn_t new_start) {
static int pfile_truncate_start(stasis_handle_t *h, lsn_t new_start) {
static int truncate_warned = 0;
if(!truncate_warned) {
printf("\nWarning: pfile doesn't support truncation; ignoring truncation request\n");
if (!truncate_warned) {
printf("\nWarning: pfile doesn't support truncation; "
"ignoring truncation request\n");
truncate_warned = 1;
}
return 0;
}
struct stasis_handle_t pfile_func = {
.num_copies = pfile_num_copies,
.num_copies_buffer = pfile_num_copies_buffer,
@ -328,28 +391,34 @@ struct stasis_handle_t pfile_func = {
.truncate_start = pfile_truncate_start,
.error = 0
};
stasis_handle_t * stasis_handle(open_pfile)(lsn_t start_offset, char * filename, int flags, int mode) {
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
if(!ret) { return NULL; }
stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset,
char *filename,
int flags, int mode) {
stasis_handle_t *ret = malloc(sizeof(stasis_handle_t));
if (!ret) { return NULL; }
*ret = pfile_func;
pfile_impl * impl = malloc(sizeof(pfile_impl));
pfile_impl *impl = malloc(sizeof(pfile_impl));
if (!impl) { free(ret); return NULL; }
ret->impl = impl;
pthread_mutex_init(&(impl->mut), 0);
impl->fd = open(filename, flags, mode);
assert(sizeof(off_t) >= (64/8));
if(impl->fd == -1) {
if (impl->fd == -1) {
ret->error = errno;
}
impl->start_pos = start_offset;
off_t file_len = lseek(impl->fd,0,SEEK_END);
if(file_len == (off_t)-1) {
if (file_len == (off_t)-1) {
ret->error = errno;
}
impl->end_pos = file_len + start_offset;
DEBUG("file len = %lld, start_off = %lld, end = %lld\n", file_len, start_offset, impl->end_pos);
DEBUG("file len = %lld, start_off = %lld, end = %lld\n",
file_len, start_offset, impl->end_pos);
impl->filename = strdup(filename);
impl->file_flags = flags;

View file

@ -450,8 +450,10 @@ recordid TlsmAppendPage(int xid, recordid tree,
if(ret.size == INVALID_SLOT) {
if(lastLeaf->id != p->id) {
assert(s->lastLeaf != tree.page);
unlock(lastLeaf->rwlatch);
releasePage(lastLeaf); // don't need that page anymore...
lastLeaf = 0;
}
// traverse down the root of the tree.
@ -537,6 +539,7 @@ recordid TlsmAppendPage(int xid, recordid tree,
writeNodeRecord(xid, lastLeaf, ret.slot, key, keySize, val_page);
if(lastLeaf->id != p->id) {
assert(s->lastLeaf != tree.page);
unlock(lastLeaf->rwlatch);
releasePage(lastLeaf);
}
@ -638,6 +641,37 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) {
}
pageid_t TlsmLastPage(int xid, recordid tree) {
Page * root = loadPage(xid, tree.page);
readlock(root->rwlatch,0);
lsmTreeState *state = root->impl;
int keySize = getKeySize(xid,root);
if(state->lastLeaf == -1) {
const lsmTreeNodeRecord *nr = readNodeRecord(xid,root,DEPTH,
keySize);
int depth = nr->ptr;
state->lastLeaf = findLastLeaf(xid,root,depth);
}
pageid_t ret = state->lastLeaf;
unlock(root->rwlatch);
// ret points to the last internal node at this point.
releasePage(root);
Page * p = loadPage(xid, ret);
readlock(p->rwlatch,0);
if(*recordcount_ptr(p) == 2) {
ret = -1;
} else {
const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,(*recordcount_ptr(p))-1,keySize);
ret = nr->ptr;
}
unlock(p->rwlatch);
releasePage(p);
return ret;
}
/**
The buffer manager calls this when the lsmTree's root page is
loaded. This function allocates some storage for cached values
@ -683,6 +717,9 @@ lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) {
releasePage(p);
p = loadPage(xid,leafid);
readlock(p->rwlatch,0);
assert(depth != 0);
} else {
assert(depth == 0);
}
lsmIteratorImpl *impl = malloc(sizeof(lsmIteratorImpl));
impl->p = p;
@ -708,6 +745,24 @@ lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) {
} */
return it;
}
lladdIterator_t *lsmTreeIterator_copy(int xid, lladdIterator_t* i) {
lsmIteratorImpl *it = i->impl;
lsmIteratorImpl *mine = malloc(sizeof(lsmIteratorImpl));
if(it->p) {
mine->p = loadPage(xid, it->p->id);
readlock(mine->p->rwlatch,0);
} else {
mine->p = 0;
}
memcpy(&mine->current, &it->current,sizeof(recordid));
mine->t = it->t;
mine->justOnePage = it->justOnePage;
lladdIterator_t * ret = malloc(sizeof(lladdIterator_t));
ret->type = -1; // XXX LSM_TREE_ITERATOR
ret->impl = mine;
return ret;
}
void lsmTreeIterator_close(int xid, lladdIterator_t *it) {
lsmIteratorImpl *impl = it->impl;
if(impl->p) {
@ -738,7 +793,7 @@ int lsmTreeIterator_next(int xid, lladdIterator_t *it) {
impl->current.size = keySize;
} else {
impl->p = 0;
impl->current.size = -1;
impl->current.size = INVALID_SLOT;
}
} else {
assert(impl->current.size == keySize + sizeof(lsmTreeNodeRecord));

View file

@ -82,10 +82,10 @@ terms specified in this license.
#ifndef MAX_BUFFER_SIZE
//#define MAX_BUFFER_SIZE 100003
/*#define MAX_BUFFER_SIZE 20029 */
#define MAX_BUFFER_SIZE 20029
//#define MAX_BUFFER_SIZE 10007
//#define MAX_BUFFER_SIZE 5003
#define MAX_BUFFER_SIZE 2003
//#define MAX_BUFFER_SIZE 2003
//#define MAX_BUFFER_SIZE 4006
/* #define MAX_BUFFER_SIZE 71 */
/*#define MAX_BUFFER_SIZE 7 */

View file

@ -63,7 +63,10 @@ void TlsmSetPageAllocator(pageid_t (*allocer)(int xid, void * ignored),
*/
pageid_t TlsmFindPage(int xid, recordid tree,
const byte *key);
/**
@todo TlsmFirstPage for symmetry?
*/
pageid_t TlsmLastPage(int xid, recordid tree);
/// --------------- Iterator implementation
typedef struct lsmTreeNodeRecord {
@ -93,7 +96,7 @@ lladdIterator_t * lsmTreeIterator_open(int xid, recordid tree);
*/
void lsmTreeIterator_close(int xid, lladdIterator_t * it);
int lsmTreeIterator_next (int xid, lladdIterator_t * it);
lladdIterator_t *lsmTreeIterator_copy(int xid, lladdIterator_t* i);
static inline int lsmTreeIterator_key (int xid, lladdIterator_t *it,
byte **key) {
lsmIteratorImpl * impl = (lsmIteratorImpl*)it->impl;