From 83ad15b6d04e5622866937b505f62ae45068567a Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Fri, 20 Mar 2009 11:36:27 +0000 Subject: [PATCH] more benchmarks; transactions can now be multithreaded --- benchmarks/CMakeLists.txt | 1 + benchmarks/Makefile.am | 3 +- benchmarks/distributedLsnFree.c | 134 ++++++++++++++ benchmarks/lsn_bench_common.h | 265 +++++++++++++++++++++++++++ benchmarks/qos.c | 183 +----------------- benchmarks/writeBack.c | 101 +--------- src/stasis/logger/logger2.c | 18 +- src/stasis/logger/reorderingHandle.c | 15 +- src/stasis/operations/lsnFreeSet.c | 5 +- src/stasis/transactional2.c | 8 +- stasis/logger/logger2.h | 3 +- stasis/logger/reorderingHandle.h | 2 + stasis/operations/lsnFreeSet.h | 3 + test/stasis/check_logWriter.c | 8 +- 14 files changed, 455 insertions(+), 294 deletions(-) create mode 100644 benchmarks/distributedLsnFree.c create mode 100644 benchmarks/lsn_bench_common.h diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index c4e6b54..b48a782 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -26,6 +26,7 @@ CREATE_EXECUTABLE(zeroCopy) CREATE_EXECUTABLE(sequentialThroughput) CREATE_EXECUTABLE(qos) CREATE_EXECUTABLE(writeBack) +CREATE_EXECUTABLE(distributedLsnFree) IF(CHECK_LIBRARY) ADD_TEST(rose rose) ENDIF(CHECK_LIBRARY) diff --git a/benchmarks/Makefile.am b/benchmarks/Makefile.am index 7a39c72..709a990 100644 --- a/benchmarks/Makefile.am +++ b/benchmarks/Makefile.am @@ -10,7 +10,8 @@ noinst_PROGRAMS=lhtableThreaded naiveHash \ naiveMultiThreaded rawSet arrayListSet \ linearHashNTA linkedListNTA pageOrientedListNTA \ linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests \ - transitiveClosure zeroCopy sequentialThroughput rose roseTable qos writeBack + transitiveClosure zeroCopy sequentialThroughput rose roseTable qos writeBack \ + distributedLsnFree endif AM_CFLAGS=${GLOBAL_CFLAGS} AM_CXXFLAGS=${GLOBAL_CXXFLAGS} -I ${top_builddir}/src diff --git a/benchmarks/distributedLsnFree.c b/benchmarks/distributedLsnFree.c new file mode 100644 index 0000000..b34898e --- /dev/null +++ b/benchmarks/distributedLsnFree.c @@ -0,0 +1,134 @@ +#include "lsn_bench_common.h" + +int main(int argc, char ** argv) { + unlink("storefile.txt"); + unlink("logfile.txt"); + char * mode = argv[1]; + long long num_rids = atoll(argv[2]); + long long num_xacts = atoll(argv[3]); + long long writes_per_xact = atoll(argv[4]); + recordid * rids; + recordid * fast; + cached_addr * cache; + int writeback = !(strcmp(mode, "writeback")&&strcmp(mode,"writeback-net")&&strcmp(mode,"writeback-pipeline")); + // stasis_truncation_automatic = 0; + + /*if(!(strcmp(mode, "writeback-pipeline"))) { + // pipelining likes big queues + // stasis_log_write_buffer_size = 50 * 1024 * 1024; + } else { + }*/ + stasis_log_write_buffer_size = 50 * 1024 * 1024; + Tinit(); + + alloc_rids(num_rids,&rids,&fast); + int net = 0; + + if(writeback) { + build_cache(rids,&cache,num_rids); + } + // XXX move above build_cache! + if(!(strcmp(mode, "normal-net")&&strcmp(mode,"writeback-net"))) { + net = 1; + emulate_remote_pages(); + emulate_remote_log(); + } + int num_workers = 100; + if(!strcmp(mode, "writeback-pipeline")) { + emulate_remote_pages(); + net = 1; + num_workers = 10; + stasis_log_reordering_usleep_after_flush = net_latency * 1000; + } + + // stasis_log_reordering_handle_t* handles[num_workers]; + + + lsn_t last_lsn = 0; + for(long long x = 0; x < num_xacts; x++) { + int xid = Tbegin(); + if(net && writeback) { + writeback_unit_of_work_arg a[num_workers]; + pthread_t workers[num_workers]; + for(int i =0 ; i < num_workers; i++) { + a[i].num_rids = num_rids; + a[i].rid_per_xact = writes_per_xact; + a[i].cache = cache; + a[i].done = 0; + a[i].xid = xid; + a[i].workerid = i; + a[i].iterationid = x; + a[i].divisor = num_workers; + pthread_mutex_init(&a[i].mut,0); + + pthread_create(&workers[i], 0, writeback_unit_of_work, &a[i]); + } + for(int i =0; i < num_workers; i++) { + pthread_mutex_lock(&a[i].mut); + a[i].done = 1; + pthread_mutex_unlock(&a[i].mut); + } + for(int i =0 ; i < num_workers; i++) { + pthread_join(workers[i],0); + } + } else { + /* if(writeback && net) { + for(int i = 0; i < num_workers; i++) { + handles[i] = stasis_log_reordering_handle_open( + &XactionTable[xid%MAX_TRANSACTIONS], + stasis_log_file, + (0.9*stasis_log_write_buffer_size)/num_workers, + //512*1024/ua->divisor, // 0.5 mb in log tail at once + 1000000/num_workers, // max num outstanding requests + (50 * 1024 * 1024)/num_workers // max backlog in bytes + ); + } + } */ + + for(long long j = 0; j < writes_per_xact; j++) { + // long long idx = ((x*writes_per_xact)+j)%num_rids; + long long idx = j % num_rids; + // if(!(j % 100)) { printf("."); fflush(stdout); } + if(!(strcmp(mode, "normal")&&strcmp(mode, "normal-net"))) { + TsetLsnFree(xid, rids[idx], &j); + } else { + assert(writeback); + int old = cache[idx].val; + cache[idx].val = j; + if(net) { + /* TsetReorderableWriteBack(xid, handles[j%num_workers], cache[idx].pid, + cache[idx].off, cache[idx].len,&j,&old); + */ + } else { + last_lsn = TsetWriteBack(xid, cache[idx].pid,cache[idx].off, + cache[idx].len,&j,&old); + assert(last_lsn); + } + } + } + /* if(writeback && net) { + for(int j = 0; j < num_workers; j++) { + stasis_log_reordering_handle_close(handles[j]); + } + } */ + } + if(net) { + last_lsn = XactionTable[xid%MAX_TRANSACTIONS].prevLSN; + } + Tcommit(xid); + } + // XXX hack; really, want to register upcall in buffermanager... + if(writeback) { + printf("starting writeback"); fflush(stdout); + assert(last_lsn); + for(long long i = 0; i < num_rids; i++) { + Page *p = loadPage(-1, rids[i].page); + writelock(p->rwlatch,0); + stasis_record_write(-1, p, last_lsn, rids[i], (byte*)&cache[i].val); + unlock(p->rwlatch); + releasePage(p); + releasePage(p); + } + } + Tdeinit(); +} diff --git a/benchmarks/lsn_bench_common.h b/benchmarks/lsn_bench_common.h new file mode 100644 index 0000000..32e3bce --- /dev/null +++ b/benchmarks/lsn_bench_common.h @@ -0,0 +1,265 @@ +#include +#include +#include + +void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) { + *slow = malloc(num_rids * sizeof(**slow)); + *fast = malloc((num_rids / 10) * sizeof(**fast)); + + int xid = Tbegin(); + + byte * old = malloc(PAGE_SIZE); + byte * new = malloc(PAGE_SIZE); + + for(long long i = 0; i < num_rids; ) { + pageid_t pid = TpageAlloc(xid); + Page * p = loadPage(xid, pid); + writelock(p->rwlatch,0); + memcpy(old, p->memAddr, PAGE_SIZE); + stasis_slotted_lsn_free_initialize_page(p); + while(i < num_rids && + ( + ((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size + == sizeof(int) + ) + ) { + stasis_record_alloc_done(xid, p, (*slow)[i]); + if(!(i%10)) { + (*fast)[i/10] = stasis_record_alloc_begin(xid, p, sizeof(int)); + if((*fast)[i/10].size!=sizeof(int)) { + break; // don't increment i + } + stasis_record_alloc_done(xid, p, (*fast)[i/10]); + } + assert((*slow)[i].size != -1); + i++; + } + memcpy(new, p->memAddr, PAGE_SIZE); + memcpy(p->memAddr, old, PAGE_SIZE); + unlock(p->rwlatch); + releasePage(p); + TpageSet(xid, pid, new); + } + free(old); + free(new); + + Tcommit(xid); +} + +typedef struct { + pageid_t pid; + pageoff_t off; + pageoff_t len; + int val; +} cached_addr; + +void build_cache(recordid * rids, cached_addr** cache, long long count) { + *cache = malloc (sizeof(**cache) * count); + lsn_t log_trunc = stasis_log_file->truncation_point(stasis_log_file); + for(long long i = 0; i < count; i++) { + (*cache)[i].pid = rids[i].page; + + Page * p = loadPage(-1, (*cache)[i].pid); + readlock(p->rwlatch,0); + byte * b = stasis_record_write_begin(-1, p, rids[i]); + (*cache)[i].off = b - p->memAddr; + stasis_record_write_done(-1, p, rids[i], b); + stasis_page_lsn_write(-1, p, log_trunc); + (*cache)[i].len = stasis_record_type_to_size(rids[i].size); + (*cache)[i].val = 0; + unlock(p->rwlatch); + // releasePage(p); + } +} +static int net_latency = 2; +static byte * (*origWrite)(int xid, Page *p, recordid rid); +byte * slowWrite(int xid, Page *p, recordid rid) { + usleep(net_latency * 1000); + return origWrite(xid,p,rid); +} + +static const byte * (*origRead)(int xid, Page *p, recordid rid); +const byte * slowRead(int xid, Page *p, recordid rid) { + usleep(net_latency * 1000); + return origRead(xid,p,rid); +} + +static int (*original_write_entry) (struct stasis_log_t* log, LogEntry * e); +int my_write_entry(struct stasis_log_t* log, LogEntry *e) { + usleep(net_latency * 1000); + return original_write_entry(log,e); +} + +void emulate_remote_log() { + original_write_entry = stasis_log_file->write_entry; + stasis_log_file->write_entry = my_write_entry; +} +void emulate_remote_pages() { + origWrite = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite; + origRead = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead; + // xxx a bit of cheating; don't pay latency for lsn write + // (could amortize w/ recordWrite) + stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite = slowWrite; + stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead = slowRead; +} + +/////////// Background workers for parallelizing enqueues to slow logs +typedef struct { + long long num_rids; + long long rid_per_xact; + recordid * rids; + int done; + pthread_mutex_t mut; +} bulk_worker_args; + +void* normal_worker(void * ap) { + bulk_worker_args * a = ap; + pthread_mutex_lock(&a->mut); + for(int i = 0; !a->done; i++) { + pthread_mutex_unlock(&a->mut); + + struct timeval tv; + gettimeofday(&tv, 0); + long long start = tv.tv_usec + tv.tv_sec * 1000000; + + int xid = Tbegin(); + for(int j = 0; j < a->rid_per_xact; j++) { + int val = i * a->rid_per_xact + j; + Tset(xid, a->rids[j%a->num_rids], &val); + } + Tcommit(xid); + + gettimeofday(&tv, 0); + long long stop = tv.tv_usec + tv.tv_sec * 1000000; + printf("low(ms),%lld\n", stop-start); + fflush(stdout); + + pthread_mutex_lock(&a->mut); + } + pthread_mutex_unlock(&a->mut); + return 0; + +} +typedef struct { + long long num_rids; + long long rid_per_xact; + cached_addr * cache; + int done; + int xid; + int workerid; + int iterationid; + int divisor; + pthread_mutex_t mut; +} writeback_unit_of_work_arg; +void * writeback_unit_of_work(void * ap) { + writeback_unit_of_work_arg * ua = ap; + + stasis_log_reordering_handle_t * rh + = stasis_log_reordering_handle_open( + &XactionTable[ua->xid%MAX_TRANSACTIONS], + stasis_log_file, + (0.9*stasis_log_write_buffer_size)/ua->divisor, + //512*1024/ua->divisor, // 0.5 mb in log tail at once + 1000000/ua->divisor, // max num outstanding requests + (50 * 1024 * 1024)/ua->divisor // max backlog in bytes + ); + /* +stasis_log_reordering_handle_open(&XactionTable[ua->xid%MAX_TRANSACTIONS], + stasis_log_file, + (stasis_log_write_buffer_size * 0.25)/ua->divisor, + //512*1024/ua->divisor, // 0.5 mb in log tail at once + 1000000/ua->divisor, // max num outstanding requests + (50 * 1024 * 1024)/ua->divisor // max backlog in bytes + );*/ + for(int j = 0; j < ua->rid_per_xact/ua->divisor; j++) { + long long idx = (ua->workerid+(ua->divisor*j)) % ua->num_rids; + int old = ua->cache[idx].val; + ua->cache[idx].val = (ua->workerid+(ua->divisor*j)); + TsetReorderableWriteBack(ua->xid, rh, ua->cache[idx].pid, + ua->cache[idx].off, ua->cache[idx].len,&ua->cache[idx].val,&old); + // TsetReorderable(ua->xid, rh, a->rids[(j*ua->divisor+ua->n)%a->num_rids], &val); + + } + stasis_log_reordering_handle_close(rh); + return 0; +} + + + +typedef struct { + int xid; + int n; // which worker am i? + int i; // which iteration is this? + int divisor; + bulk_worker_args * a; +} unit_of_work_arg; +void * bg_unit_of_work(void * ap) { + unit_of_work_arg * ua = ap; + bulk_worker_args * a = ua->a; + + stasis_log_reordering_handle_t * rh + = stasis_log_reordering_handle_open(&XactionTable[ua->xid%MAX_TRANSACTIONS], + stasis_log_file, + (stasis_log_write_buffer_size * 0.25)/ua->divisor, + //512*1024/ua->divisor, // 0.5 mb in log tail at once + 1000000/ua->divisor, // max num outstanding requests + (50 * 1024 * 1024)/ua->divisor // max backlog in bytes + ); + for(int j = 0; j < a->rid_per_xact/ua->divisor; j++) { + int val = ua->i * (a->rid_per_xact/ua->divisor) + j; + TsetReorderable(ua->xid, rh, a->rids[(j*ua->divisor+ua->n)%a->num_rids], &val); + } + stasis_log_reordering_handle_close(rh); + return 0; +} + +void* bg_worker(void * ap) { + bulk_worker_args * a = ap; + pthread_mutex_lock(&a->mut); + for(int i = 0; !a->done; i++) { + pthread_mutex_unlock(&a->mut); + + struct timeval tv; + gettimeofday(&tv, 0); + long long start = tv.tv_usec + tv.tv_sec * 1000000; + + int xid = Tbegin(); + if(stasis_log_file->write_entry == my_write_entry) { + // based on tweaking; also, normal-net is ~ 100x slower than nromal + int num_worker = 100; + pthread_t workers[num_worker]; + unit_of_work_arg args[num_worker]; + for(int w = 0; w < num_worker; w++) { + args[i].xid = xid; + args[i].n = w; + args[i].i = i; + args[i].divisor = num_worker; + args[i].a = a; + pthread_create(&workers[w], 0, bg_unit_of_work, &(args[i])); + } + for(int w = 0; w < num_worker; w++) { + pthread_join(workers[w], 0); + } + + } else { + unit_of_work_arg unit_arg = { + xid, + 0, + i, + 1, + ap + }; + bg_unit_of_work(&unit_arg); + } + Tcommit(xid); + + gettimeofday(&tv, 0); + long long stop = tv.tv_usec + tv.tv_sec * 1000000; + printf("low(ms),%lld\n", stop-start); + fflush(stdout); + + pthread_mutex_lock(&a->mut); + } + pthread_mutex_unlock(&a->mut); + return 0; +} diff --git a/benchmarks/qos.c b/benchmarks/qos.c index af900a5..3f87322 100644 --- a/benchmarks/qos.c +++ b/benchmarks/qos.c @@ -1,175 +1,4 @@ -#include -#include -#include - -void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) { - *slow = malloc(num_rids * sizeof(**slow)); - *fast = malloc((num_rids / 10) * sizeof(**fast)); - - int xid = Tbegin(); - - byte * old = malloc(PAGE_SIZE); - byte * new = malloc(PAGE_SIZE); - - for(long long i = 0; i < num_rids; ) { - pageid_t pid = TpageAlloc(xid); - Page * p = loadPage(xid, pid); - writelock(p->rwlatch,0); - memcpy(old, p->memAddr, PAGE_SIZE); - stasis_slotted_lsn_free_initialize_page(p); - while(i < num_rids && - ( - ((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size - == sizeof(int) - ) - ) { - stasis_record_alloc_done(xid, p, (*slow)[i]); - if(!(i%10)) { - (*fast)[i/10] = stasis_record_alloc_begin(xid, p, sizeof(int)); - if((*fast)[i/10].size!=sizeof(int)) { - break; // don't increment i - } - stasis_record_alloc_done(xid, p, (*fast)[i/10]); - } - assert((*slow)[i].size != -1); - i++; - } - memcpy(new, p->memAddr, PAGE_SIZE); - memcpy(p->memAddr, old, PAGE_SIZE); - unlock(p->rwlatch); - releasePage(p); - TpageSet(xid, pid, new); - } - free(old); - free(new); - - Tcommit(xid); -} - - -typedef struct { - long long num_rids; - long long rid_per_xact; - recordid * rids; - int done; - pthread_mutex_t mut; -} bulk_worker_args; - -static int (*original_write_entry) (struct stasis_log_t* log, LogEntry * e); - -static int net_latency = 2; - - -int my_write_entry(struct stasis_log_t* log, LogEntry *e) { - usleep(net_latency * 1000); - return original_write_entry(log,e); -} - -void* normal_worker(void * ap) { - bulk_worker_args * a = ap; - pthread_mutex_lock(&a->mut); - for(int i = 0; !a->done; i++) { - pthread_mutex_unlock(&a->mut); - - struct timeval tv; - gettimeofday(&tv, 0); - long long start = tv.tv_usec + tv.tv_sec * 1000000; - - int xid = Tbegin(); - for(int j = 0; j < a->rid_per_xact; j++) { - int val = i * a->rid_per_xact + j; - Tset(xid, a->rids[j%a->num_rids], &val); - } - Tcommit(xid); - - gettimeofday(&tv, 0); - long long stop = tv.tv_usec + tv.tv_sec * 1000000; - printf("low(ms),%lld\n", stop-start); - fflush(stdout); - - pthread_mutex_lock(&a->mut); - } - pthread_mutex_unlock(&a->mut); - return 0; - -} -typedef struct { - int xid; - int n; // which worker am i? - int i; // which iteration is this? - int divisor; - bulk_worker_args * a; -} unit_of_work_arg; -void * bg_unit_of_work(void * ap) { - unit_of_work_arg * ua = ap; - bulk_worker_args * a = ua->a; - - stasis_log_reordering_handle_t * rh - = stasis_log_reordering_handle_open(&XactionTable[ua->xid%MAX_TRANSACTIONS], - stasis_log_file, - (stasis_log_write_buffer_size * 0.25)/ua->divisor, - //512*1024/ua->divisor, // 0.5 mb in log tail at once - 1000000/ua->divisor, // max num outstanding requests - (50 * 1024 * 1024)/ua->divisor // max backlog in bytes - ); - for(int j = 0; j < a->rid_per_xact/ua->divisor; j++) { - int val = ua->i * (a->rid_per_xact/ua->divisor) + j; - TsetReorderable(ua->xid, rh, a->rids[(j*ua->divisor+ua->n)%a->num_rids], &val); - } - stasis_log_reordering_handle_close(rh); - return 0; -} - -void* bg_worker(void * ap) { - bulk_worker_args * a = ap; - pthread_mutex_lock(&a->mut); - for(int i = 0; !a->done; i++) { - pthread_mutex_unlock(&a->mut); - - struct timeval tv; - gettimeofday(&tv, 0); - long long start = tv.tv_usec + tv.tv_sec * 1000000; - - int xid = Tbegin(); - if(stasis_log_file->write_entry == my_write_entry) { - // based on tweaking; also, normal-net is ~ 100x slower than nromal - int num_worker = 100; - pthread_t workers[num_worker]; - unit_of_work_arg args[num_worker]; - for(int w = 0; w < num_worker; w++) { - args[i].xid = xid; - args[i].n = w; - args[i].i = i; - args[i].divisor = num_worker; - args[i].a = a; - pthread_create(&workers[w], 0, bg_unit_of_work, &(args[i])); - } - for(int w = 0; w < num_worker; w++) { - pthread_join(workers[w], 0); - } - - } else { - unit_of_work_arg unit_arg = { - xid, - 0, - i, - 1, - ap - }; - bg_unit_of_work(&unit_arg); - } - Tcommit(xid); - - gettimeofday(&tv, 0); - long long stop = tv.tv_usec + tv.tv_sec * 1000000; - printf("low(ms),%lld\n", stop-start); - fflush(stdout); - - pthread_mutex_lock(&a->mut); - } - pthread_mutex_unlock(&a->mut); - return 0; -} +#include "lsn_bench_common.h" int main(int argc, char** argv) { char * mode = argv[1]; @@ -188,7 +17,7 @@ int main(int argc, char** argv) { // XXX instead of overriding this, set tail of priority log to 80% // stasis log buf or something... - stasis_log_write_buffer_size = 50 * 1024 * 1024; + // stasis_log_write_buffer_size = 50 * 1024 * 1024; printf("%s %s %s %s %lld\n", argv[0], argv[1], argv[2], argv[3], stasis_log_write_buffer_size); @@ -205,12 +34,10 @@ int main(int argc, char** argv) { } else if (!strcmp(mode, "normal")) { pthread_create(&worker, 0, normal_worker, &a); } else if (!strcmp(mode, "normal-net")) { - original_write_entry = stasis_log_file->write_entry; - stasis_log_file->write_entry = my_write_entry; + emulate_remote_log(); pthread_create(&worker, 0, normal_worker, &a); } else if (!strcmp(mode, "bg-net")) { - original_write_entry = stasis_log_file->write_entry; - stasis_log_file->write_entry = my_write_entry; + emulate_remote_log(); pthread_create(&worker, 0, bg_worker, &a); } else { assert(!strcmp(mode, "bg")); @@ -218,7 +45,6 @@ int main(int argc, char** argv) { } sleep(10); - // sleep 10 (reach steady state) // run benchmark here for(int i = 0; i < 60; i++) { @@ -237,7 +63,6 @@ int main(int argc, char** argv) { } - pthread_mutex_lock(&a.mut); a.done = 1; pthread_mutex_unlock(&a.mut); diff --git a/benchmarks/writeBack.c b/benchmarks/writeBack.c index c91fc0f..cc8055f 100644 --- a/benchmarks/writeBack.c +++ b/benchmarks/writeBack.c @@ -1,86 +1,4 @@ -#include -#include -#include -void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) { - *slow = malloc(num_rids * sizeof(**slow)); - *fast = malloc((num_rids / 10) * sizeof(**fast)); - - int xid = Tbegin(); - - byte * old = malloc(PAGE_SIZE); - byte * new = malloc(PAGE_SIZE); - - for(long long i = 0; i < num_rids; ) { - pageid_t pid = TpageAlloc(xid); - Page * p = loadPage(xid, pid); - writelock(p->rwlatch,0); - memcpy(old, p->memAddr, PAGE_SIZE); - stasis_slotted_lsn_free_initialize_page(p); - while(i < num_rids && - ( - ((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size - == sizeof(int) - ) - ) { - stasis_record_alloc_done(xid, p, (*slow)[i]); - if(!(i%10)) { - (*fast)[i/10] = stasis_record_alloc_begin(xid, p, sizeof(int)); - if((*fast)[i/10].size!=sizeof(int)) { - break; // don't increment i - } - stasis_record_alloc_done(xid, p, (*fast)[i/10]); - } - assert((*slow)[i].size != -1); - i++; - } - memcpy(new, p->memAddr, PAGE_SIZE); - memcpy(p->memAddr, old, PAGE_SIZE); - unlock(p->rwlatch); - releasePage(p); - TpageSet(xid, pid, new); - } - free(old); - free(new); - - Tcommit(xid); -} - -typedef struct { - pageid_t pid; - pageoff_t off; - pageoff_t len; - int val; -} cached_addr; - -void build_cache(recordid * rids, cached_addr** cache, long long count) { - *cache = malloc (sizeof(**cache) * count); - lsn_t log_trunc = stasis_log_file->truncation_point(stasis_log_file); - for(long long i = 0; i < count; i++) { - (*cache)[i].pid = rids[i].page; - - Page * p = loadPage(-1, (*cache)[i].pid); - readlock(p->rwlatch,0); - byte * b = stasis_record_write_begin(-1, p, rids[i]); - (*cache)[i].off = b - p->memAddr; - stasis_record_write_done(-1, p, rids[i], b); - stasis_page_lsn_write(-1, p, log_trunc); - (*cache)[i].len = stasis_record_type_to_size(rids[i].size); - (*cache)[i].val = 0; - unlock(p->rwlatch); - // releasePage(p); - } -} -int net_latency = 2; -static byte * (*origWrite)(int xid, Page *p, recordid rid); -static byte * slowWrite(int xid, Page *p, recordid rid) { - usleep(net_latency * 1000); - return origWrite(xid,p,rid); -} -static const byte * (*origRead)(int xid, Page *p, recordid rid); -static const byte * slowRead(int xid, Page *p, recordid rid) { - usleep(net_latency * 1000); - return origRead(xid,p,rid); -} +#include "lsn_bench_common.h" int main (int argc, char ** argv) { unlink("storefile.txt"); @@ -97,22 +15,15 @@ int main (int argc, char ** argv) { Tinit(); alloc_rids(num_rids,&rids,&fast); + + if(!(strcmp(mode, "normal-net")&&strcmp(mode,"writeback-net"))) { + emulate_remote_pages(); + } + if(writeback) { build_cache(rids,&cache,num_rids); } - if(!(strcmp(mode, "normal-net")&&strcmp(mode,"writeback-net"))) { - origWrite = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite; - origRead = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead; - - // xxx a bit of cheating; don't pay latency for lsn write - // (could amortize w/ recordWrite) - - stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite = slowWrite; - stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead = slowRead; - } - - lsn_t last_lsn = 0; for(long long i = 0; i < num_xacts; i++) { int xid = Tbegin(); diff --git a/src/stasis/logger/logger2.c b/src/stasis/logger/logger2.c index 0db3684..a2541df 100644 --- a/src/stasis/logger/logger2.c +++ b/src/stasis/logger/logger2.c @@ -77,14 +77,12 @@ stasis_log_t* stasis_log_file = 0; static int pendingCommits; -TransactionLog LogTransBegin(stasis_log_t* log, int xid) { - TransactionLog tl; - tl.xid = xid; +void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* tl) { + tl->xid = xid; DEBUG("Log Begin %d\n", xid); - tl.prevLSN = -1; - tl.recLSN = -1; - return tl; + tl->prevLSN = -1; + tl->recLSN = -1; } static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) { @@ -93,8 +91,11 @@ static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) { log->write_entry(log, e); + pthread_mutex_lock(&l->mut); if(l->prevLSN == -1) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; + pthread_mutex_unlock(&l->mut); + DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); @@ -113,8 +114,10 @@ static lsn_t LogTransCommonPrepare(stasis_log_t* log, TransactionLog * l) { e->xid, e->prevLSN, l->recLSN, getPrepareRecLSN(e)); log->write_entry(log, e); + pthread_mutex_lock(&l->mut); if(l->prevLSN == -1) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; + pthread_mutex_unlock(&l->mut); DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); @@ -137,9 +140,10 @@ LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l, log->write_entry(log, e); DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size); - + pthread_mutex_lock(&l->mut); if(l->prevLSN == -1) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; + pthread_mutex_unlock(&l->mut); return e; } diff --git a/src/stasis/logger/reorderingHandle.c b/src/stasis/logger/reorderingHandle.c index eddeca2..def2443 100644 --- a/src/stasis/logger/reorderingHandle.c +++ b/src/stasis/logger/reorderingHandle.c @@ -1,6 +1,9 @@ #include #include #include + +long stasis_log_reordering_usleep_after_flush = 0; + static void* stasis_log_reordering_handle_worker(void * a) { stasis_log_reordering_handle_t * h = (typeof(h))a; pthread_mutex_lock(&h->mut); @@ -17,10 +20,6 @@ static void* stasis_log_reordering_handle_worker(void * a) { assert(e->xid != INVALID_XID); chunk_len += sizeofLogEntry(e); - h->cur_len--; - h->phys_size -= sizeofLogEntry(e); - h->cur_off = (h->cur_off+1)%h->max_len; - if(h->queue[h->cur_off].p) { Page * p = h->queue[h->cur_off].p; writelock(p->rwlatch,0); @@ -30,11 +29,19 @@ static void* stasis_log_reordering_handle_worker(void * a) { } /* else it's the caller's problem; flush(), and checking the xaction table for prevLSN is their friend. */ + + h->cur_len--; + h->phys_size -= sizeofLogEntry(e); + h->cur_off = (h->cur_off+1)%h->max_len; + } if(chunk_len > 0) { lsn_t to_force = h->l->prevLSN; pthread_mutex_unlock(&h->mut); LogForce(h->log, to_force, LOG_FORCE_COMMIT); + if(stasis_log_reordering_usleep_after_flush) { + usleep(stasis_log_reordering_usleep_after_flush); + } pthread_mutex_lock(&h->mut); } } diff --git a/src/stasis/operations/lsnFreeSet.c b/src/stasis/operations/lsnFreeSet.c index c77d819..9058eb2 100644 --- a/src/stasis/operations/lsnFreeSet.c +++ b/src/stasis/operations/lsnFreeSet.c @@ -76,13 +76,14 @@ int TsetReorderableWriteBack(int xid, stasis_log_reordering_handle_t * h, byte * b = (byte*)&a[2]; memcpy(b,dat,len); memcpy(b+len,dat,len); + lsn_t ret = 0; if(!h) { - TwritebackUpdate(xid,page,buf,sz,OPERATION_SET_LSN_FREE); + ret = TwritebackUpdate(xid,page,buf,sz,OPERATION_SET_LSN_FREE); } else { TreorderableWritebackUpdate(xid,h,page,buf,sz,OPERATION_SET_LSN_FREE); } free(buf); - return 0; + return ret; } int TsetWriteBack(int xid, pageid_t page, pageoff_t off, pageoff_t len, const void * dat, const void * olddat) { return TsetReorderableWriteBack(xid,0,page,off,len,dat,olddat); diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 3e7e24c..8f3cbab 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -52,6 +52,9 @@ pthread_mutex_t transactional_2_mutex; void setupOperationsTable() { memset(XactionTable, INVALID_XTABLE_XID, sizeof(TransactionLog)*MAX_TRANSACTIONS); + for(int i = 0; i < MAX_TRANSACTIONS; i++) { + pthread_mutex_init(&XactionTable[i].mut,0); + } // @todo clean out unused constants... operationsTable[OPERATION_SET] = getSet(); operationsTable[OPERATION_SET_INVERSE] = getSetInverse(); @@ -282,7 +285,7 @@ int Tbegin() { pthread_mutex_unlock(&transactional_2_mutex); - XactionTable[index] = LogTransBegin(stasis_log_file, xidCount_tmp); + LogTransBegin(stasis_log_file, xidCount_tmp, &XactionTable[index]); if(globalLockManager.begin) { globalLockManager.begin(XactionTable[index].xid); } @@ -317,6 +320,7 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page, stasis_log_reordering_handle_t * h = (typeof(h))hp; assert(xid >= 0 && XactionTable[xid % MAX_TRANSACTIONS].xid == xid); Page * p = loadPage(xid, page); + assert(p); try { if(globalLockManager.writeLockPage) { globalLockManager.writeLockPage(xid, p->id); @@ -363,7 +367,7 @@ void TreorderableWritebackUpdate(int xid, void* hp, pthread_mutex_lock(&h->mut); LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, dat, datlen); stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(e)); - + pthread_mutex_unlock(&h->mut); } compensated_function void TupdateStr(int xid, pageid_t page, const char *dat, size_t datlen, int op) { diff --git a/stasis/logger/logger2.h b/stasis/logger/logger2.h index 8364f77..15f7e0d 100644 --- a/stasis/logger/logger2.h +++ b/stasis/logger/logger2.h @@ -65,6 +65,7 @@ typedef struct TransactionLog { int xid; lsn_t prevLSN; lsn_t recLSN; + pthread_mutex_t mut; } TransactionLog; typedef struct stasis_log_t stasis_log_t; @@ -202,7 +203,7 @@ void LogForce(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode); Inform the logging layer that a new transaction has begun, and obtain a handle. */ -TransactionLog LogTransBegin(stasis_log_t* log, int xid); +void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* l); /** Write a transaction PREPARE to the log tail. Blocks until the diff --git a/stasis/logger/reorderingHandle.h b/stasis/logger/reorderingHandle.h index 04e1926..188103a 100644 --- a/stasis/logger/reorderingHandle.h +++ b/stasis/logger/reorderingHandle.h @@ -3,6 +3,8 @@ #include #include +extern long stasis_log_reordering_usleep_after_flush; + typedef struct { Page * p; unsigned int op; diff --git a/stasis/operations/lsnFreeSet.h b/stasis/operations/lsnFreeSet.h index 19624af..665798c 100644 --- a/stasis/operations/lsnFreeSet.h +++ b/stasis/operations/lsnFreeSet.h @@ -8,4 +8,7 @@ int TsetReorderable(int xid, stasis_log_reordering_handle_t * h, recordid rid, const void *dat); int TsetWriteBack(int xid, pageid_t page, pageoff_t off, pageoff_t len, const void * dat, const void * olddat); +int TsetReorderableWriteBack(int xid, stasis_log_reordering_handle_t * h, + pageid_t page, pageoff_t off, pageoff_t len, + const void * dat, const void * olddat); #endif //__LSN_FREE_SET_H diff --git a/test/stasis/check_logWriter.c b/test/stasis/check_logWriter.c index d3623f3..86c8133 100644 --- a/test/stasis/check_logWriter.c +++ b/test/stasis/check_logWriter.c @@ -387,9 +387,9 @@ START_TEST(loggerCheckThreaded) { Tdeinit(); } END_TEST - +void setupOperationsTable(); void reopenLogWorkload(int truncating) { - + setupOperationsTable(); stasis_truncation_automatic = 0; const int ENTRY_COUNT = 1000; @@ -408,7 +408,9 @@ void reopenLogWorkload(int truncating) { } int xid = 1; - TransactionLog l = LogTransBegin(stasis_log_file, xid); + TransactionLog l; + pthread_mutex_init(&l.mut,0); + LogTransBegin(stasis_log_file, xid, &l); lsn_t startLSN = 0; LogEntry * entries[ENTRY_COUNT];