From d413bb27f3c152eae5ecfc2eb196a2fa924ad382 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 19 Mar 2009 03:36:13 +0000 Subject: [PATCH] support for lsn-free writeback; partial benchmark code for submission --- benchmarks/CMakeLists.txt | 3 +- benchmarks/Makefile.am | 2 +- benchmarks/qos.c | 248 +++++++++++++++++++++++++++ benchmarks/writeBack.c | 147 ++++++++++++++++ benchmarks/writeback.sh | 43 +++++ src/stasis/logger/reorderingHandle.c | 40 ++++- src/stasis/operations/lsnFreeSet.c | 26 ++- src/stasis/page.c | 4 + src/stasis/transactional2.c | 32 +++- stasis/logger/reorderingHandle.h | 11 +- stasis/operations/lsnFreeSet.h | 4 +- stasis/page.h | 10 ++ stasis/transactional.h | 13 ++ test/stasis/check_operations.c | 5 +- 14 files changed, 565 insertions(+), 23 deletions(-) create mode 100644 benchmarks/qos.c create mode 100644 benchmarks/writeBack.c create mode 100644 benchmarks/writeback.sh diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index 7d20df2..c4e6b54 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -24,7 +24,8 @@ CREATE_EXECUTABLE(linearHashNTAWriteRequests) CREATE_EXECUTABLE(transitiveClosure) CREATE_EXECUTABLE(zeroCopy) CREATE_EXECUTABLE(sequentialThroughput) - +CREATE_EXECUTABLE(qos) +CREATE_EXECUTABLE(writeBack) IF(CHECK_LIBRARY) ADD_TEST(rose rose) ENDIF(CHECK_LIBRARY) diff --git a/benchmarks/Makefile.am b/benchmarks/Makefile.am index f48dd90..7a39c72 100644 --- a/benchmarks/Makefile.am +++ b/benchmarks/Makefile.am @@ -10,7 +10,7 @@ noinst_PROGRAMS=lhtableThreaded naiveHash \ naiveMultiThreaded rawSet arrayListSet \ linearHashNTA linkedListNTA pageOrientedListNTA \ linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests \ - transitiveClosure zeroCopy sequentialThroughput rose roseTable + transitiveClosure zeroCopy sequentialThroughput rose roseTable qos writeBack endif AM_CFLAGS=${GLOBAL_CFLAGS} AM_CXXFLAGS=${GLOBAL_CXXFLAGS} -I ${top_builddir}/src diff --git a/benchmarks/qos.c b/benchmarks/qos.c new file mode 100644 index 0000000..af900a5 --- /dev/null +++ b/benchmarks/qos.c @@ -0,0 +1,248 @@ +#include +#include +#include + +void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) { + *slow = malloc(num_rids * sizeof(**slow)); + *fast = malloc((num_rids / 10) * sizeof(**fast)); + + int xid = Tbegin(); + + byte * old = malloc(PAGE_SIZE); + byte * new = malloc(PAGE_SIZE); + + for(long long i = 0; i < num_rids; ) { + pageid_t pid = TpageAlloc(xid); + Page * p = loadPage(xid, pid); + writelock(p->rwlatch,0); + memcpy(old, p->memAddr, PAGE_SIZE); + stasis_slotted_lsn_free_initialize_page(p); + while(i < num_rids && + ( + ((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size + == sizeof(int) + ) + ) { + stasis_record_alloc_done(xid, p, (*slow)[i]); + if(!(i%10)) { + (*fast)[i/10] = stasis_record_alloc_begin(xid, p, sizeof(int)); + if((*fast)[i/10].size!=sizeof(int)) { + break; // don't increment i + } + stasis_record_alloc_done(xid, p, (*fast)[i/10]); + } + assert((*slow)[i].size != -1); + i++; + } + memcpy(new, p->memAddr, PAGE_SIZE); + memcpy(p->memAddr, old, PAGE_SIZE); + unlock(p->rwlatch); + releasePage(p); + TpageSet(xid, pid, new); + } + free(old); + free(new); + + Tcommit(xid); +} + + +typedef struct { + long long num_rids; + long long rid_per_xact; + recordid * rids; + int done; + pthread_mutex_t mut; +} bulk_worker_args; + +static int (*original_write_entry) (struct stasis_log_t* log, LogEntry * e); + +static int net_latency = 2; + + +int my_write_entry(struct stasis_log_t* log, LogEntry *e) { + usleep(net_latency * 1000); + return original_write_entry(log,e); +} + +void* normal_worker(void * ap) { + bulk_worker_args * a = ap; + pthread_mutex_lock(&a->mut); + for(int i = 0; !a->done; i++) { + pthread_mutex_unlock(&a->mut); + + struct timeval tv; + gettimeofday(&tv, 0); + long long start = tv.tv_usec + tv.tv_sec * 1000000; + + int xid = Tbegin(); + for(int j = 0; j < a->rid_per_xact; j++) { + int val = i * a->rid_per_xact + j; + Tset(xid, a->rids[j%a->num_rids], &val); + } + Tcommit(xid); + + gettimeofday(&tv, 0); + long long stop = tv.tv_usec + tv.tv_sec * 1000000; + printf("low(ms),%lld\n", stop-start); + fflush(stdout); + + pthread_mutex_lock(&a->mut); + } + pthread_mutex_unlock(&a->mut); + return 0; + +} +typedef struct { + int xid; + int n; // which worker am i? + int i; // which iteration is this? + int divisor; + bulk_worker_args * a; +} unit_of_work_arg; +void * bg_unit_of_work(void * ap) { + unit_of_work_arg * ua = ap; + bulk_worker_args * a = ua->a; + + stasis_log_reordering_handle_t * rh + = stasis_log_reordering_handle_open(&XactionTable[ua->xid%MAX_TRANSACTIONS], + stasis_log_file, + (stasis_log_write_buffer_size * 0.25)/ua->divisor, + //512*1024/ua->divisor, // 0.5 mb in log tail at once + 1000000/ua->divisor, // max num outstanding requests + (50 * 1024 * 1024)/ua->divisor // max backlog in bytes + ); + for(int j = 0; j < a->rid_per_xact/ua->divisor; j++) { + int val = ua->i * (a->rid_per_xact/ua->divisor) + j; + TsetReorderable(ua->xid, rh, a->rids[(j*ua->divisor+ua->n)%a->num_rids], &val); + } + stasis_log_reordering_handle_close(rh); + return 0; +} + +void* bg_worker(void * ap) { + bulk_worker_args * a = ap; + pthread_mutex_lock(&a->mut); + for(int i = 0; !a->done; i++) { + pthread_mutex_unlock(&a->mut); + + struct timeval tv; + gettimeofday(&tv, 0); + long long start = tv.tv_usec + tv.tv_sec * 1000000; + + int xid = Tbegin(); + if(stasis_log_file->write_entry == my_write_entry) { + // based on tweaking; also, normal-net is ~ 100x slower than nromal + int num_worker = 100; + pthread_t workers[num_worker]; + unit_of_work_arg args[num_worker]; + for(int w = 0; w < num_worker; w++) { + args[i].xid = xid; + args[i].n = w; + args[i].i = i; + args[i].divisor = num_worker; + args[i].a = a; + pthread_create(&workers[w], 0, bg_unit_of_work, &(args[i])); + } + for(int w = 0; w < num_worker; w++) { + pthread_join(workers[w], 0); + } + + } else { + unit_of_work_arg unit_arg = { + xid, + 0, + i, + 1, + ap + }; + bg_unit_of_work(&unit_arg); + } + Tcommit(xid); + + gettimeofday(&tv, 0); + long long stop = tv.tv_usec + tv.tv_sec * 1000000; + printf("low(ms),%lld\n", stop-start); + fflush(stdout); + + pthread_mutex_lock(&a->mut); + } + pthread_mutex_unlock(&a->mut); + return 0; +} + +int main(int argc, char** argv) { + char * mode = argv[1]; + bulk_worker_args a; + a.num_rids = atoll(argv[2]); + a.rid_per_xact = atoll(argv[3]); + + a.done = 0; + pthread_mutex_init(&a.mut,0); + unlink("storefile.txt"); + unlink("logfile.txt"); + + // disable truncation, as it interferes w/ the benchmark. + + stasis_truncation_automatic = 0; + // XXX instead of overriding this, set tail of priority log to 80% + // stasis log buf or something... + + stasis_log_write_buffer_size = 50 * 1024 * 1024; + + printf("%s %s %s %s %lld\n", argv[0], argv[1], argv[2], argv[3], + stasis_log_write_buffer_size); + + Tinit(); + + // 10% as big as slow rids; interspersed + recordid * fast_rids; + + alloc_rids(a.num_rids, &a.rids, &fast_rids); + pthread_t worker; + if(!strcmp(mode, "none")) { + // nop + } else if (!strcmp(mode, "normal")) { + pthread_create(&worker, 0, normal_worker, &a); + } else if (!strcmp(mode, "normal-net")) { + original_write_entry = stasis_log_file->write_entry; + stasis_log_file->write_entry = my_write_entry; + pthread_create(&worker, 0, normal_worker, &a); + } else if (!strcmp(mode, "bg-net")) { + original_write_entry = stasis_log_file->write_entry; + stasis_log_file->write_entry = my_write_entry; + pthread_create(&worker, 0, bg_worker, &a); + } else { + assert(!strcmp(mode, "bg")); + pthread_create(&worker, 0, bg_worker, &a); + } + + sleep(10); + // sleep 10 (reach steady state) + + // run benchmark here + for(int i = 0; i < 60; i++) { + struct timeval tv; + gettimeofday(&tv, 0); + long long start = tv.tv_usec + tv.tv_sec * 1000000; + int xid = Tbegin(); + TsetLsnFree(xid, fast_rids[i % (a.num_rids/10)], &i); + Tcommit(xid); + gettimeofday(&tv, 0); + long long stop = tv.tv_usec + tv.tv_sec * 1000000; + + printf("high(ms),%lld\n", stop-start); + fflush(stdout); + sleep(1); + + } + + + pthread_mutex_lock(&a.mut); + a.done = 1; + pthread_mutex_unlock(&a.mut); + + pthread_join(worker, 0); + Tdeinit(); + +} diff --git a/benchmarks/writeBack.c b/benchmarks/writeBack.c new file mode 100644 index 0000000..c91fc0f --- /dev/null +++ b/benchmarks/writeBack.c @@ -0,0 +1,147 @@ +#include +#include +#include +void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) { + *slow = malloc(num_rids * sizeof(**slow)); + *fast = malloc((num_rids / 10) * sizeof(**fast)); + + int xid = Tbegin(); + + byte * old = malloc(PAGE_SIZE); + byte * new = malloc(PAGE_SIZE); + + for(long long i = 0; i < num_rids; ) { + pageid_t pid = TpageAlloc(xid); + Page * p = loadPage(xid, pid); + writelock(p->rwlatch,0); + memcpy(old, p->memAddr, PAGE_SIZE); + stasis_slotted_lsn_free_initialize_page(p); + while(i < num_rids && + ( + ((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size + == sizeof(int) + ) + ) { + stasis_record_alloc_done(xid, p, (*slow)[i]); + if(!(i%10)) { + (*fast)[i/10] = stasis_record_alloc_begin(xid, p, sizeof(int)); + if((*fast)[i/10].size!=sizeof(int)) { + break; // don't increment i + } + stasis_record_alloc_done(xid, p, (*fast)[i/10]); + } + assert((*slow)[i].size != -1); + i++; + } + memcpy(new, p->memAddr, PAGE_SIZE); + memcpy(p->memAddr, old, PAGE_SIZE); + unlock(p->rwlatch); + releasePage(p); + TpageSet(xid, pid, new); + } + free(old); + free(new); + + Tcommit(xid); +} + +typedef struct { + pageid_t pid; + pageoff_t off; + pageoff_t len; + int val; +} cached_addr; + +void build_cache(recordid * rids, cached_addr** cache, long long count) { + *cache = malloc (sizeof(**cache) * count); + lsn_t log_trunc = stasis_log_file->truncation_point(stasis_log_file); + for(long long i = 0; i < count; i++) { + (*cache)[i].pid = rids[i].page; + + Page * p = loadPage(-1, (*cache)[i].pid); + readlock(p->rwlatch,0); + byte * b = stasis_record_write_begin(-1, p, rids[i]); + (*cache)[i].off = b - p->memAddr; + stasis_record_write_done(-1, p, rids[i], b); + stasis_page_lsn_write(-1, p, log_trunc); + (*cache)[i].len = stasis_record_type_to_size(rids[i].size); + (*cache)[i].val = 0; + unlock(p->rwlatch); + // releasePage(p); + } +} +int net_latency = 2; +static byte * (*origWrite)(int xid, Page *p, recordid rid); +static byte * slowWrite(int xid, Page *p, recordid rid) { + usleep(net_latency * 1000); + return origWrite(xid,p,rid); +} +static const byte * (*origRead)(int xid, Page *p, recordid rid); +static const byte * slowRead(int xid, Page *p, recordid rid) { + usleep(net_latency * 1000); + return origRead(xid,p,rid); +} + +int main (int argc, char ** argv) { + unlink("storefile.txt"); + unlink("logfile.txt"); + char * mode = argv[1]; + long long num_rids = atoll(argv[2]); + long long num_xacts = atoll(argv[3]); + long long writes_per_xact = atoll(argv[4]); + recordid * rids; + recordid * fast; + cached_addr * cache; + int writeback = !(strcmp(mode, "writeback")&&strcmp(mode,"writeback-net")); + // stasis_truncation_automatic = 0; + Tinit(); + + alloc_rids(num_rids,&rids,&fast); + if(writeback) { + build_cache(rids,&cache,num_rids); + } + + if(!(strcmp(mode, "normal-net")&&strcmp(mode,"writeback-net"))) { + origWrite = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite; + origRead = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead; + + // xxx a bit of cheating; don't pay latency for lsn write + // (could amortize w/ recordWrite) + + stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite = slowWrite; + stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead = slowRead; + } + + + lsn_t last_lsn = 0; + for(long long i = 0; i < num_xacts; i++) { + int xid = Tbegin(); + + for(long long j = 0; j < writes_per_xact; j++) { + long long idx = ((i*writes_per_xact)+j)%num_rids; + + if(!(strcmp(mode, "normal")&&strcmp(mode, "normal-net"))) { + TsetLsnFree(xid, rids[idx], &j); + } else { + assert(writeback); + int old = cache[idx].val; + cache[idx].val = j; + last_lsn = TsetWriteBack(xid, cache[idx].pid,cache[idx].off, + cache[idx].len,&j,&old); + } + } + Tcommit(xid); + } + // XXX hack; really, want to register upcall in buffermanager... + if(writeback) { + for(long long i = 0; i < num_rids; i++) { + Page *p = loadPage(-1, rids[i].page); + writelock(p->rwlatch,0); + stasis_record_write(-1, p, last_lsn, rids[i], (byte*)&cache[i].val); + unlock(p->rwlatch); + releasePage(p); + releasePage(p); + } + } + Tdeinit(); +} diff --git a/benchmarks/writeback.sh b/benchmarks/writeback.sh new file mode 100644 index 0000000..b99844f --- /dev/null +++ b/benchmarks/writeback.sh @@ -0,0 +1,43 @@ +echo w1 +time ./writeBack writeback 100 1 1000000 + +echo n1 +time ./writeBack normal 100 1 1000000 + +echo w10 +time ./writeBack writeback 100 10 100000 + +echo n10 +time ./writeBack normal 100 10 100000 + +echo w100 +time ./writeBack writeback 100 100 10000 + +echo n100 +time ./writeBack normal 100 100 10000 + +echo w1000 +time ./writeBack writeback 100 1000 1000 + +echo n1000 +time ./writeBack normal 100 1000 1000 + +echo w10000 +time ./writeBack writeback 100 10000 100 + +echo n10000 +time ./writeBack normal 100 10000 100 + +#echo w100000 +#time ./writeBack writeback 100 100000 10 + +#echo n100000 +#time ./writeBack normal 100 100000 10 + +#echo w1000000 +#time ./writeBack writeback 100 1000000 1 + +#echo n1000000 +#time ./writeBack normal 100 1000000 1 + + diff --git a/src/stasis/logger/reorderingHandle.c b/src/stasis/logger/reorderingHandle.c index 5829400..eddeca2 100644 --- a/src/stasis/logger/reorderingHandle.c +++ b/src/stasis/logger/reorderingHandle.c @@ -16,15 +16,20 @@ static void* stasis_log_reordering_handle_worker(void * a) { h->queue[h->cur_off].arg_size); assert(e->xid != INVALID_XID); chunk_len += sizeofLogEntry(e); - Page * p = h->queue[h->cur_off].p; h->cur_len--; + h->phys_size -= sizeofLogEntry(e); h->cur_off = (h->cur_off+1)%h->max_len; - writelock(p->rwlatch,0); - stasis_page_lsn_write(e->xid, p, e->LSN); - unlock(p->rwlatch); - releasePage(p); + if(h->queue[h->cur_off].p) { + Page * p = h->queue[h->cur_off].p; + writelock(p->rwlatch,0); + stasis_page_lsn_write(e->xid, p, e->LSN); + unlock(p->rwlatch); + releasePage(p); + } /* + else it's the caller's problem; flush(), and checking the + xaction table for prevLSN is their friend. */ } if(chunk_len > 0) { lsn_t to_force = h->l->prevLSN; @@ -64,7 +69,8 @@ stasis_log_reordering_handle_t * stasis_log_reordering_handle_open(TransactionLog * l, stasis_log_t* log, size_t chunk_len, - size_t max_len) { + size_t max_len, + size_t max_size) { stasis_log_reordering_handle_t * ret = malloc(sizeof(*ret)); ret->l = l; @@ -78,18 +84,32 @@ stasis_log_reordering_handle_open(TransactionLog * l, ret->max_len = max_len; ret->cur_off = 0; ret->cur_len = 0; + ret->phys_size = 0; + ret->max_size = max_size; pthread_create(&ret->worker,0,stasis_log_reordering_handle_worker,ret); return ret; } -void stasis_log_reordering_handle_append(stasis_log_reordering_handle_t * h, +static int AskedForBump = 0; +size_t stasis_log_reordering_handle_append(stasis_log_reordering_handle_t * h, Page * p, unsigned int op, const byte * arg, - size_t arg_size + size_t arg_size, + size_t phys_size ) { - while(h->cur_len == h->max_len) { + while(h->phys_size >= h->max_size) { pthread_cond_wait(&h->done, &h->mut); } + + while(h->cur_len == h->max_len) { + if(!AskedForBump) { + printf("Warning: bump max_len\n"); fflush(stdout); + AskedForBump = 1; + } + pthread_cond_wait(&h->done, &h->mut); + } + + intptr_t idx = (h->cur_off+h->cur_len)%h->max_len; h->queue[idx].p = p; h->queue[idx].op = op; @@ -97,5 +117,7 @@ void stasis_log_reordering_handle_append(stasis_log_reordering_handle_t * h, memcpy(h->queue[idx].arg,arg,arg_size); h->queue[idx].arg_size = arg_size; h->cur_len++; + h->phys_size += phys_size; pthread_cond_signal(&h->ready); + return h->phys_size; } diff --git a/src/stasis/operations/lsnFreeSet.c b/src/stasis/operations/lsnFreeSet.c index 8126da4..c77d819 100644 --- a/src/stasis/operations/lsnFreeSet.c +++ b/src/stasis/operations/lsnFreeSet.c @@ -24,7 +24,7 @@ static int op_lsn_free_unset(const LogEntry *e, Page *p) { memcpy(p->memAddr + a[0], b+a[1], a[1]); return 0; } -int TsetLsnFreeReorderable(int xid, stasis_log_reordering_handle_t * h, +int TsetReorderable(int xid, stasis_log_reordering_handle_t * h, recordid rid, const void * dat) { Page * p = loadPage(xid, rid.page); readlock(p->rwlatch,0); @@ -63,7 +63,29 @@ int TsetLsnFreeReorderable(int xid, stasis_log_reordering_handle_t * h, return 0; } int TsetLsnFree(int xid, recordid rid, const void * dat) { - return TsetLsnFreeReorderable(xid, 0, rid, dat); + return TsetReorderable(xid, 0, rid, dat); +} +int TsetReorderableWriteBack(int xid, stasis_log_reordering_handle_t * h, + pageid_t page, pageoff_t off, pageoff_t len, + const void * dat, const void * olddat) { + intptr_t sz = 2 * (sizeof(pageoff_t) + len); + byte * buf = calloc(sz,1); + pageoff_t * a = (pageoff_t*)buf; + a[0] = off; + a[1] = len; + byte * b = (byte*)&a[2]; + memcpy(b,dat,len); + memcpy(b+len,dat,len); + if(!h) { + TwritebackUpdate(xid,page,buf,sz,OPERATION_SET_LSN_FREE); + } else { + TreorderableWritebackUpdate(xid,h,page,buf,sz,OPERATION_SET_LSN_FREE); + } + free(buf); + return 0; +} +int TsetWriteBack(int xid, pageid_t page, pageoff_t off, pageoff_t len, const void * dat, const void * olddat) { + return TsetReorderableWriteBack(xid,0,page,off,len,dat,olddat); } Operation getSetLsnFree() { diff --git a/src/stasis/page.c b/src/stasis/page.c index cbc1d46..abba130 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -145,6 +145,10 @@ int stasis_page_impl_register(page_impl p) { page_impls[p.page_type] = p; return 0; } +page_impl * stasis_page_impl_get(int id) { + assert(page_impls[id].page_type == id); + return & page_impls[id]; +} void stasis_record_write(int xid, Page * p, lsn_t lsn, recordid rid, const byte *dat) { assertlocked(p->rwlatch); assert( (p->id == rid.page) && (p->memAddr != NULL) ); diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index ed72a93..3e7e24c 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -325,13 +325,12 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page, pthread_mutex_lock(&h->mut); - // e = LogUpdate(stasis_log_file, &XactionTable[xid % MAX_TRANSACTIONS], - // p, op, dat, datlen); - stasis_log_reordering_handle_append(h, p, op, dat, datlen); - LogEntry * e = allocUpdateLogEntry(-1, h->l->xid, op, p ? p->id : INVALID_PAGE, dat, datlen); + + stasis_log_reordering_handle_append(h, p, op, dat, datlen, sizeofLogEntry(e)); + e->LSN = 0; writelock(p->rwlatch,0); doUpdate(e, p); @@ -340,7 +339,32 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page, releasePage(p); freeLogEntry(e); } +lsn_t TwritebackUpdate(int xid, pageid_t page, + const void *dat, size_t datlen, int op) { + assert(xid >= 0 && XactionTable[xid % MAX_TRANSACTIONS].xid == xid); + LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, dat, datlen); + TransactionLog* l = &XactionTable[xid % MAX_TRANSACTIONS]; + stasis_log_file->write_entry(stasis_log_file, e); + if(l->prevLSN == -1) { l->recLSN = e->LSN; } + l->prevLSN = e->LSN; + + freeLogEntry(e); + return l->prevLSN; +} +/** DANGER: you need to set the LSN's on the pages that you want to write back, + this method doesn't let you do that, so the only option is to pin until + commit, then set a conservative (too high) lsn */ +void TreorderableWritebackUpdate(int xid, void* hp, + pageid_t page, const void * dat, + size_t datlen, int op) { + stasis_log_reordering_handle_t* h = hp; + assert(xid >= 0 && XactionTable[xid % MAX_TRANSACTIONS].xid == xid); + pthread_mutex_lock(&h->mut); + LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, dat, datlen); + stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(e)); + +} compensated_function void TupdateStr(int xid, pageid_t page, const char *dat, size_t datlen, int op) { Tupdate(xid, page, dat, datlen, op); diff --git a/stasis/logger/reorderingHandle.h b/stasis/logger/reorderingHandle.h index ca23824..04e1926 100644 --- a/stasis/logger/reorderingHandle.h +++ b/stasis/logger/reorderingHandle.h @@ -23,6 +23,8 @@ typedef struct stasis_log_reordering_handle_t { size_t max_len; size_t cur_off; size_t cur_len; + size_t max_size; + size_t phys_size; } stasis_log_reordering_handle_t; #include @@ -33,11 +35,14 @@ stasis_log_reordering_handle_t * stasis_log_reordering_handle_open(TransactionLog * l, stasis_log_t* log, size_t chunk_len, - size_t max_len); -void stasis_log_reordering_handle_append(stasis_log_reordering_handle_t * h, + size_t max_len, + size_t max_size); +size_t stasis_log_reordering_handle_append(stasis_log_reordering_handle_t * h, Page * p, unsigned int op, const byte * arg, - size_t arg_size); + size_t arg_size, + size_t phys_size + ); #endif //__STASIS_LOG_REORDERING_HANDLE_H diff --git a/stasis/operations/lsnFreeSet.h b/stasis/operations/lsnFreeSet.h index 95baee1..19624af 100644 --- a/stasis/operations/lsnFreeSet.h +++ b/stasis/operations/lsnFreeSet.h @@ -4,6 +4,8 @@ Operation getSetLsnFree(); Operation getSetLsnFreeInverse(); int TsetLsnFree(int xid, recordid rid, const void *dat); -int TsetLsnReorderable(int xid, stasis_log_reordering_handle_t * h, +int TsetReorderable(int xid, stasis_log_reordering_handle_t * h, recordid rid, const void *dat); +int TsetWriteBack(int xid, pageid_t page, pageoff_t off, pageoff_t len, + const void * dat, const void * olddat); #endif //__LSN_FREE_SET_H diff --git a/stasis/page.h b/stasis/page.h index 2029972..f1dd99b 100644 --- a/stasis/page.h +++ b/stasis/page.h @@ -865,6 +865,16 @@ typedef struct page_impl { */ int stasis_page_impl_register(page_impl impl); +/** + Get the page_impl for a particular page type. This isn't set + const, so you can write to the function pointers. However, no + attempt has been made to make such things thread safe, so stasis' + worker threads can cause all sorts of undefined trouble if you poke + this. + */ +page_impl* stasis_page_impl_get(int id); + + // -------------------- Page specific, general purpose methods /** diff --git a/stasis/transactional.h b/stasis/transactional.h index c68f6b2..f9f7c70 100644 --- a/stasis/transactional.h +++ b/stasis/transactional.h @@ -617,6 +617,19 @@ compensated_function void TupdateStr(int xid, pageid_t page, void TreorderableUpdate(int xid, void * h, pageid_t page, const void * dat, size_t datlen, int op); +/** Note; it is *your* responsibility to set the lsn on the page; this + function returns a plausible value */ +lsn_t TwritebackUpdate(int xid, pageid_t page, + const void * dat, size_t datlen, int op); + + +/** DANGER: you need to set the LSN's on the pages that you want to write back, + this method doesn't help you do that, so the only option is to pin until + commit, then set a conservative (too high) lsn */ +void TreorderableWritebackUpdate(int xid, void* h, + pageid_t page, const void * dat, + size_t datlen, int op); + /** * Read the value of a record. * diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index 0ba4a7b..b6abff0 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -655,7 +655,8 @@ START_TEST(operation_reorderable) { &XactionTable[xid[0]% MAX_TRANSACTIONS], stasis_log_file, 100, // bytes (far too low!) - 5 // log entries + 10, // log entries + 500 // max byte size ); for(int i = 0; i < 100; i++) { int foo; @@ -664,7 +665,7 @@ START_TEST(operation_reorderable) { if(i%2) { TsetLsnFree(xid[i%2], rid[i], &i); } else { - TsetLsnFreeReorderable(xid[i%2], rh, rid[i], &i); + TsetReorderable(xid[i%2], rh, rid[i], &i); } Tread(xid[i%2],rid[i], &foo); assert(foo == i);