diff --git a/benchmarks/butterfly.c b/benchmarks/butterfly.c new file mode 100644 index 0000000..0331f51 --- /dev/null +++ b/benchmarks/butterfly.c @@ -0,0 +1,55 @@ +/* + * butterfly.c + * + * Created on: Sep 9, 2011 + * Author: sears + */ +#include +#include +#include +#include +#include +#include + +int main(int argc, char * argv[]) { + if(argc != 4) { + printf("usage: %s filename file_size sector_size\n", argv[0]); fflush(stdout); abort(); + } + const char * filename = argv[1]; + uint64_t file_size = atoll(argv[2]); + int sector_size = atoi(argv[3]); + + int fd = -1; + fd = open(filename, O_WRONLY|O_DSYNC); //|O_DIRECT); + + struct timeval start, stop; + void * buf; + posix_memalign(&buf, sector_size, sector_size); + memset(buf, 0, sector_size); + + + gettimeofday(&stop,0); + for(uint64_t i = 0; i < file_size/sector_size; i++) { + int err = pwrite(fd, buf, sector_size, 0); + if(err == -1) { + perror("Couldn't write"); + abort(); + } + assert(err && err == sector_size); + gettimeofday(&start, 0); + double reset_elapsed = stasis_timeval_to_double(stasis_subtract_timeval(start,stop)); + err = pwrite(fd, buf, sector_size, i * sector_size); + if(err == -1) { + perror("Couldn't write"); + abort(); + } + assert(err && err == sector_size); + + gettimeofday(&stop, 0); + double elapsed = stasis_timeval_to_double(stasis_subtract_timeval(stop,start)); + printf("%lld\t%0.5f\t%0.5f\n", (long long) i, elapsed, reset_elapsed); + } + + close(fd); +} + diff --git a/benchmarks/stride.c b/benchmarks/stride.c new file mode 100644 index 0000000..3c1f8a7 --- /dev/null +++ b/benchmarks/stride.c @@ -0,0 +1,48 @@ +/** + * stride.c + * + * Created on: Sep 1, 2011 + * Author: sears + */ +#include +#include +#include +#include +#include +#include + +int main(int argc, char * argv[]) { + if(argc != 6) { + printf("usage: %s filename num_workers num_ops write_size stride", argv[0]); fflush(stdout); abort(); + } + int NUM_WORKERS = atoi(argv[2]); + uint64_t ops = atoll(argv[3]); + int write_size = atoi(argv[4]); + uint64_t stride = atoll(argv[5]); + int fd = -1; + fd = open(argv[1], O_WRONLY|O_DSYNC); //|O_DIRECT); + + struct timeval start, stop; + void * buf; + posix_memalign(&buf, 512, write_size); + memset(buf, 0, write_size); + + gettimeofday(&start, 0); + + for(uint64_t i = 0; i < ops; i++) { + int err = pwrite(fd, buf, write_size, stride * i * write_size); + if(err == -1) { + perror("Couldn't write"); + abort(); + } + assert(err && err == write_size); + } + + gettimeofday(&stop, 0); + + double elapsed = stasis_timeval_to_double(stasis_subtract_timeval(stop,start)); + double ops_per_sec = ((double)ops) / elapsed; + printf("%lld ops in %f seconds = %f ops/second, %f speedup, %f mb/sec\n", (long long)ops, elapsed, ops_per_sec, ops_per_sec/ /*166.666*/(/*7200.0*/5400.0/60.0), ((double)ops*write_size)/(1024.0*1024.0*elapsed)); + + close(fd); +} diff --git a/benchmarks/stride.pl b/benchmarks/stride.pl new file mode 100755 index 0000000..afe5418 --- /dev/null +++ b/benchmarks/stride.pl @@ -0,0 +1,17 @@ +#!/usr/bin/perl -w + +use strict; + +my @strides; +my $x = 1; +for(my $i = 0; $i < 400; $i++) { + $strides[$i] = $i; + $x *= 2; +} + +$|=1; + +foreach my $s (@strides) { + print "$s\t"; + system qq(./stride foo.out 3 500 4096 $s); +} diff --git a/benchmarks/turbine.c b/benchmarks/turbine.c new file mode 100644 index 0000000..e72eee5 --- /dev/null +++ b/benchmarks/turbine.c @@ -0,0 +1,121 @@ +/* + * turbine.c + * + * Created on: Aug 31, 2011 + * Author: sears + */ +#include +#include +#include +#include +#include + +int many_handles = 1; + +typedef struct worker { + int fd; + const char * filename; + uint64_t stride; + uint64_t offset; + uint64_t last; + uint64_t write_size; + pthread_cond_t cond; + pthread_mutex_t mutex; + struct worker * next; +} worker; +/** + * Straightforward implementation: + * - each worker thread has a mutex and a condition variable. + * - we grab our successor's mutex, update it's incoming value, release the mutex, and signal them. + */ +void * func(void * argp) { + worker * arg = argp; + void * buf; + posix_memalign(&buf, 512, arg->write_size); + memset(buf, 0, arg->write_size); +// void * buf = calloc(arg->write_size, 1); + pthread_mutex_lock(&arg->mutex); + uint64_t offset = 0; + if(many_handles) { + arg->fd = open(arg->filename, O_WRONLY|O_DSYNC); //|O_DIRECT); + if(arg->fd == -1) { + perror("Couldn't open file"); + abort(); + } + } + while(offset < arg->last) { + while(arg->offset == 0) { + pthread_cond_wait(&arg->cond, &arg->mutex); + } + offset = arg->offset; + arg->offset = 0; +// if(!(offset % 10000)) printf("%lld < %lld\n", (long long)arg->offset, (long long) arg->last); + pthread_mutex_lock(&arg->next->mutex); + arg->next->offset = offset + 1; + pthread_mutex_unlock(&arg->next->mutex); + pthread_cond_signal(&arg->next->cond); +// struct timespec slp = stasis_double_to_timespec(0.001); + int err = pwrite(arg->fd, buf, arg->write_size, arg->stride * offset * arg->write_size); + if(err == -1) { + perror("Couldn't write"); + abort(); + } + assert(err && err == arg->write_size); + } + pthread_mutex_unlock(&arg->mutex); + if(many_handles) { + close(arg->fd); + } + return 0; +} + +int main(int argc, char * argv[]) { + if(argc != 6) { + printf("usage: %s filename num_workers num_ops write_size stride", argv[0]); fflush(stdout); abort(); + } + int NUM_WORKERS = atoi(argv[2]); + uint64_t ops = atoll(argv[3]); + int write_size = atoi(argv[4]); + worker * workers = malloc(sizeof(worker) * NUM_WORKERS); + pthread_t* threads = malloc(sizeof(pthread_t) * NUM_WORKERS); + uint64_t stride = atoll(argv[5]); + int fd = -1; + if(!many_handles) { + fd = open(argv[1], O_WRONLY|O_DSYNC); //|O_DIRECT); + } + + for(int i = 0; i < NUM_WORKERS; i++) { + workers[i].fd = fd; + workers[i].filename = argv[1]; + workers[i].offset = 0; + workers[i].last = ops; + workers[i].write_size = write_size; + workers[i].stride = stride; + pthread_cond_init(&workers[i].cond,0); + pthread_mutex_init(&workers[i].mutex,0); + workers[i].next = &workers[(i+1) % NUM_WORKERS]; + } + for(int i = 0; i < NUM_WORKERS; i++) { + pthread_create(&threads[i], 0, func, &workers[i]); + } + + struct timeval start, stop; + gettimeofday(&start, 0); + workers[0].offset = 1; + pthread_cond_signal(&workers[0].cond); + + for(int i = 0; i < NUM_WORKERS; i++) { + pthread_join(threads[i], 0); + } + gettimeofday(&stop, 0); + + double elapsed = stasis_timeval_to_double(stasis_subtract_timeval(stop,start)); + double ops_per_sec = ((double)ops) / elapsed; + printf("%lld ops in %f seconds = %f ops/second, %f speedup, %f mb/sec\n", (long long)ops, elapsed, ops_per_sec, ops_per_sec/ /*166.666*/(/*7200.0*/5400.0/60.0), ((double)ops*write_size)/(1024.0*1024.0*elapsed)); + if(!many_handles) { +// fdatasync(fd); + close(fd); + } + + return 0; +} diff --git a/benchmarks/turbine.pl b/benchmarks/turbine.pl new file mode 100755 index 0000000..3d45e43 --- /dev/null +++ b/benchmarks/turbine.pl @@ -0,0 +1,17 @@ +#!/usr/bin/perl -w + +use strict; + +my @strides; +my $x = 1; +for(my $i = 0; $i < 100; $i++) { + $strides[$i] = $i; + $x *= 2; +} + +$|=1; + +foreach my $s (@strides) { + print "$s\t"; + system qq(./turbine foo.out 3 1000 4096 $s); +}