diff --git a/src/stasis/CMakeLists.txt b/src/stasis/CMakeLists.txt index 5b58f75..fa4202b 100644 --- a/src/stasis/CMakeLists.txt +++ b/src/stasis/CMakeLists.txt @@ -14,6 +14,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c logger/inMemoryLog.c logger/logHandle.c logger/logger2.c logger/logMemory.c page/raw.c page/slotted.c page/lsnFree.c + logger/reorderingHandle.c page/fixed.c compensations.c operations/pageOperations.c page/indirect.c operations/decrement.c operations/increment.c diff --git a/src/stasis/Makefile.am b/src/stasis/Makefile.am index 4f17cb5..3076a4b 100644 --- a/src/stasis/Makefile.am +++ b/src/stasis/Makefile.am @@ -10,6 +10,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common logger/filePool.c \ logger/inMemoryLog.c logger/logHandle.c logger/logger2.c \ logger/logMemory.c \ + logger/reorderingHandle.c \ page/raw.c page/slotted.c page/lsnFree.c page/fixed.c compensations.c \ operations/pageOperations.c page/indirect.c operations/decrement.c \ operations/increment.c operations/prepare.c operations/set.c \ diff --git a/src/stasis/logger/reorderingHandle.c b/src/stasis/logger/reorderingHandle.c new file mode 100644 index 0000000..5829400 --- /dev/null +++ b/src/stasis/logger/reorderingHandle.c @@ -0,0 +1,101 @@ +#include +#include +#include +static void* stasis_log_reordering_handle_worker(void * a) { + stasis_log_reordering_handle_t * h = (typeof(h))a; + pthread_mutex_lock(&h->mut); + while(h->cur_len || !h->closed) { + while(h->cur_len) { + size_t chunk_len = 0; + while(chunk_len < h->chunk_len && h->cur_len) { + LogEntry * e = LogUpdate(h->log, + h->l, + h->queue[h->cur_off].p, + h->queue[h->cur_off].op, + h->queue[h->cur_off].arg, + h->queue[h->cur_off].arg_size); + assert(e->xid != INVALID_XID); + chunk_len += sizeofLogEntry(e); + Page * p = h->queue[h->cur_off].p; + + h->cur_len--; + h->cur_off = (h->cur_off+1)%h->max_len; + + writelock(p->rwlatch,0); + stasis_page_lsn_write(e->xid, p, e->LSN); + unlock(p->rwlatch); + releasePage(p); + } + if(chunk_len > 0) { + lsn_t to_force = h->l->prevLSN; + pthread_mutex_unlock(&h->mut); + LogForce(h->log, to_force, LOG_FORCE_COMMIT); + pthread_mutex_lock(&h->mut); + } + } + pthread_cond_signal(&h->done); + if(!h->closed) { // XXX hack! + pthread_cond_wait(&h->ready, &h->mut); + } + } + pthread_mutex_unlock(&h->mut); + return 0; +} + +void stasis_log_reordering_handle_flush(stasis_log_reordering_handle_t * h) { + pthread_mutex_lock(&h->mut); + while(h->cur_len > 0) { + pthread_cond_wait(&h->done, &h->mut); + } + pthread_mutex_unlock(&h->mut); +} +void stasis_log_reordering_handle_close(stasis_log_reordering_handle_t * h) { + h->closed = 1; + pthread_cond_signal(&h->ready); + pthread_join(h->worker,0); + assert(h->cur_len == 0); + pthread_mutex_destroy(&h->mut); + pthread_cond_destroy(&h->ready); + pthread_cond_destroy(&h->done); + free(h->queue); + free(h); +} +stasis_log_reordering_handle_t * +stasis_log_reordering_handle_open(TransactionLog * l, + stasis_log_t* log, + size_t chunk_len, + size_t max_len) { + stasis_log_reordering_handle_t * ret = malloc(sizeof(*ret)); + + ret->l = l; + ret->log = log; + pthread_mutex_init(&ret->mut,0); + pthread_cond_init(&ret->done,0); + pthread_cond_init(&ret->ready,0); + ret->closed = 0; + ret->queue = malloc(sizeof(stasis_log_reordering_op_t)*max_len); + ret->chunk_len = chunk_len; + ret->max_len = max_len; + ret->cur_off = 0; + ret->cur_len = 0; + pthread_create(&ret->worker,0,stasis_log_reordering_handle_worker,ret); + return ret; +} +void stasis_log_reordering_handle_append(stasis_log_reordering_handle_t * h, + Page * p, + unsigned int op, + const byte * arg, + size_t arg_size + ) { + while(h->cur_len == h->max_len) { + pthread_cond_wait(&h->done, &h->mut); + } + intptr_t idx = (h->cur_off+h->cur_len)%h->max_len; + h->queue[idx].p = p; + h->queue[idx].op = op; + h->queue[idx].arg = malloc(arg_size); + memcpy(h->queue[idx].arg,arg,arg_size); + h->queue[idx].arg_size = arg_size; + h->cur_len++; + pthread_cond_signal(&h->ready); +} diff --git a/src/stasis/operations/lsnFreeSet.c b/src/stasis/operations/lsnFreeSet.c index 6cfd2b6..8126da4 100644 --- a/src/stasis/operations/lsnFreeSet.c +++ b/src/stasis/operations/lsnFreeSet.c @@ -1,6 +1,6 @@ #include #include - +#include #include static int op_lsn_free_set(const LogEntry *e, Page *p) { if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { abort() ; } @@ -24,7 +24,8 @@ static int op_lsn_free_unset(const LogEntry *e, Page *p) { memcpy(p->memAddr + a[0], b+a[1], a[1]); return 0; } -int TsetLSNFree(int xid, recordid rid, const void * dat) { +int TsetLsnFreeReorderable(int xid, stasis_log_reordering_handle_t * h, + recordid rid, const void * dat) { Page * p = loadPage(xid, rid.page); readlock(p->rwlatch,0); rid = stasis_record_dereference(xid,p,rid); @@ -34,6 +35,7 @@ int TsetLSNFree(int xid, recordid rid, const void * dat) { fflush(stderr); abort(); unlock(p->rwlatch); + return 1; } else { rid.size = stasis_record_type_to_size(rid.size); intptr_t sz = 2 * (sizeof(pageoff_t) + rid.size); @@ -51,12 +53,18 @@ int TsetLSNFree(int xid, recordid rid, const void * dat) { stasis_record_read(xid, p, rid, b+rid.size); unlock(p->rwlatch); - - Tupdate(xid,rid.page,buf,sz,OPERATION_SET_LSN_FREE); + if(!h) { + Tupdate(xid,rid.page,buf,sz,OPERATION_SET_LSN_FREE); + } else { + TreorderableUpdate(xid,h,rid.page,buf,sz,OPERATION_SET_LSN_FREE); + } free(buf); } return 0; } +int TsetLsnFree(int xid, recordid rid, const void * dat) { + return TsetLsnFreeReorderable(xid, 0, rid, dat); +} Operation getSetLsnFree() { Operation o = { diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 3843fd1..ed72a93 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -28,7 +28,7 @@ #include #include -static TransactionLog XactionTable[MAX_TRANSACTIONS]; +TransactionLog XactionTable[MAX_TRANSACTIONS]; static int numActiveXactions = 0; static int xidCount = 0; @@ -312,6 +312,35 @@ static compensated_function void TactionHelper(int xid, unlock(p->rwlatch); } +void TreorderableUpdate(int xid, void * hp, pageid_t page, + const void *dat, size_t datlen, int op) { + stasis_log_reordering_handle_t * h = (typeof(h))hp; + assert(xid >= 0 && XactionTable[xid % MAX_TRANSACTIONS].xid == xid); + Page * p = loadPage(xid, page); + try { + if(globalLockManager.writeLockPage) { + globalLockManager.writeLockPage(xid, p->id); + } + } end; + + pthread_mutex_lock(&h->mut); + + // e = LogUpdate(stasis_log_file, &XactionTable[xid % MAX_TRANSACTIONS], + // p, op, dat, datlen); + stasis_log_reordering_handle_append(h, p, op, dat, datlen); + + LogEntry * e = allocUpdateLogEntry(-1, h->l->xid, op, + p ? p->id : INVALID_PAGE, + dat, datlen); + e->LSN = 0; + writelock(p->rwlatch,0); + doUpdate(e, p); + unlock(p->rwlatch); + pthread_mutex_unlock(&h->mut); + releasePage(p); + freeLogEntry(e); +} + compensated_function void TupdateStr(int xid, pageid_t page, const char *dat, size_t datlen, int op) { Tupdate(xid, page, dat, datlen, op); diff --git a/stasis/logger/logger2.h b/stasis/logger/logger2.h index 7235026..8364f77 100644 --- a/stasis/logger/logger2.h +++ b/stasis/logger/logger2.h @@ -55,6 +55,20 @@ terms specified in this license. #ifndef __LOGGER2_H__ #define __LOGGER2_H__ +#include + +/** + Contains the state needed by the logging layer to perform + operations on a transaction. + */ +typedef struct TransactionLog { + int xid; + lsn_t prevLSN; + lsn_t recLSN; +} TransactionLog; + +typedef struct stasis_log_t stasis_log_t; + #include /** @@ -64,21 +78,16 @@ terms specified in this license. */ typedef int (guard_fcn_t)(const LogEntry *, void *); -typedef struct stasis_log_t stasis_log_t; typedef enum { LOG_FORCE_COMMIT, LOG_FORCE_WAL } stasis_log_force_mode_t; /** - Contains the state needed by the logging layer to perform - operations on a transaction. - */ -typedef struct { - int xid; - lsn_t prevLSN; - lsn_t recLSN; -} TransactionLog; + XXX TransactionTable should be private to transactional2.c! +*/ +extern TransactionLog XactionTable[MAX_TRANSACTIONS]; + /** This is the log implementation that is being used. diff --git a/stasis/logger/reorderingHandle.h b/stasis/logger/reorderingHandle.h new file mode 100644 index 0000000..ca23824 --- /dev/null +++ b/stasis/logger/reorderingHandle.h @@ -0,0 +1,43 @@ +#ifndef __STASIS_LOG_REORDERING_HANDLE_H +#define __STASIS_LOG_REORDERING_HANDLE_H +#include +#include + +typedef struct { + Page * p; + unsigned int op; + byte * arg; + size_t arg_size; +} stasis_log_reordering_op_t; + +typedef struct stasis_log_reordering_handle_t { + TransactionLog *l; + stasis_log_t * log; + pthread_mutex_t mut; + pthread_cond_t done; + pthread_cond_t ready; + int closed; + pthread_t worker; + stasis_log_reordering_op_t * queue; + size_t chunk_len; + size_t max_len; + size_t cur_off; + size_t cur_len; +} stasis_log_reordering_handle_t; + +#include + +void stasis_log_reordering_handle_flush(stasis_log_reordering_handle_t * h); +void stasis_log_reordering_handle_close(stasis_log_reordering_handle_t * h); +stasis_log_reordering_handle_t * +stasis_log_reordering_handle_open(TransactionLog * l, + stasis_log_t* log, + size_t chunk_len, + size_t max_len); +void stasis_log_reordering_handle_append(stasis_log_reordering_handle_t * h, + Page * p, + unsigned int op, + const byte * arg, + size_t arg_size); + +#endif //__STASIS_LOG_REORDERING_HANDLE_H diff --git a/stasis/operations/lsnFreeSet.h b/stasis/operations/lsnFreeSet.h index 1f65687..95baee1 100644 --- a/stasis/operations/lsnFreeSet.h +++ b/stasis/operations/lsnFreeSet.h @@ -1,6 +1,9 @@ #ifndef __LSN_FREE_SET_H #define __LSN_FREE_SET_H +#include Operation getSetLsnFree(); Operation getSetLsnFreeInverse(); -int TsetLSNFree(int xid, recordid rid, const void *dat); +int TsetLsnFree(int xid, recordid rid, const void *dat); +int TsetLsnReorderable(int xid, stasis_log_reordering_handle_t * h, + recordid rid, const void *dat); #endif //__LSN_FREE_SET_H diff --git a/stasis/transactional.h b/stasis/transactional.h index f67cf47..c68f6b2 100644 --- a/stasis/transactional.h +++ b/stasis/transactional.h @@ -541,7 +541,6 @@ terms specified in this license. #include "common.h" #include "flags.h" - BEGIN_C_DECLS //XXX doesn't belong here. @@ -615,6 +614,9 @@ compensated_function void Tupdate(int xid, pageid_t page, */ compensated_function void TupdateStr(int xid, pageid_t page, const char *dat, size_t datlen, int op); + +void TreorderableUpdate(int xid, void * h, pageid_t page, + const void * dat, size_t datlen, int op); /** * Read the value of a record. * diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index 50d6857..0ba4a7b 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -50,6 +50,7 @@ terms specified in this license. #include #define LOG_NAME "check_operations.log" + #include /** @@ -586,7 +587,7 @@ START_TEST(operation_lsn_free) { int foo; Tread(xid[i%2],rid[i], &foo); assert(foo == 42); - TsetLSNFree(xid[i%2], rid[i], &i); + TsetLsnFree(xid[i%2], rid[i], &i); Tread(xid[i%2],rid[i], &foo); assert(foo == i); } @@ -614,6 +615,85 @@ START_TEST(operation_lsn_free) { } END_TEST + +START_TEST(operation_reorderable) { + Tinit(); + recordid rid[100]; + { + int xid = Tbegin(); + pageid_t pid = TpageAlloc(xid); + Page * p = loadPage(xid,pid); + writelock(p->rwlatch,0); + stasis_slotted_lsn_free_initialize_page(p); + // XXX hack! + byte * old = malloc(PAGE_SIZE); + memcpy(old, p->memAddr, PAGE_SIZE); + int fortyTwo = 42; + for(int i = 0; i < 100; i++) { + rid[i] = stasis_record_alloc_begin(xid, p, sizeof(int)); + stasis_record_alloc_done(xid, p, rid[i]); + stasis_record_write(xid, p, -1, rid[i], (const byte*)&fortyTwo); + } + byte * new = malloc(PAGE_SIZE); + memcpy(new, p->memAddr, PAGE_SIZE); + memcpy(p->memAddr, old, PAGE_SIZE); + unlock(p->rwlatch); + releasePage(p); + TpageSet(xid, pid, new); + free(old); + free(new); + Tcommit(xid); + } + { + int xid[2]; + + xid[0] = Tbegin(); + xid[1] = Tbegin(); + + stasis_log_reordering_handle_t * rh + = stasis_log_reordering_handle_open( + &XactionTable[xid[0]% MAX_TRANSACTIONS], + stasis_log_file, + 100, // bytes (far too low!) + 5 // log entries + ); + for(int i = 0; i < 100; i++) { + int foo; + Tread(xid[i%2],rid[i], &foo); + assert(foo == 42); + if(i%2) { + TsetLsnFree(xid[i%2], rid[i], &i); + } else { + TsetLsnFreeReorderable(xid[i%2], rh, rid[i], &i); + } + Tread(xid[i%2],rid[i], &foo); + assert(foo == i); + } + stasis_log_reordering_handle_close(rh); + Tcommit(xid[0]); + Tabort(xid[1]); + } + Tdeinit(); + + Tinit(); + { + int xid = Tbegin(); + + for(int i = 0; i < 100; i++) { + int foo; + Tread(xid, rid[i], &foo); + if(i%2) { + assert(foo == 42); + } else { + assert(foo == i); + } + } + Tcommit(xid); + } + Tdeinit(); + +} END_TEST + /** Add suite declarations here */ @@ -634,7 +714,8 @@ Suite * check_suite(void) { tcase_add_test(tc, operation_alloc_test); tcase_add_test(tc, operation_array_list); tcase_add_test(tc, operation_lsn_free); - /* --------------------------------------------- */ + tcase_add_test(tc, operation_reorderable); + /* --------------------------------------------- */ tcase_add_checked_fixture(tc, setup, teardown); suite_add_tcase(s, tc);