added dup method to io handles

This commit is contained in:
Sears Russell 2010-04-08 23:50:01 +00:00
parent ff2f2e9e1e
commit 7c5eb2a7e9
7 changed files with 101 additions and 24 deletions

View file

@ -35,6 +35,15 @@ static int debug_close(stasis_handle_t * h) {
free(h); free(h);
return ret; return ret;
} }
static stasis_handle_t * debug_dup(stasis_handle_t * h) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
printf("tid=%9ld call dup(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout);
stasis_handle_t * ret = hh->dup(hh);
printf("tid=%9ld retn dup(%lx) = %lx\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, (unsigned long)ret); fflush(stdout);
free(h->impl);
free(h);
return ret;
}
static lsn_t debug_start_position(stasis_handle_t *h) { static lsn_t debug_start_position(stasis_handle_t *h) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h; stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
printf("tid=%9ld call start_position(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout); printf("tid=%9ld call start_position(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout);
@ -170,6 +179,7 @@ struct stasis_handle_t debug_func = {
.num_copies = debug_num_copies, .num_copies = debug_num_copies,
.num_copies_buffer = debug_num_copies_buffer, .num_copies_buffer = debug_num_copies_buffer,
.close = debug_close, .close = debug_close,
.dup = debug_dup,
.start_position = debug_start_position, .start_position = debug_start_position,
.end_position = debug_end_position, .end_position = debug_end_position,
.write = debug_write, .write = debug_write,

View file

@ -46,6 +46,11 @@ static int file_close(stasis_handle_t * h) {
if(!ret) return 0; if(!ret) return 0;
else return errno; else return errno;
} }
static stasis_handle_t* file_dup(stasis_handle_t * h) {
file_impl * impl = h->impl;
return stasis_handle_open_file(impl->start_pos, impl->filename, impl->file_flags, impl->file_mode);
}
static lsn_t file_start_position(stasis_handle_t *h) { static lsn_t file_start_position(stasis_handle_t *h) {
file_impl * impl = (file_impl*)h->impl; file_impl * impl = (file_impl*)h->impl;
pthread_mutex_lock(&(impl->mut)); pthread_mutex_lock(&(impl->mut));
@ -548,6 +553,7 @@ struct stasis_handle_t file_func = {
.num_copies = file_num_copies, .num_copies = file_num_copies,
.num_copies_buffer = file_num_copies_buffer, .num_copies_buffer = file_num_copies_buffer,
.close = file_close, .close = file_close,
.dup = file_dup,
.start_position = file_start_position, .start_position = file_start_position,
.end_position = file_end_position, .end_position = file_end_position,
.write = file_write, .write = file_write,

View file

@ -6,18 +6,28 @@ typedef struct mem_impl {
lsn_t start_pos; lsn_t start_pos;
lsn_t end_pos; lsn_t end_pos;
byte * buf; byte * buf;
int refcount;
} mem_impl; } mem_impl;
static int mem_num_copies(stasis_handle_t * h) { return 1; } static int mem_num_copies(stasis_handle_t * h) { return 1; }
static int mem_num_copies_buffer(stasis_handle_t * h) { return 0; } static int mem_num_copies_buffer(stasis_handle_t * h) { return 0; }
static int mem_close(stasis_handle_t * h) { static int mem_close(stasis_handle_t * h) {
free(((mem_impl*)h->impl)->buf); mem_impl *impl = h->impl;
pthread_mutex_destroy(&(((mem_impl*)h->impl)->mut)); (impl->refcount)--;
if(impl->refcount) { return 0; }
free(impl->buf);
pthread_mutex_destroy(&(impl->mut));
free(h->impl); free(h->impl);
free(h); free(h);
return 0; return 0;
} }
static stasis_handle_t * mem_dup(stasis_handle_t *h) {
mem_impl *impl = h->impl;
(impl->refcount)++;
return h;
}
static lsn_t mem_start_position(stasis_handle_t *h) { static lsn_t mem_start_position(stasis_handle_t *h) {
lsn_t ret; lsn_t ret;
mem_impl* impl = (mem_impl*)(h->impl); mem_impl* impl = (mem_impl*)(h->impl);
@ -243,6 +253,7 @@ struct stasis_handle_t mem_func = {
.num_copies = mem_num_copies, .num_copies = mem_num_copies,
.num_copies_buffer = mem_num_copies_buffer, .num_copies_buffer = mem_num_copies_buffer,
.close = mem_close, .close = mem_close,
.dup = mem_dup,
.start_position = mem_start_position, .start_position = mem_start_position,
.end_position = mem_end_position, .end_position = mem_end_position,
.write = mem_write, .write = mem_write,
@ -270,6 +281,7 @@ stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset) {
impl->start_pos = start_offset; impl->start_pos = start_offset;
impl->end_pos = start_offset; impl->end_pos = start_offset;
impl->buf = malloc(0); impl->buf = malloc(0);
impl->refcount = 1;
return ret; return ret;
} }

View file

@ -179,6 +179,7 @@ typedef struct nbw_impl {
pthread_cond_t force_completed_cond; pthread_cond_t force_completed_cond;
pthread_cond_t pending_writes_cond; pthread_cond_t pending_writes_cond;
int still_open; int still_open;
int refcount;
} nbw_impl; } nbw_impl;
static inline void freeFastHandle(nbw_impl * impl, const tree_node * n); static inline void freeFastHandle(nbw_impl * impl, const tree_node * n);
@ -332,6 +333,12 @@ static int nbw_close(stasis_handle_t * h) {
nbw_impl * impl = h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
(impl->refcount)--;
if(impl->refcount) {
pthread_mutex_unlock(&impl->mut);
return 0;
}
impl->still_open = 0; impl->still_open = 0;
pthread_mutex_unlock(&impl->mut); pthread_mutex_unlock(&impl->mut);
pthread_cond_broadcast(&impl->pending_writes_cond); pthread_cond_broadcast(&impl->pending_writes_cond);
@ -374,6 +381,11 @@ static int nbw_close(stasis_handle_t * h) {
free(h); free(h);
return ret; return ret;
} }
static stasis_handle_t * nbw_dup(stasis_handle_t *h) {
nbw_impl * impl = h->impl;
(impl->refcount)++;
return h;
}
static lsn_t nbw_start_position(stasis_handle_t *h) { static lsn_t nbw_start_position(stasis_handle_t *h) {
nbw_impl * impl = h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
@ -614,6 +626,7 @@ struct stasis_handle_t nbw_func = {
.num_copies = nbw_num_copies, .num_copies = nbw_num_copies,
.num_copies_buffer = nbw_num_copies_buffer, .num_copies_buffer = nbw_num_copies_buffer,
.close = nbw_close, .close = nbw_close,
.dup = nbw_dup,
.start_position = nbw_start_position, .start_position = nbw_start_position,
.end_position = nbw_end_position, .end_position = nbw_end_position,
.write = nbw_write, .write = nbw_write,
@ -830,6 +843,7 @@ stasis_handle_t * stasis_handle(open_non_blocking)
pthread_cond_init(&impl->force_completed_cond, 0); pthread_cond_init(&impl->force_completed_cond, 0);
impl->still_open = 1; impl->still_open = 1;
impl->refcount++;
stasis_handle_t *h = malloc(sizeof(stasis_handle_t)); stasis_handle_t *h = malloc(sizeof(stasis_handle_t));
*h = nbw_func; *h = nbw_func;

View file

@ -74,6 +74,11 @@ static int pfile_close(stasis_handle_t *h) {
else return errno; else return errno;
} }
static stasis_handle_t * pfile_dup(stasis_handle_t *h) {
pfile_impl *impl = h->impl;
return stasis_handle_open_pfile(impl->start_pos, impl->filename, impl->file_flags, impl->file_mode);
}
static lsn_t pfile_start_position(stasis_handle_t *h) { static lsn_t pfile_start_position(stasis_handle_t *h) {
pfile_impl *impl = (pfile_impl*)h->impl; pfile_impl *impl = (pfile_impl*)h->impl;
return impl->start_pos; return impl->start_pos;
@ -415,6 +420,7 @@ struct stasis_handle_t pfile_func = {
.num_copies = pfile_num_copies, .num_copies = pfile_num_copies,
.num_copies_buffer = pfile_num_copies_buffer, .num_copies_buffer = pfile_num_copies_buffer,
.close = pfile_close, .close = pfile_close,
.dup = pfile_dup,
.start_position = pfile_start_position, .start_position = pfile_start_position,
.end_position = pfile_end_position, .end_position = pfile_end_position,
.write = pfile_write, .write = pfile_write,

View file

@ -104,6 +104,15 @@ typedef struct stasis_handle_t {
/** Close this handle, and release any associated resources. */ /** Close this handle, and release any associated resources. */
int (*close)(struct stasis_handle_t *); int (*close)(struct stasis_handle_t *);
/** Duplicate this handle. This is useful for performance hinting;
* sending sequential disk operations to different handles than random
* operations allows the kernel's prefetch algorithm to kick in.
*
* XXX calling dup on a handle, then calling truncate or append on the resulting handles has undefined semantics.
*
* @return a stasis_handle_t that should eventually have close() called on it.
*/
struct stasis_handle_t * (*dup)(struct stasis_handle_t *h);
/** The offset of the handle's first byte */ /** The offset of the handle's first byte */
lsn_t (*start_position)(struct stasis_handle_t * h); lsn_t (*start_position)(struct stasis_handle_t * h);

View file

@ -129,10 +129,15 @@ void load_handle(thread_arg* t) {
stasis_handle_t * h = t->h; stasis_handle_t * h = t->h;
for(int i = 0; i < t->count; i++) { if(handle_truncate_is_supported) {
offsets[i] = -1; for(int i = 0; i < t->count; i++) {
offsets[i] = -1;
}
} else {
for(int i = 0; i < t->count; i++) {
offsets[i] = i * sizeof(int);
}
} }
for(int i = 0; i < OPS_PER_THREAD; i++) { for(int i = 0; i < OPS_PER_THREAD; i++) {
int val = myrandom(t->count); int val = myrandom(t->count);
@ -178,21 +183,25 @@ void load_handle(thread_arg* t) {
} }
} break; } break;
case 2: { // append case 2: { // append
lsn_t oldend = h->end_position(h); if(handle_truncate_is_supported) {
int ret = h->append(h, &(offsets[val]), (const byte*)&(t->values[val]), sizeof(int)); lsn_t oldend = h->end_position(h);
assert(!ret || oldend < h->start_position(h)); int ret = h->append(h, &(offsets[val]), (const byte*)&(t->values[val]), sizeof(int));
assert(!ret || oldend < h->start_position(h));
}
} break; } break;
case 3: { // append_buffer case 3: { // append_buffer
lsn_t oldend = h->end_position(h); if(handle_truncate_is_supported) {
stasis_write_buffer_t * w = h->append_buffer(h, sizeof(int)); lsn_t oldend = h->end_position(h);
if(!w->error) { stasis_write_buffer_t * w = h->append_buffer(h, sizeof(int));
*((int*)w->buf) = t->values[val]; if(!w->error) {
assert(w->len == sizeof(int)); *((int*)w->buf) = t->values[val];
offsets[val] = w->off; assert(w->len == sizeof(int));
} else { offsets[val] = w->off;
assert(oldend < h->start_position(h)); } else {
} assert(oldend < h->start_position(h));
w->h->release_write_buffer(w); }
w->h->release_write_buffer(w);
}
} break; } break;
default: { default: {
abort(); abort();
@ -202,7 +211,7 @@ void load_handle(thread_arg* t) {
int check; int check;
int ret = h->read(h, offsets[val], (byte*)&check, sizeof(int)); int ret = h->read(h, offsets[val], (byte*)&check, sizeof(int));
if(!ret) { if(!ret) {
assert(check == t->values[val]); assert(check == t->values[val]);
} }
@ -219,7 +228,8 @@ void load_handle(thread_arg* t) {
assert(j == t->values[val]); assert(j == t->values[val]);
} else { } else {
assert(ret == EDOM); assert(ret == EDOM);
assert(h->start_position(h) > offsets[val]); if(handle_truncate_is_supported)
assert(h->start_position(h) > offsets[val]);
} }
} break; } break;
case 1: { // read_buffer case 1: { // read_buffer
@ -231,7 +241,8 @@ void load_handle(thread_arg* t) {
} else { } else {
assert(r->error == EDOM); assert(r->error == EDOM);
r->h->release_read_buffer(r); r->h->release_read_buffer(r);
assert(h->start_position(h) > offsets[val]); if(handle_truncate_is_supported)
assert(h->start_position(h) > offsets[val]);
} }
} break; } break;
default: default:
@ -287,7 +298,7 @@ void handle_sequentialtest(stasis_handle_t * h) {
void handle_concurrencytest(stasis_handle_t * h) { void handle_concurrencytest(stasis_handle_t * h) {
int vc = myrandom(VALUE_COUNT) + 10; int vc = myrandom(VALUE_COUNT) + 10;
printf("Running concurrency test with %d values", vc); fflush(stdout); printf("Running concurrency test with %d values\n", vc); fflush(stdout);
int * values = malloc(vc * sizeof(int)); int * values = malloc(vc * sizeof(int));
@ -297,12 +308,17 @@ void handle_concurrencytest(stasis_handle_t * h) {
thread_arg * args = malloc(THREAD_COUNT * sizeof(thread_arg)); thread_arg * args = malloc(THREAD_COUNT * sizeof(thread_arg));
pthread_t * threads = malloc(THREAD_COUNT * sizeof(pthread_t)); pthread_t * threads = malloc(THREAD_COUNT * sizeof(pthread_t));
stasis_handle_t ** handles = malloc(THREAD_COUNT / 2 * sizeof(*handles));
int val_per_thread = vc / THREAD_COUNT; int val_per_thread = vc / THREAD_COUNT;
trunc_val = 0; trunc_val = 0;
for(int i = 0; i < THREAD_COUNT; i++) { for(int i = 0; i < THREAD_COUNT; i++) {
args[i].values = &(values[i * val_per_thread]); args[i].values = &(values[i * val_per_thread]);
args[i].count = val_per_thread; args[i].count = val_per_thread;
if(!(i % 2)) {
handles[i/2] = h->dup(h);
h = handles[i/2];
}
args[i].h = h; args[i].h = h;
pthread_create(&threads[i], 0, (void*(*)(void*))load_handle, &args[i]); pthread_create(&threads[i], 0, (void*(*)(void*))load_handle, &args[i]);
} }
@ -310,9 +326,13 @@ void handle_concurrencytest(stasis_handle_t * h) {
for(int i = 0; i < THREAD_COUNT; i++) { for(int i = 0; i < THREAD_COUNT; i++) {
pthread_join(threads[i], 0); pthread_join(threads[i], 0);
} }
for(int i = 0; i < THREAD_COUNT / 2; i++) {
handles[i]->close(handles[i]);
}
free(values); free(values);
free(args); free(args);
free(threads); free(threads);
free(handles);
} }
/** /**
@test @test
@ -353,7 +373,7 @@ START_TEST(io_fileTest) {
unlink("logfile.txt"); unlink("logfile.txt");
h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
//handle_concurrencytest(h); // handle_concurrencytest(h); // fails by design
h->close(h); h->close(h);
unlink("logfile.txt"); unlink("logfile.txt");
@ -390,7 +410,7 @@ START_TEST(io_pfileTest) {
unlink("logfile.txt"); unlink("logfile.txt");
h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
//handle_concurrencytest(h); handle_concurrencytest(h);
h->close(h); h->close(h);
unlink("logfile.txt"); unlink("logfile.txt");