From 377788a13e0b9c86e9f59c448588892c8126b4fc Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Mon, 13 Jun 2011 06:58:34 +0000 Subject: [PATCH] move a bunch of multiplexer and lsm related stuff into experimental/ directory --- CMakeLists.txt | 6 + benchmarks/CMakeLists.txt | 13 +- benchmarks/rose.cpp | 6 +- benchmarks/roseTable.h | 3 +- benchmarks/roseTableTpcCH.h | 3 +- benchmarks/transitiveClosure.c | 6 +- src/stasis/CMakeLists.txt | 22 +- src/stasis/bufferManager.c | 1 + .../{ => experimental}/arrayCollection.c | 15 +- src/stasis/{ => experimental}/consumer.c | 4 +- src/stasis/{ => experimental}/fifo.c | 16 +- src/stasis/{ => experimental}/graph.c | 2 +- .../logStructured.c => experimental/group.c} | 1 + src/stasis/experimental/lockManagerImpl.c | 385 +++++++++++++++++ .../{logger => experimental}/logMemory.c | 14 +- .../{operations => experimental}/lsmTree.c | 2 +- src/stasis/{ => experimental}/multiplexer.c | 4 +- src/stasis/{ => experimental}/ringbuffer.c | 2 +- src/stasis/iterator.c | 36 +- src/stasis/lockManager.c | 393 +----------------- src/stasis/operations/linearHashNTA.c | 10 +- src/stasis/page.c | 1 - src/stasis/page/latchFree/lfSlotted.c | 8 +- src/stasis/transactionTable.c | 2 +- src/stasis/transactional2.c | 2 - stasis/{ => experimental}/arrayCollection.h | 2 + stasis/{ => experimental}/consumer.h | 0 stasis/{ => experimental}/fifo.h | 5 +- stasis/{ => experimental}/graph.h | 2 +- stasis/{operations => experimental}/group.h | 0 stasis/{logger => experimental}/logMemory.h | 8 +- .../lsmIterators.h | 12 +- .../{operations => experimental}/lsmTable.h | 0 stasis/{operations => experimental}/lsmTree.h | 7 + .../{operations => experimental}/lsmWorkers.h | 0 stasis/{ => experimental}/multiplexer.h | 2 +- stasis/{ => experimental}/ringbuffer.h | 0 stasis/operations.h | 2 - stasis/page/fixed.h | 2 - test/stasis/CMakeLists.txt | 10 +- test/stasis/check_groupBy.c | 5 +- test/stasis/check_iterator.c | 3 +- test/stasis/check_lsmTree.c | 3 + test/stasis/check_multiplexer.c | 11 +- test/stasis/check_ringbuffer.c | 2 +- 45 files changed, 548 insertions(+), 485 deletions(-) rename src/stasis/{ => experimental}/arrayCollection.c (84%) rename src/stasis/{ => experimental}/consumer.c (91%) rename src/stasis/{ => experimental}/fifo.c (94%) rename src/stasis/{ => experimental}/graph.c (98%) rename src/stasis/{operations/group/logStructured.c => experimental/group.c} (99%) create mode 100644 src/stasis/experimental/lockManagerImpl.c rename src/stasis/{logger => experimental}/logMemory.c (96%) rename src/stasis/{operations => experimental}/lsmTree.c (99%) rename src/stasis/{ => experimental}/multiplexer.c (98%) rename src/stasis/{ => experimental}/ringbuffer.c (99%) rename stasis/{ => experimental}/arrayCollection.h (90%) rename stasis/{ => experimental}/consumer.h (100%) rename stasis/{ => experimental}/fifo.h (95%) rename stasis/{ => experimental}/graph.h (94%) rename stasis/{operations => experimental}/group.h (100%) rename stasis/{logger => experimental}/logMemory.h (85%) rename stasis/{operations => experimental}/lsmIterators.h (98%) rename stasis/{operations => experimental}/lsmTable.h (100%) rename stasis/{operations => experimental}/lsmTree.h (98%) rename stasis/{operations => experimental}/lsmWorkers.h (100%) rename stasis/{ => experimental}/multiplexer.h (98%) rename stasis/{ => experimental}/ringbuffer.h (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8556cdc..1ec5796 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -122,6 +122,12 @@ MACRO(CREATE_CHECK NAME) ADD_TEST(${NAME} nice ./${NAME}) ENDMACRO(CREATE_CHECK) +MACRO(CREATE_EXPERIMENTAL_CHECK NAME) + ADD_EXECUTABLE(${NAME} ${NAME}.c) + TARGET_LINK_LIBRARIES(${NAME} stasis_experimental ${COMMON_LIBRARIES}) + ADD_TEST(${NAME} nice ./${NAME}) +ENDMACRO(CREATE_EXPERIMENTAL_CHECK) + MACRO(CREATE_CHECK_OPT NAME OPT) ADD_EXECUTABLE(${NAME} ${NAME}.c) TARGET_LINK_LIBRARIES(${NAME} ${COMMON_LIBRARIES}) diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index 6076da9..c500eed 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -2,16 +2,20 @@ SUBDIRS(multicore) IF(HAVE_GETLINE) ADD_EXECUTABLE(rose rose.cpp) -TARGET_LINK_LIBRARIES(rose ${COMMON_LIBRARIES}) +TARGET_LINK_LIBRARIES(rose stasis_experimental ${COMMON_LIBRARIES}) ADD_EXECUTABLE(roseTable roseTable.cpp) -TARGET_LINK_LIBRARIES(roseTable ${COMMON_LIBRARIES}) +TARGET_LINK_LIBRARIES(roseTable stasis_experimental ${COMMON_LIBRARIES}) ADD_EXECUTABLE(roseTableTpcCH-workload1 roseTableTpcCH-workload1.cpp) -TARGET_LINK_LIBRARIES(roseTableTpcCH-workload1 ${COMMON_LIBRARIES}) +TARGET_LINK_LIBRARIES(roseTableTpcCH-workload1 stasis_experimental ${COMMON_LIBRARIES}) ADD_EXECUTABLE(roseTableTpcCH-workload2 roseTableTpcCH-workload2.cpp) -TARGET_LINK_LIBRARIES(roseTableTpcCH-workload2 ${COMMON_LIBRARIES}) +TARGET_LINK_LIBRARIES(roseTableTpcCH-workload2 stasis_experimental ${COMMON_LIBRARIES}) + +ADD_EXECUTABLE(transitiveClosure transitiveClosure.c) +TARGET_LINK_LIBRARIES(transitiveClosure stasis_experimental ${COMMON_LIBRARIES}) + ENDIF(HAVE_GETLINE) @@ -27,7 +31,6 @@ CREATE_EXECUTABLE(pageOrientedListNTA) CREATE_EXECUTABLE(linearHashNTAThreaded) CREATE_EXECUTABLE(linearHashNTAMultiReader) CREATE_EXECUTABLE(linearHashNTAWriteRequests) -CREATE_EXECUTABLE(transitiveClosure) CREATE_EXECUTABLE(zeroCopy) CREATE_EXECUTABLE(sequentialThroughput) CREATE_EXECUTABLE(qos) diff --git a/benchmarks/rose.cpp b/benchmarks/rose.cpp index 1333635..272521f 100644 --- a/benchmarks/rose.cpp +++ b/benchmarks/rose.cpp @@ -10,15 +10,16 @@ //typedef int32_t val_t; // rose executable's input contains 32 bit values typedef int64_t val_t; // rose executable's input contains 64 bit values +#include "stasis/experimental/lsmTree.h" #include "stasis/page/compression/for-impl.h" #include "stasis/page/compression/pstar-impl.h" #include "stasis/page/compression/rle-impl.h" #include "stasis/page/compression/multicolumn-impl.h" #include "stasis/page/compression/tuple.h" -#include "stasis/operations/lsmIterators.h" +#include "stasis/experimental/lsmIterators.h" -#include "stasis/operations/lsmWorkers.h" +#include "stasis/experimental/lsmWorkers.h" #undef end #undef begin @@ -355,6 +356,7 @@ void run_test(unsigned int inserts, column_number_t column_count, int num_pages = 0; + stasis_page_impl_register(lsmRootImpl()); Tinit(); recordid tree = NULLRID; diff --git a/benchmarks/roseTable.h b/benchmarks/roseTable.h index 0bf472c..d0b7fce 100644 --- a/benchmarks/roseTable.h +++ b/benchmarks/roseTable.h @@ -1,7 +1,7 @@ #include #include #include -#include "stasis/operations/lsmTable.h" +#include "stasis/experimental/lsmTable.h" #include "stasis/transactional.h" @@ -89,6 +89,7 @@ namespace rose { // bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; + stasis_page_impl_register(lsmRootImpl()); Tinit(); int xid = Tbegin(); diff --git a/benchmarks/roseTableTpcCH.h b/benchmarks/roseTableTpcCH.h index 48b0edd..242e779 100644 --- a/benchmarks/roseTableTpcCH.h +++ b/benchmarks/roseTableTpcCH.h @@ -1,7 +1,7 @@ #include #include #include -#include "stasis/operations/lsmTable.h" +#include "stasis/experimental/lsmTable.h" #include "stasis/transactional.h" @@ -50,6 +50,7 @@ namespace rose { // bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; + stasis_page_impl_register(lsmRootImpl()); Tinit(); int xid = Tbegin(); diff --git a/benchmarks/transitiveClosure.c b/benchmarks/transitiveClosure.c index 3aa1551..504af00 100644 --- a/benchmarks/transitiveClosure.c +++ b/benchmarks/transitiveClosure.c @@ -1,7 +1,7 @@ #include -#include -#include -#include +#include +#include +#include #include #include #include diff --git a/src/stasis/CMakeLists.txt b/src/stasis/CMakeLists.txt index 2308057..c59392d 100644 --- a/src/stasis/CMakeLists.txt +++ b/src/stasis/CMakeLists.txt @@ -8,16 +8,15 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c concurrentHash.c rw.c doubleLink page.c bufferPool.c blobManager.c recovery2.c truncation.c transactional2.c dirtyPageTable.c transactionTable.c - allocationPolicy.c lockManager.c iterator.c - consumer.c arrayCollection.c ringbuffer.c fifo.c - multiplexer.c graph.c logger/logEntry.c + allocationPolicy.c iterator.c + logger/logEntry.c logger/safeWrites.c logger/logWriterUtils.c logger/filePool.c logger/inMemoryLog.c logger/logHandle.c logger/logger2.c - logger/logMemory.c logger/reorderingHandle.c logger/groupForce.c + lockManager.c page/uninitialized.c page/slotted.c page/fixed.c @@ -32,13 +31,12 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c concurrentHash.c rw.c doubleLink operations/arrayList.c operations/lsnFreeSet.c operations/segmentFile.c - operations/group/logStructured.c hash.c operations/naiveLinearHash.c operations/linearHashNTA.c operations/linkedListNTA.c operations/pageOrientedListNTA.c - operations/regions.c operations/lsmTree.c + operations/regions.c operations/bTree.c io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/raid1.c io/non_blocking.c io/debug.c io/handle.c @@ -48,5 +46,17 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c concurrentHash.c rw.c doubleLink util/log2.c util/histogram.c util/hashFunctions.c util/min.c util/ringbuffer.c util/multiset.c util/slab.c stlredblack.cpp util/stlslab.cpp) +ADD_LIBRARY(stasis_experimental + experimental/consumer.c + experimental/fifo.c + experimental/graph.c + experimental/logMemory.c + experimental/multiplexer.c + experimental/ringbuffer.c + experimental/arrayCollection.c + experimental/lockManagerImpl.c + experimental/group.c + experimental/lsmTree.c + ) INSTALL(TARGETS stasis LIBRARY DESTINATION lib) diff --git a/src/stasis/bufferManager.c b/src/stasis/bufferManager.c index 6e13ea3..0d80c26 100644 --- a/src/stasis/bufferManager.c +++ b/src/stasis/bufferManager.c @@ -53,6 +53,7 @@ terms specified in this license. #endif #include +#include #include diff --git a/src/stasis/arrayCollection.c b/src/stasis/experimental/arrayCollection.c similarity index 84% rename from src/stasis/arrayCollection.c rename to src/stasis/experimental/arrayCollection.c index 1301b79..4660002 100644 --- a/src/stasis/arrayCollection.c +++ b/src/stasis/experimental/arrayCollection.c @@ -61,7 +61,16 @@ typedef struct { int elementCount; } pointer_array_iterator_t; +static void noopTupDone(int xid, void * foo) { } -/*lladdIterator_t * ptrArray_iterator(byte * array, int elementCount) { - -}*/ +void stasis_arrayCollection_init() { + lladdIterator_def_t array_def = { + arrayIterator_close, + arrayIterator_next, + arrayIterator_next, + arrayIterator_key, + arrayIterator_value, + noopTupDone, + }; + lladdIterator_register(ARRAY_ITERATOR, array_def); +} diff --git a/src/stasis/consumer.c b/src/stasis/experimental/consumer.c similarity index 91% rename from src/stasis/consumer.c rename to src/stasis/experimental/consumer.c index ad68033..7eef443 100644 --- a/src/stasis/consumer.c +++ b/src/stasis/experimental/consumer.c @@ -1,6 +1,6 @@ -#include +#include #include -#include +#include static lladdConsumer_def_t consumers[MAX_CONSUMER_TYPES]; diff --git a/src/stasis/fifo.c b/src/stasis/experimental/fifo.c similarity index 94% rename from src/stasis/fifo.c rename to src/stasis/experimental/fifo.c index 8c74622..e10b064 100644 --- a/src/stasis/fifo.c +++ b/src/stasis/experimental/fifo.c @@ -1,6 +1,6 @@ -#include +#include #include -#include +#include #include #include @@ -240,3 +240,15 @@ lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerC return pool; } + +void stasis_fifo_init() { + lladdIterator_def_t pointer_def = { + lladdFifoPool_iterator_close, + lladdFifoPool_iterator_next, + lladdFifoPool_iterator_tryNext, + lladdFifoPool_iterator_key, + lladdFifoPool_iterator_value, + lladdFifoPool_iterator_tupleDone, + }; + lladdIterator_register(POINTER_ITERATOR, pointer_def); +} diff --git a/src/stasis/graph.c b/src/stasis/experimental/graph.c similarity index 98% rename from src/stasis/graph.c rename to src/stasis/experimental/graph.c index 0da5aa9..6fd9aaf 100644 --- a/src/stasis/graph.c +++ b/src/stasis/experimental/graph.c @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include #include diff --git a/src/stasis/operations/group/logStructured.c b/src/stasis/experimental/group.c similarity index 99% rename from src/stasis/operations/group/logStructured.c rename to src/stasis/experimental/group.c index 7497968..81e8a28 100644 --- a/src/stasis/operations/group/logStructured.c +++ b/src/stasis/experimental/group.c @@ -6,6 +6,7 @@ */ #include #include +#include #include typedef struct { struct LH_ENTRY(table) * table; diff --git a/src/stasis/experimental/lockManagerImpl.c b/src/stasis/experimental/lockManagerImpl.c new file mode 100644 index 0000000..c24349d --- /dev/null +++ b/src/stasis/experimental/lockManagerImpl.c @@ -0,0 +1,385 @@ +#include +#include +#include +#include + +#include +#include + +#include +#include + +static pthread_mutex_t stasis_lock_manager_ht_mut = PTHREAD_MUTEX_INITIALIZER; + +static int pblHtInsert_r(pblHashTable_t * h, void * key, size_t keylen, void * val) { + pthread_mutex_lock(&stasis_lock_manager_ht_mut); + int ret = pblHtInsert(h, key, keylen, val); + pthread_mutex_unlock(&stasis_lock_manager_ht_mut); + return ret; +} +static void * pblHtLookup_r(pblHashTable_t * h, void * key, size_t keylen) { + pthread_mutex_lock(&stasis_lock_manager_ht_mut); + void * ret = pblHtLookup(h, key, keylen); + pthread_mutex_unlock(&stasis_lock_manager_ht_mut); + return ret; +} +static int pblHtRemove_r(pblHashTable_t * h, void * key, size_t keylen) { + pthread_mutex_lock(&stasis_lock_manager_ht_mut); + int ret = pblHtRemove(h, key, keylen); + pthread_mutex_unlock(&stasis_lock_manager_ht_mut); + return ret; +} +static void * pblHtFirst_r(pblHashTable_t *h) { + pthread_mutex_lock(&stasis_lock_manager_ht_mut); + void * ret = pblHtFirst(h); + pthread_mutex_unlock(&stasis_lock_manager_ht_mut); + return ret; +} +static void * pblHtNext_r(pblHashTable_t *h) { + pthread_mutex_lock(&stasis_lock_manager_ht_mut); + void * ret = pblHtNext(h); + pthread_mutex_unlock(&stasis_lock_manager_ht_mut); + return ret; +} +static void * pblHtCurrentKey_r(pblHashTable_t *h) { + pthread_mutex_lock(&stasis_lock_manager_ht_mut); + void * ret = pblHtCurrentKey(h); + pthread_mutex_unlock(&stasis_lock_manager_ht_mut); + return ret; +} + +#define MUTEX_COUNT 32 +// These next two correspond to MUTEX count, and are the appropriate values to pass into hash(). +#define MUTEX_BITS 5 +#define MUTEX_EXT 32 + +static pthread_mutex_t mutexes[MUTEX_COUNT]; + +static pthread_mutex_t xid_table_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t * getMutex(byte * dat, int datLen) { + return &mutexes[stasis_linear_hash(dat, datLen, MUTEX_BITS, MUTEX_EXT)]; +} + +static pblHashTable_t * xidLockTable; +static pblHashTable_t * ridLockTable; + +typedef struct { + pthread_cond_t writeOK; + pthread_cond_t readOK; + int readers; + int writers; + int waiting; + int active; +} lock; + +void lockManagerInitHashed() { + int i = 0; + for(i = 0; i < MUTEX_COUNT; i++) { + pthread_mutex_init(&mutexes[i], NULL); + } + xidLockTable = pblHtCreate(); + ridLockTable = pblHtCreate(); + +} +pblHashTable_t * lockManagerBeginTransactionUnlocked(int xid) { + pblHashTable_t * xidLocks = pblHtCreate(); + pblHtInsert_r(xidLockTable, &xid, sizeof(int), xidLocks); + return xidLocks; +} +int lockManagerBeginTransaction(int xid) { + pthread_mutex_lock(&xid_table_mutex); + lockManagerBeginTransactionUnlocked(xid); + pthread_mutex_unlock(&xid_table_mutex); + return 0; +} + +lock* createLock(byte * dat, int datLen) { + lock * ret = malloc(sizeof(lock)); + + if(!ret) { return NULL; } + + // pthread_mutex_init(&ret->mut, NULL); + pthread_cond_init(&ret->writeOK, NULL); + pthread_cond_init(&ret->readOK, NULL); + ret->active = 0; + ret->readers = 0; + ret->writers = 0; + ret->waiting = 0; + + pblHtInsert_r(ridLockTable, dat, datLen, ret); + return ret; +} + +void destroyLock(byte * dat, int datLen, lock * l) { + pthread_cond_destroy(&l->writeOK); + pthread_cond_destroy(&l->readOK); + free (l); + pblHtRemove_r(ridLockTable, dat, datLen); +} + +#define LM_READLOCK 1 +#define LM_WRITELOCK 2 + +int lockManagerReadLockHashed(int xid, byte * dat, int datLen) { + if(xid == -1) { return 0; } + pthread_mutex_lock(&xid_table_mutex); + pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int)); + if(!xidLocks) { + xidLocks = lockManagerBeginTransactionUnlocked(xid); + } + long currentLockLevel = (long)pblHtLookup_r(xidLocks, dat, datLen); + // printf("xid %d read lock (%d)\n", xid, currentLockLevel); + if(currentLockLevel >= LM_READLOCK) { + pthread_mutex_unlock(&xid_table_mutex); + return 0; + } + assert(!currentLockLevel); + pthread_mutex_unlock(&xid_table_mutex); + pthread_mutex_t * mut = getMutex(dat, datLen); + + pthread_mutex_lock(mut); + lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen); + + if(!ridLock) { + ridLock = createLock(dat, datLen); + } + ridLock->active++; + + if(ridLock->writers || ridLock->waiting) { + struct timeval tv; + int tod_ret = gettimeofday (&tv, NULL); + tv.tv_sec++; // Wait up to one second to obtain a lock before detecting deadlock. + struct timespec ts; + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = tv.tv_usec * 1000; + if(tod_ret != 0) { + perror("Could not get time of day"); + return LLADD_INTERNAL_ERROR; + } + do { + int wait_ret = pthread_cond_timedwait(&ridLock->readOK, mut, &ts); + if(wait_ret == ETIMEDOUT) { + ridLock->active--; + pthread_mutex_unlock(mut); + return LLADD_DEADLOCK; + } + } while(ridLock->writers); + } + if(currentLockLevel < LM_READLOCK) { + ridLock->readers++; + pblHtRemove_r(xidLocks, dat, datLen); + pblHtInsert_r(xidLocks, dat, datLen, (void*)LM_READLOCK); + } + ridLock->active--; + pthread_mutex_unlock(mut); + return 0; +} +int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) { + + if(xid == -1) { return 0; } + pthread_mutex_lock(&xid_table_mutex); + pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int)); + + if(!xidLocks) { + xidLocks = lockManagerBeginTransactionUnlocked(xid); + } + + long currentLockLevel = (long)pblHtLookup_r(xidLocks, dat, datLen); + + // printf("xid %d write lock (%d)\n", xid, currentLockLevel); + + int me = 0; + pthread_mutex_unlock(&xid_table_mutex); + + if(currentLockLevel >= LM_WRITELOCK) { + return 0; + } else if(currentLockLevel == LM_READLOCK) { + me = 1; + } + + pthread_mutex_t * mut = getMutex(dat, datLen); + + pthread_mutex_lock(mut); + lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen); + if(!ridLock) { + ridLock = createLock(dat, datLen); + } + + ridLock->active++; + ridLock->waiting++; + if(ridLock->writers || (ridLock->readers - me)) { + struct timeval tv; + int tod_ret = gettimeofday(&tv, NULL); + tv.tv_sec++; + struct timespec ts; + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = tv.tv_usec * 1000; + if(tod_ret != 0) { + perror("Could not get time of day"); + return LLADD_INTERNAL_ERROR; + } + while(ridLock->writers || (ridLock->readers - me)) { + int lockret = pthread_cond_timedwait(&ridLock->writeOK, mut, &ts); + if(lockret == ETIMEDOUT) { + ridLock->waiting--; + ridLock->active--; + pthread_mutex_unlock(mut); + return LLADD_DEADLOCK; + } + } + } + ridLock->waiting--; + if(currentLockLevel == 0) { + ridLock->readers++; + ridLock->writers++; + } else if (currentLockLevel == LM_READLOCK) { + ridLock->writers++; + pblHtRemove_r(xidLocks, dat, datLen); + } + if(currentLockLevel != LM_WRITELOCK) { + pblHtInsert_r(xidLocks, dat, datLen, (void*)LM_WRITELOCK); + } + + ridLock->active--; + pthread_mutex_unlock(mut); + return 0; +} + +static int decrementLock(void * dat, int datLen, int currentLevel) { + // pthread_mutex_unlock(&xid_table_mutex); + pthread_mutex_t * mut = getMutex(dat, datLen); + pthread_mutex_lock(mut); + lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen); + assert(ridLock); + ridLock->active++; + if(currentLevel == LM_WRITELOCK) { + ridLock->writers--; + ridLock->readers--; + } else if(currentLevel == LM_READLOCK) { + ridLock->readers--; + } else if(currentLevel == 0) { + assert(0); // Someone tried to release a lock they didn't own! + } else { + fprintf(stderr, "Unknown lock type encountered!"); + ridLock->active--; + pthread_mutex_unlock(mut); + return LLADD_INTERNAL_ERROR; + } + + ridLock->active--; + + if(!(ridLock->active || ridLock->waiting || ridLock->readers || ridLock->writers)) { + // printf("destroyed lock"); + destroyLock(dat, datLen, ridLock); + } else { + // printf("(%d %d %d %d)", ridLock->active, ridLock->waiting, ridLock->readers, ridLock->writers); + } + pthread_mutex_unlock(mut); + return 0; +} + +int lockManagerUnlockHashed(int xid, byte * dat, int datLen) { + + + if(xid == -1) { return 0; } + // printf("xid %d unlock\n", xid); + + pthread_mutex_lock(&xid_table_mutex); + + pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int)); + + if(!xidLocks) { + xidLocks = lockManagerBeginTransactionUnlocked(xid); + } + + pthread_mutex_unlock(&xid_table_mutex); + + long currentLevel = (long)pblHtLookup_r(xidLocks, dat, datLen); + + assert(currentLevel); + pblHtRemove_r(xidLocks, dat, datLen); + decrementLock(dat, datLen, currentLevel); + + return 0; +} + +int lockManagerCommitHashed(int xid, int datLen) { + if(xid == -1) { return 0; } + pthread_mutex_lock(&xid_table_mutex); + + pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int)); + pblHtRemove_r(xidLockTable, &xid, sizeof(int)); + if(!xidLocks) { + xidLocks = lockManagerBeginTransactionUnlocked(xid); + } + + pthread_mutex_unlock(&xid_table_mutex); + long currentLevel; + int ret = 0; + for(currentLevel = (long)pblHtFirst_r(xidLocks); currentLevel; currentLevel = (long)pblHtNext_r(xidLocks)) { + void * currentKey = pblHtCurrentKey_r(xidLocks); + int tmpret = decrementLock(currentKey, datLen, currentLevel); + // Pass any error(s) up to the user. + // (This logic relies on the fact that currently it only returns 0 and LLADD_INTERNAL_ERROR) + if(tmpret) { + ret = tmpret; + } + } + pblHtDelete(xidLocks); + return ret; +} + +int lockManagerReadLockRecord(int xid, recordid rid) { + return lockManagerReadLockHashed(xid, (byte*)&rid, sizeof(recordid)); +} +int lockManagerWriteLockRecord(int xid, recordid rid) { + return lockManagerWriteLockHashed(xid, (byte*)&rid, sizeof(recordid)); +} +int lockManagerUnlockRecord(int xid, recordid rid) { + return lockManagerUnlockHashed(xid, (byte*)&rid, sizeof(recordid)); +} +int lockManagerCommitRecords(int xid) { + return lockManagerCommitHashed(xid, sizeof(recordid)); +} + +int lockManagerReadLockPage(int xid, pageid_t p) { + return lockManagerReadLockHashed(xid, (byte*)&p, sizeof(p)); +} +int lockManagerWriteLockPage(int xid, pageid_t p) { + return lockManagerWriteLockHashed(xid, (byte*)&p, sizeof(p)); +} +int lockManagerUnlockPage(int xid, pageid_t p) { + return lockManagerUnlockHashed(xid, (byte*)&p, sizeof(p)); +} +int lockManagerCommitPages(int xid) { + return lockManagerCommitHashed(xid, sizeof(pageid_t)); +} + +void setupLockManagerCallbacksPage() { + globalLockManager.init = &lockManagerInitHashed; + globalLockManager.readLockPage = &lockManagerReadLockPage; + globalLockManager.writeLockPage = &lockManagerWriteLockPage; + globalLockManager.unlockPage = &lockManagerUnlockPage; + globalLockManager.readLockRecord = NULL; + globalLockManager.writeLockRecord = NULL; + globalLockManager.unlockRecord = NULL; + globalLockManager.commit = &lockManagerCommitPages; + globalLockManager.abort = &lockManagerCommitPages; + globalLockManager.begin = &lockManagerBeginTransaction; + + globalLockManager.init(); +} + +void setupLockManagerCallbacksRecord () { + globalLockManager.init = &lockManagerInitHashed; + globalLockManager.readLockPage = NULL; + globalLockManager.writeLockPage = NULL; + globalLockManager.unlockPage = NULL; + globalLockManager.readLockRecord = &lockManagerReadLockRecord; + globalLockManager.writeLockRecord = &lockManagerWriteLockRecord; + globalLockManager.unlockRecord = &lockManagerUnlockRecord; + globalLockManager.commit = &lockManagerCommitRecords; + globalLockManager.abort = &lockManagerCommitRecords; + globalLockManager.begin = &lockManagerBeginTransaction; + globalLockManager.init(); +} diff --git a/src/stasis/logger/logMemory.c b/src/stasis/experimental/logMemory.c similarity index 96% rename from src/stasis/logger/logMemory.c rename to src/stasis/experimental/logMemory.c index 416dd8d..8b297df 100644 --- a/src/stasis/logger/logMemory.c +++ b/src/stasis/experimental/logMemory.c @@ -2,7 +2,7 @@ NOTE: Person who's using the consumer interface calls close first, (for now). */ -#include +#include #include typedef struct { @@ -17,8 +17,16 @@ typedef struct { lsn_t eof; } logMemory_fifo_t; -void logMemory_init() { - /* NO-OP */ +void stasis_logMemory_init() { + lladdIterator_def_t logMemory_def = { + logMemory_Iterator_close, + logMemory_Iterator_next, + logMemory_Iterator_tryNext, + logMemory_Iterator_key, + logMemory_Iterator_value, + logMemory_Iterator_releaseTuple, + }; + lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def); } lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) { diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/experimental/lsmTree.c similarity index 99% rename from src/stasis/operations/lsmTree.c rename to src/stasis/experimental/lsmTree.c index 0146699..89b8b0d 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/experimental/lsmTree.c @@ -1,4 +1,4 @@ -#include +#include #include #include #include diff --git a/src/stasis/multiplexer.c b/src/stasis/experimental/multiplexer.c similarity index 98% rename from src/stasis/multiplexer.c rename to src/stasis/experimental/multiplexer.c index b81a3b2..3b523fe 100644 --- a/src/stasis/multiplexer.c +++ b/src/stasis/experimental/multiplexer.c @@ -1,8 +1,8 @@ -#include +#include #include #include -#include +#include lladdMultiplexer_t * lladdMultiplexer_alloc( int xid, lladdIterator_t * it, diff --git a/src/stasis/ringbuffer.c b/src/stasis/experimental/ringbuffer.c similarity index 99% rename from src/stasis/ringbuffer.c rename to src/stasis/experimental/ringbuffer.c index 67b6d0a..79c21a5 100644 --- a/src/stasis/ringbuffer.c +++ b/src/stasis/experimental/ringbuffer.c @@ -1,5 +1,5 @@ #include -#include +#include //#define TRACK_OFFSETS diff --git a/src/stasis/iterator.c b/src/stasis/iterator.c index 5c1d95e..411c5f2 100644 --- a/src/stasis/iterator.c +++ b/src/stasis/iterator.c @@ -1,8 +1,7 @@ #include #include -#include -#include -#include +//#include +#include #include @@ -12,38 +11,9 @@ void lladdIterator_register(int type, lladdIterator_def_t info) { assert(type < MAX_ITERATOR_TYPES); iterators[type] = info; } -static void noopTupDone(int xid, void * foo) { } void iterator_init() { - lladdIterator_def_t array_def = { - arrayIterator_close, - arrayIterator_next, - arrayIterator_next, - arrayIterator_key, - arrayIterator_value, - noopTupDone, - }; - lladdIterator_register(ARRAY_ITERATOR, array_def); - lladdIterator_def_t logMemory_def = { - logMemory_Iterator_close, - logMemory_Iterator_next, - logMemory_Iterator_tryNext, - logMemory_Iterator_key, - logMemory_Iterator_value, - logMemory_Iterator_releaseTuple, - }; - lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def); - lladdIterator_def_t pointer_def = { - lladdFifoPool_iterator_close, - lladdFifoPool_iterator_next, - lladdFifoPool_iterator_tryNext, - lladdFifoPool_iterator_key, - lladdFifoPool_iterator_value, - lladdFifoPool_iterator_tupleDone, - }; - lladdIterator_register(POINTER_ITERATOR, pointer_def); - - stasis_log_structured_group_init(); + /* no-op */ } diff --git a/src/stasis/lockManager.c b/src/stasis/lockManager.c index d568661..9f720b3 100644 --- a/src/stasis/lockManager.c +++ b/src/stasis/lockManager.c @@ -1,392 +1,14 @@ -#include +/* + * lockManager.c + * + * Created on: Jun 12, 2011 + * Author: sears + */ + #include -#include -#include - -#include -#include - -#include -#include - -static pthread_mutex_t stasis_lock_manager_ht_mut = PTHREAD_MUTEX_INITIALIZER; - -static int pblHtInsert_r(pblHashTable_t * h, void * key, size_t keylen, void * val) { - pthread_mutex_lock(&stasis_lock_manager_ht_mut); - int ret = pblHtInsert(h, key, keylen, val); - pthread_mutex_unlock(&stasis_lock_manager_ht_mut); - return ret; -} -static void * pblHtLookup_r(pblHashTable_t * h, void * key, size_t keylen) { - pthread_mutex_lock(&stasis_lock_manager_ht_mut); - void * ret = pblHtLookup(h, key, keylen); - pthread_mutex_unlock(&stasis_lock_manager_ht_mut); - return ret; -} -static int pblHtRemove_r(pblHashTable_t * h, void * key, size_t keylen) { - pthread_mutex_lock(&stasis_lock_manager_ht_mut); - int ret = pblHtRemove(h, key, keylen); - pthread_mutex_unlock(&stasis_lock_manager_ht_mut); - return ret; -} -static void * pblHtFirst_r(pblHashTable_t *h) { - pthread_mutex_lock(&stasis_lock_manager_ht_mut); - void * ret = pblHtFirst(h); - pthread_mutex_unlock(&stasis_lock_manager_ht_mut); - return ret; -} -static void * pblHtNext_r(pblHashTable_t *h) { - pthread_mutex_lock(&stasis_lock_manager_ht_mut); - void * ret = pblHtNext(h); - pthread_mutex_unlock(&stasis_lock_manager_ht_mut); - return ret; -} -static void * pblHtCurrentKey_r(pblHashTable_t *h) { - pthread_mutex_lock(&stasis_lock_manager_ht_mut); - void * ret = pblHtCurrentKey(h); - pthread_mutex_unlock(&stasis_lock_manager_ht_mut); - return ret; -} - -#define MUTEX_COUNT 32 -// These next two correspond to MUTEX count, and are the appropriate values to pass into hash(). -#define MUTEX_BITS 5 -#define MUTEX_EXT 32 - -static pthread_mutex_t mutexes[MUTEX_COUNT]; - -static pthread_mutex_t xid_table_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_mutex_t * getMutex(byte * dat, int datLen) { - return &mutexes[stasis_linear_hash(dat, datLen, MUTEX_BITS, MUTEX_EXT)]; -} - -static pblHashTable_t * xidLockTable; -static pblHashTable_t * ridLockTable; - -typedef struct { - pthread_cond_t writeOK; - pthread_cond_t readOK; - int readers; - int writers; - int waiting; - int active; -} lock; - -void lockManagerInitHashed() { - int i = 0; - for(i = 0; i < MUTEX_COUNT; i++) { - pthread_mutex_init(&mutexes[i], NULL); - } - xidLockTable = pblHtCreate(); - ridLockTable = pblHtCreate(); - -} -pblHashTable_t * lockManagerBeginTransactionUnlocked(int xid) { - pblHashTable_t * xidLocks = pblHtCreate(); - pblHtInsert_r(xidLockTable, &xid, sizeof(int), xidLocks); - return xidLocks; -} -int lockManagerBeginTransaction(int xid) { - pthread_mutex_lock(&xid_table_mutex); - lockManagerBeginTransactionUnlocked(xid); - pthread_mutex_unlock(&xid_table_mutex); - return 0; -} - -lock* createLock(byte * dat, int datLen) { - lock * ret = malloc(sizeof(lock)); - - if(!ret) { return NULL; } - - // pthread_mutex_init(&ret->mut, NULL); - pthread_cond_init(&ret->writeOK, NULL); - pthread_cond_init(&ret->readOK, NULL); - ret->active = 0; - ret->readers = 0; - ret->writers = 0; - ret->waiting = 0; - - pblHtInsert_r(ridLockTable, dat, datLen, ret); - return ret; -} - -void destroyLock(byte * dat, int datLen, lock * l) { - pthread_cond_destroy(&l->writeOK); - pthread_cond_destroy(&l->readOK); - free (l); - pblHtRemove_r(ridLockTable, dat, datLen); -} - -#define LM_READLOCK 1 -#define LM_WRITELOCK 2 - -int lockManagerReadLockHashed(int xid, byte * dat, int datLen) { - if(xid == -1) { return 0; } - pthread_mutex_lock(&xid_table_mutex); - pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int)); - if(!xidLocks) { - xidLocks = lockManagerBeginTransactionUnlocked(xid); - } - long currentLockLevel = (long)pblHtLookup_r(xidLocks, dat, datLen); - // printf("xid %d read lock (%d)\n", xid, currentLockLevel); - if(currentLockLevel >= LM_READLOCK) { - pthread_mutex_unlock(&xid_table_mutex); - return 0; - } - assert(!currentLockLevel); - pthread_mutex_unlock(&xid_table_mutex); - pthread_mutex_t * mut = getMutex(dat, datLen); - - pthread_mutex_lock(mut); - lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen); - - if(!ridLock) { - ridLock = createLock(dat, datLen); - } - ridLock->active++; - - if(ridLock->writers || ridLock->waiting) { - struct timeval tv; - int tod_ret = gettimeofday (&tv, NULL); - tv.tv_sec++; // Wait up to one second to obtain a lock before detecting deadlock. - struct timespec ts; - ts.tv_sec = tv.tv_sec; - ts.tv_nsec = tv.tv_usec * 1000; - if(tod_ret != 0) { - perror("Could not get time of day"); - return LLADD_INTERNAL_ERROR; - } - do { - int wait_ret = pthread_cond_timedwait(&ridLock->readOK, mut, &ts); - if(wait_ret == ETIMEDOUT) { - ridLock->active--; - pthread_mutex_unlock(mut); - return LLADD_DEADLOCK; - } - } while(ridLock->writers); - } - if(currentLockLevel < LM_READLOCK) { - ridLock->readers++; - pblHtRemove_r(xidLocks, dat, datLen); - pblHtInsert_r(xidLocks, dat, datLen, (void*)LM_READLOCK); - } - ridLock->active--; - pthread_mutex_unlock(mut); - return 0; -} -int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) { - - if(xid == -1) { return 0; } - pthread_mutex_lock(&xid_table_mutex); - pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int)); - - if(!xidLocks) { - xidLocks = lockManagerBeginTransactionUnlocked(xid); - } - - long currentLockLevel = (long)pblHtLookup_r(xidLocks, dat, datLen); - - // printf("xid %d write lock (%d)\n", xid, currentLockLevel); - - int me = 0; - pthread_mutex_unlock(&xid_table_mutex); - - if(currentLockLevel >= LM_WRITELOCK) { - return 0; - } else if(currentLockLevel == LM_READLOCK) { - me = 1; - } - - pthread_mutex_t * mut = getMutex(dat, datLen); - - pthread_mutex_lock(mut); - lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen); - if(!ridLock) { - ridLock = createLock(dat, datLen); - } - - ridLock->active++; - ridLock->waiting++; - if(ridLock->writers || (ridLock->readers - me)) { - struct timeval tv; - int tod_ret = gettimeofday(&tv, NULL); - tv.tv_sec++; - struct timespec ts; - ts.tv_sec = tv.tv_sec; - ts.tv_nsec = tv.tv_usec * 1000; - if(tod_ret != 0) { - perror("Could not get time of day"); - return LLADD_INTERNAL_ERROR; - } - while(ridLock->writers || (ridLock->readers - me)) { - int lockret = pthread_cond_timedwait(&ridLock->writeOK, mut, &ts); - if(lockret == ETIMEDOUT) { - ridLock->waiting--; - ridLock->active--; - pthread_mutex_unlock(mut); - return LLADD_DEADLOCK; - } - } - } - ridLock->waiting--; - if(currentLockLevel == 0) { - ridLock->readers++; - ridLock->writers++; - } else if (currentLockLevel == LM_READLOCK) { - ridLock->writers++; - pblHtRemove_r(xidLocks, dat, datLen); - } - if(currentLockLevel != LM_WRITELOCK) { - pblHtInsert_r(xidLocks, dat, datLen, (void*)LM_WRITELOCK); - } - - ridLock->active--; - pthread_mutex_unlock(mut); - return 0; -} - -static int decrementLock(void * dat, int datLen, int currentLevel) { - // pthread_mutex_unlock(&xid_table_mutex); - pthread_mutex_t * mut = getMutex(dat, datLen); - pthread_mutex_lock(mut); - lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen); - assert(ridLock); - ridLock->active++; - if(currentLevel == LM_WRITELOCK) { - ridLock->writers--; - ridLock->readers--; - } else if(currentLevel == LM_READLOCK) { - ridLock->readers--; - } else if(currentLevel == 0) { - assert(0); // Someone tried to release a lock they didn't own! - } else { - fprintf(stderr, "Unknown lock type encountered!"); - ridLock->active--; - pthread_mutex_unlock(mut); - return LLADD_INTERNAL_ERROR; - } - - ridLock->active--; - - if(!(ridLock->active || ridLock->waiting || ridLock->readers || ridLock->writers)) { - // printf("destroyed lock"); - destroyLock(dat, datLen, ridLock); - } else { - // printf("(%d %d %d %d)", ridLock->active, ridLock->waiting, ridLock->readers, ridLock->writers); - } - pthread_mutex_unlock(mut); - return 0; -} - -int lockManagerUnlockHashed(int xid, byte * dat, int datLen) { - - - if(xid == -1) { return 0; } - // printf("xid %d unlock\n", xid); - - pthread_mutex_lock(&xid_table_mutex); - - pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int)); - - if(!xidLocks) { - xidLocks = lockManagerBeginTransactionUnlocked(xid); - } - - pthread_mutex_unlock(&xid_table_mutex); - - long currentLevel = (long)pblHtLookup_r(xidLocks, dat, datLen); - - assert(currentLevel); - pblHtRemove_r(xidLocks, dat, datLen); - decrementLock(dat, datLen, currentLevel); - - return 0; -} - -int lockManagerCommitHashed(int xid, int datLen) { - if(xid == -1) { return 0; } - pthread_mutex_lock(&xid_table_mutex); - - pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int)); - pblHtRemove_r(xidLockTable, &xid, sizeof(int)); - if(!xidLocks) { - xidLocks = lockManagerBeginTransactionUnlocked(xid); - } - - pthread_mutex_unlock(&xid_table_mutex); - long currentLevel; - int ret = 0; - for(currentLevel = (long)pblHtFirst_r(xidLocks); currentLevel; currentLevel = (long)pblHtNext_r(xidLocks)) { - void * currentKey = pblHtCurrentKey_r(xidLocks); - int tmpret = decrementLock(currentKey, datLen, currentLevel); - // Pass any error(s) up to the user. - // (This logic relies on the fact that currently it only returns 0 and LLADD_INTERNAL_ERROR) - if(tmpret) { - ret = tmpret; - } - } - pblHtDelete(xidLocks); - return ret; -} - -int lockManagerReadLockRecord(int xid, recordid rid) { - return lockManagerReadLockHashed(xid, (byte*)&rid, sizeof(recordid)); -} -int lockManagerWriteLockRecord(int xid, recordid rid) { - return lockManagerWriteLockHashed(xid, (byte*)&rid, sizeof(recordid)); -} -int lockManagerUnlockRecord(int xid, recordid rid) { - return lockManagerUnlockHashed(xid, (byte*)&rid, sizeof(recordid)); -} -int lockManagerCommitRecords(int xid) { - return lockManagerCommitHashed(xid, sizeof(recordid)); -} - -int lockManagerReadLockPage(int xid, pageid_t p) { - return lockManagerReadLockHashed(xid, (byte*)&p, sizeof(p)); -} -int lockManagerWriteLockPage(int xid, pageid_t p) { - return lockManagerWriteLockHashed(xid, (byte*)&p, sizeof(p)); -} -int lockManagerUnlockPage(int xid, pageid_t p) { - return lockManagerUnlockHashed(xid, (byte*)&p, sizeof(p)); -} -int lockManagerCommitPages(int xid) { - return lockManagerCommitHashed(xid, sizeof(pageid_t)); -} LockManagerSetup globalLockManager; -void setupLockManagerCallbacksPage() { - globalLockManager.init = &lockManagerInitHashed; - globalLockManager.readLockPage = &lockManagerReadLockPage; - globalLockManager.writeLockPage = &lockManagerWriteLockPage; - globalLockManager.unlockPage = &lockManagerUnlockPage; - globalLockManager.readLockRecord = NULL; - globalLockManager.writeLockRecord = NULL; - globalLockManager.unlockRecord = NULL; - globalLockManager.commit = &lockManagerCommitPages; - globalLockManager.abort = &lockManagerCommitPages; - globalLockManager.begin = &lockManagerBeginTransaction; - - globalLockManager.init(); -} - -void setupLockManagerCallbacksRecord () { - globalLockManager.init = &lockManagerInitHashed; - globalLockManager.readLockPage = NULL; - globalLockManager.writeLockPage = NULL; - globalLockManager.unlockPage = NULL; - globalLockManager.readLockRecord = &lockManagerReadLockRecord; - globalLockManager.writeLockRecord = &lockManagerWriteLockRecord; - globalLockManager.unlockRecord = &lockManagerUnlockRecord; - globalLockManager.commit = &lockManagerCommitRecords; - globalLockManager.abort = &lockManagerCommitRecords; - globalLockManager.begin = &lockManagerBeginTransaction; - globalLockManager.init(); -} - - void setupLockManagerCallbacksNil () { globalLockManager.init = NULL; globalLockManager.readLockPage = NULL; @@ -399,3 +21,4 @@ void setupLockManagerCallbacksNil () { globalLockManager.abort = NULL; globalLockManager.begin = NULL; } + diff --git a/src/stasis/operations/linearHashNTA.c b/src/stasis/operations/linearHashNTA.c index 5c5ced9..306856e 100644 --- a/src/stasis/operations/linearHashNTA.c +++ b/src/stasis/operations/linearHashNTA.c @@ -6,9 +6,9 @@ #include #include // The next two #includes are for deprecated code. -#include -#include -#include +//#include +//#include +//#include /** re-entrant implementation of a linear hash hable, using nested top actions. @@ -515,7 +515,8 @@ static int linearHashNTAIterator_value(int xid, void * impl, byte ** value) { //--------------------------------- async hash operations happen below here -typedef struct { +/* + typedef struct { int value_len; int key_len; } asyncHashInsert_t; @@ -594,3 +595,4 @@ lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads, return mainFifo->consumer; } +*/ diff --git a/src/stasis/page.c b/src/stasis/page.c index 3be7f1b..09a3721 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -117,7 +117,6 @@ void stasis_page_init(stasis_dirty_page_table_t * dpt) { stasis_page_impl_register(stasis_page_boundary_tag_impl()); stasis_page_impl_register(arrayListImpl()); stasis_page_impl_register(stasis_page_blob_impl()); - stasis_page_impl_register(lsmRootImpl()); stasis_page_impl_register(slottedLsnFreeImpl()); stasis_page_impl_register(segmentImpl()); stasis_page_impl_register(stasis_page_slotted_latch_free_impl()); diff --git a/src/stasis/page/latchFree/lfSlotted.c b/src/stasis/page/latchFree/lfSlotted.c index 09b5dae..a7c3f0c 100644 --- a/src/stasis/page/latchFree/lfSlotted.c +++ b/src/stasis/page/latchFree/lfSlotted.c @@ -5,14 +5,20 @@ * Author: sears */ +#include #include #include #include +#ifdef HAVE_GCC_ATOMICS #define CAS(_a,_o,_n) __sync_bool_compare_and_swap(_a,_o,_n) #define BARRIER() __sync_synchronize() #define FETCH_AND_ADD(_a, _i) __sync_fetch_and_add(_a, _i) - +#else +#define CAS(_a,_o,_n) 1 +#define BARRIER() abort() +#define FETCH_AND_ADD(_a, _i) 1 +#endif static int notSupported(int xid, Page * p) { return 0; } static const byte* lfSlottedRead (int xid, Page *p, recordid rid) { diff --git a/src/stasis/transactionTable.c b/src/stasis/transactionTable.c index 6062814..f699a8d 100644 --- a/src/stasis/transactionTable.c +++ b/src/stasis/transactionTable.c @@ -14,7 +14,7 @@ struct stasis_transaction_table_t { int active_count; -#ifndef HAVE_GCC_ATMOICS +#ifndef HAVE_GCC_ATOMICS /** This mutex protects the rest of the struct xidCount. diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 74df398..451f5e1 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -2,7 +2,6 @@ #include #include #include -#include #include #include #include @@ -97,7 +96,6 @@ int Tinit() { BtreeInit(); TlinkedListNTAInit(); iterator_init(); - consumer_init(); setupLockManagerCallbacksNil(); stasis_recovery_initiate(stasis_log_file, stasis_transaction_table, stasis_alloc); diff --git a/stasis/arrayCollection.h b/stasis/experimental/arrayCollection.h similarity index 90% rename from stasis/arrayCollection.h rename to stasis/experimental/arrayCollection.h index 13cf6bb..a29c2aa 100644 --- a/stasis/arrayCollection.h +++ b/stasis/experimental/arrayCollection.h @@ -6,3 +6,5 @@ void arrayIterator_close(int xid, void * impl); int arrayIterator_next (int xid, void * impl); int arrayIterator_key (int xid, void * impl, byte ** key); int arrayIterator_value(int xid, void * impl, byte ** value); + +void stasis_arrayCollection_init(); diff --git a/stasis/consumer.h b/stasis/experimental/consumer.h similarity index 100% rename from stasis/consumer.h rename to stasis/experimental/consumer.h diff --git a/stasis/fifo.h b/stasis/experimental/fifo.h similarity index 95% rename from stasis/fifo.h rename to stasis/experimental/fifo.h index b259fe0..212f07b 100644 --- a/stasis/fifo.h +++ b/stasis/experimental/fifo.h @@ -1,6 +1,6 @@ #include -#include +#include #ifndef __FIFO_H #define __FIFO_H @@ -41,4 +41,7 @@ int lladdFifoPool_iterator_key (int xid, void * it, byte ** key); int lladdFifoPool_iterator_value (int xid, void * it, byte ** val); void lladdFifoPool_iterator_tupleDone(int xid, void * it); void lladdFifoPool_iterator_releaseLock(int xid, void * it); + +void stasis_fifo_init(); + #endif // __FIFO_H diff --git a/stasis/graph.h b/stasis/experimental/graph.h similarity index 94% rename from stasis/graph.h rename to stasis/experimental/graph.h index c221851..09c0623 100644 --- a/stasis/graph.h +++ b/stasis/experimental/graph.h @@ -1,5 +1,5 @@ #include -#include +#include #ifndef __LLADD_GRAPH_H #define __LLADD_GRAPH_H diff --git a/stasis/operations/group.h b/stasis/experimental/group.h similarity index 100% rename from stasis/operations/group.h rename to stasis/experimental/group.h diff --git a/stasis/logger/logMemory.h b/stasis/experimental/logMemory.h similarity index 85% rename from stasis/logger/logMemory.h rename to stasis/experimental/logMemory.h index 025740d..5ff35e1 100644 --- a/stasis/logger/logMemory.h +++ b/stasis/experimental/logMemory.h @@ -1,8 +1,8 @@ #include -#include -#include +#include +#include #include -#include +#include #ifndef __LOGMEMORY_H #define __LOGMEMORY_H @@ -28,4 +28,6 @@ int logMemory_Iterator_key (int xid, void * impl, byte ** key); int logMemory_Iterator_value (int xid, void * impl, byte ** value); void logMemory_Iterator_releaseTuple(int xid, void *it); void logMemory_Iterator_releaseLock (int xid, void * impl); + +void stasis_logMemory_init(); #endif diff --git a/stasis/operations/lsmIterators.h b/stasis/experimental/lsmIterators.h similarity index 98% rename from stasis/operations/lsmIterators.h rename to stasis/experimental/lsmIterators.h index ce52991..fe6c22a 100644 --- a/stasis/operations/lsmIterators.h +++ b/stasis/experimental/lsmIterators.h @@ -1,11 +1,13 @@ #ifndef _LSMITERATORS_H__ #define _LSMITERATORS_H__ -#include "stasis/page.h" -#include "stasis/bufferManager.h" -#include "stasis/page/compression/compression.h" -#include "stasis/page/compression/tuple.h" -#include "stasis/operations.h" +#include +#include +#include +#include +#include + +#include "lsmTree.h" /** @file diff --git a/stasis/operations/lsmTable.h b/stasis/experimental/lsmTable.h similarity index 100% rename from stasis/operations/lsmTable.h rename to stasis/experimental/lsmTable.h diff --git a/stasis/operations/lsmTree.h b/stasis/experimental/lsmTree.h similarity index 98% rename from stasis/operations/lsmTree.h rename to stasis/experimental/lsmTree.h index 4472ba7..e65fef0 100644 --- a/stasis/operations/lsmTree.h +++ b/stasis/experimental/lsmTree.h @@ -18,6 +18,9 @@ #include #include #include + +BEGIN_C_DECLS + typedef struct { recordid treeRoot; recordid pos; @@ -153,4 +156,8 @@ static inline int lsmTreeIterator_value(int xid, lladdIterator_t *it, } static inline void lsmTreeIterator_tupleDone(int xid, void *it) { } static inline void lsmTreeIterator_releaseLock(int xid, void *it) { } +page_impl lsmRootImpl(); + +END_C_DECLS + #endif // _LSMTREE_H__ diff --git a/stasis/operations/lsmWorkers.h b/stasis/experimental/lsmWorkers.h similarity index 100% rename from stasis/operations/lsmWorkers.h rename to stasis/experimental/lsmWorkers.h diff --git a/stasis/multiplexer.h b/stasis/experimental/multiplexer.h similarity index 98% rename from stasis/multiplexer.h rename to stasis/experimental/multiplexer.h index 409ad54..9670a44 100644 --- a/stasis/multiplexer.h +++ b/stasis/experimental/multiplexer.h @@ -1,4 +1,4 @@ -#include "iterator.h" +#include #include "consumer.h" #include "fifo.h" diff --git a/stasis/ringbuffer.h b/stasis/experimental/ringbuffer.h similarity index 100% rename from stasis/ringbuffer.h rename to stasis/experimental/ringbuffer.h diff --git a/stasis/operations.h b/stasis/operations.h index 32d4ac8..e793770 100644 --- a/stasis/operations.h +++ b/stasis/operations.h @@ -155,9 +155,7 @@ typedef struct { #include "operations/linearHashNTA.h" #include "operations/bTree.h" #include "operations/regions.h" -#include "operations/lsmTree.h" #include "operations/lsnFreeSet.h" -#include "operations/group.h" #include "operations/segmentFile.h" /** diff --git a/stasis/page/fixed.h b/stasis/page/fixed.h index b66acbb..57aee22 100644 --- a/stasis/page/fixed.h +++ b/stasis/page/fixed.h @@ -36,6 +36,4 @@ void fixedPageInit(); void fixedPageDeinit(); page_impl fixedImpl(); page_impl arrayListImpl(); -// @todo move lsmTreeImpl() to lsmTree.h (but first, move page.h...) -page_impl lsmRootImpl(); #endif diff --git a/test/stasis/CMakeLists.txt b/test/stasis/CMakeLists.txt index abc0399..cd5399f 100644 --- a/test/stasis/CMakeLists.txt +++ b/test/stasis/CMakeLists.txt @@ -19,9 +19,9 @@ CREATE_CHECK(check_header) CREATE_CHECK(check_linkedListNTA) CREATE_CHECK(check_linearHashNTA) CREATE_CHECK(check_pageOrientedList) -CREATE_CHECK(check_ringbuffer) -CREATE_CHECK(check_iterator) -CREATE_CHECK(check_multiplexer) +CREATE_EXPERIMENTAL_CHECK(check_ringbuffer) +CREATE_EXPERIMENTAL_CHECK(check_iterator) +CREATE_EXPERIMENTAL_CHECK(check_multiplexer) CREATE_CHECK(check_bTree) CREATE_CHECK(check_regions) CREATE_CHECK(check_allocationPolicy) @@ -29,6 +29,6 @@ CREATE_CHECK(check_dirtyPageTable) CREATE_CHECK(check_io) CREATE_CHECK(check_rangeTracker) CREATE_CHECK(check_replacementPolicy) -CREATE_CHECK(check_lsmTree) -CREATE_CHECK(check_groupBy) +CREATE_EXPERIMENTAL_CHECK(check_lsmTree) +CREATE_EXPERIMENTAL_CHECK(check_groupBy) CREATE_CHECK(check_boundedLog) diff --git a/test/stasis/check_groupBy.c b/test/stasis/check_groupBy.c index f6dd710..f0e6549 100644 --- a/test/stasis/check_groupBy.c +++ b/test/stasis/check_groupBy.c @@ -43,6 +43,7 @@ terms specified in this license. #include "../check_includes.h" #include +#include #include #include @@ -55,8 +56,10 @@ terms specified in this license. START_TEST(groupBySmokeTest) { Tinit(); + stasis_log_structured_group_init(); int xid = Tbegin(); - stasis_group_t * handle = TlogStructuredGroup(xid, 1024*1024*40); + +stasis_group_t * handle = TlogStructuredGroup(xid, 1024*1024*40); for(int i =0; i < 10000; i++) { for(int j = 0; j < 100; j++) { int val = i * 100 + j; diff --git a/test/stasis/check_iterator.c b/test/stasis/check_iterator.c index 2370697..791d270 100644 --- a/test/stasis/check_iterator.c +++ b/test/stasis/check_iterator.c @@ -42,7 +42,7 @@ terms specified in this license. #include "../check_includes.h" #include -#include +#include #include #include @@ -111,6 +111,7 @@ static void iterator_test(int xid, START_TEST(iteratorTest) { Tinit(); + stasis_arrayCollection_init(); int xid = Tbegin(); unsigned int keyArray[NUM_ENTRIES]; byte valueArray[NUM_ENTRIES]; diff --git a/test/stasis/check_lsmTree.c b/test/stasis/check_lsmTree.c index 2065c98..d2fc6c2 100644 --- a/test/stasis/check_lsmTree.c +++ b/test/stasis/check_lsmTree.c @@ -2,6 +2,8 @@ #include +#include + #include #include #include @@ -31,6 +33,7 @@ void insertProbeIter(lsmkey_t NUM_ENTRIES) { lsmTreeRegisterComparator(intcmp,cmp); TlsmRegionAllocConf_t alloc_conf = LSM_REGION_ALLOC_STATIC_INITIALIZER; + stasis_page_impl_register(lsmRootImpl()); Tinit(); int xid = Tbegin(); recordid tree = TlsmCreate(xid, intcmp, diff --git a/test/stasis/check_multiplexer.c b/test/stasis/check_multiplexer.c index 7091476..02aef33 100644 --- a/test/stasis/check_multiplexer.c +++ b/test/stasis/check_multiplexer.c @@ -42,8 +42,10 @@ terms specified in this license. #include "../check_includes.h" #include -#include -#include +#include +#include +#include +#include #include #include @@ -165,6 +167,11 @@ static void * go2( void * arg) { START_TEST(multiplexTest) { Tinit(); + stasis_fifo_init(); + stasis_log_structured_group_init(); + stasis_arrayCollection_init(); + stasis_logMemory_init(); + consumer_init(); int xid = Tbegin(); recordid hash = ThashCreate(xid, sizeof(lsn_t), VARIABLE_LENGTH); diff --git a/test/stasis/check_ringbuffer.c b/test/stasis/check_ringbuffer.c index 81acd30..56ce21c 100644 --- a/test/stasis/check_ringbuffer.c +++ b/test/stasis/check_ringbuffer.c @@ -41,7 +41,7 @@ terms specified in this license. ---*/ #include "../check_includes.h" -#include +#include #include #include