move a bunch of multiplexer and lsm related stuff into experimental/ directory

This commit is contained in:
Sears Russell 2011-06-13 06:58:34 +00:00
parent b477ac11a6
commit 377788a13e
45 changed files with 548 additions and 485 deletions

View file

@ -122,6 +122,12 @@ MACRO(CREATE_CHECK NAME)
ADD_TEST(${NAME} nice ./${NAME}) ADD_TEST(${NAME} nice ./${NAME})
ENDMACRO(CREATE_CHECK) ENDMACRO(CREATE_CHECK)
MACRO(CREATE_EXPERIMENTAL_CHECK NAME)
ADD_EXECUTABLE(${NAME} ${NAME}.c)
TARGET_LINK_LIBRARIES(${NAME} stasis_experimental ${COMMON_LIBRARIES})
ADD_TEST(${NAME} nice ./${NAME})
ENDMACRO(CREATE_EXPERIMENTAL_CHECK)
MACRO(CREATE_CHECK_OPT NAME OPT) MACRO(CREATE_CHECK_OPT NAME OPT)
ADD_EXECUTABLE(${NAME} ${NAME}.c) ADD_EXECUTABLE(${NAME} ${NAME}.c)
TARGET_LINK_LIBRARIES(${NAME} ${COMMON_LIBRARIES}) TARGET_LINK_LIBRARIES(${NAME} ${COMMON_LIBRARIES})

View file

@ -2,16 +2,20 @@ SUBDIRS(multicore)
IF(HAVE_GETLINE) IF(HAVE_GETLINE)
ADD_EXECUTABLE(rose rose.cpp) ADD_EXECUTABLE(rose rose.cpp)
TARGET_LINK_LIBRARIES(rose ${COMMON_LIBRARIES}) TARGET_LINK_LIBRARIES(rose stasis_experimental ${COMMON_LIBRARIES})
ADD_EXECUTABLE(roseTable roseTable.cpp) ADD_EXECUTABLE(roseTable roseTable.cpp)
TARGET_LINK_LIBRARIES(roseTable ${COMMON_LIBRARIES}) TARGET_LINK_LIBRARIES(roseTable stasis_experimental ${COMMON_LIBRARIES})
ADD_EXECUTABLE(roseTableTpcCH-workload1 roseTableTpcCH-workload1.cpp) ADD_EXECUTABLE(roseTableTpcCH-workload1 roseTableTpcCH-workload1.cpp)
TARGET_LINK_LIBRARIES(roseTableTpcCH-workload1 ${COMMON_LIBRARIES}) TARGET_LINK_LIBRARIES(roseTableTpcCH-workload1 stasis_experimental ${COMMON_LIBRARIES})
ADD_EXECUTABLE(roseTableTpcCH-workload2 roseTableTpcCH-workload2.cpp) ADD_EXECUTABLE(roseTableTpcCH-workload2 roseTableTpcCH-workload2.cpp)
TARGET_LINK_LIBRARIES(roseTableTpcCH-workload2 ${COMMON_LIBRARIES}) TARGET_LINK_LIBRARIES(roseTableTpcCH-workload2 stasis_experimental ${COMMON_LIBRARIES})
ADD_EXECUTABLE(transitiveClosure transitiveClosure.c)
TARGET_LINK_LIBRARIES(transitiveClosure stasis_experimental ${COMMON_LIBRARIES})
ENDIF(HAVE_GETLINE) ENDIF(HAVE_GETLINE)
@ -27,7 +31,6 @@ CREATE_EXECUTABLE(pageOrientedListNTA)
CREATE_EXECUTABLE(linearHashNTAThreaded) CREATE_EXECUTABLE(linearHashNTAThreaded)
CREATE_EXECUTABLE(linearHashNTAMultiReader) CREATE_EXECUTABLE(linearHashNTAMultiReader)
CREATE_EXECUTABLE(linearHashNTAWriteRequests) CREATE_EXECUTABLE(linearHashNTAWriteRequests)
CREATE_EXECUTABLE(transitiveClosure)
CREATE_EXECUTABLE(zeroCopy) CREATE_EXECUTABLE(zeroCopy)
CREATE_EXECUTABLE(sequentialThroughput) CREATE_EXECUTABLE(sequentialThroughput)
CREATE_EXECUTABLE(qos) CREATE_EXECUTABLE(qos)

View file

@ -10,15 +10,16 @@
//typedef int32_t val_t; // rose executable's input contains 32 bit values //typedef int32_t val_t; // rose executable's input contains 32 bit values
typedef int64_t val_t; // rose executable's input contains 64 bit values typedef int64_t val_t; // rose executable's input contains 64 bit values
#include "stasis/experimental/lsmTree.h"
#include "stasis/page/compression/for-impl.h" #include "stasis/page/compression/for-impl.h"
#include "stasis/page/compression/pstar-impl.h" #include "stasis/page/compression/pstar-impl.h"
#include "stasis/page/compression/rle-impl.h" #include "stasis/page/compression/rle-impl.h"
#include "stasis/page/compression/multicolumn-impl.h" #include "stasis/page/compression/multicolumn-impl.h"
#include "stasis/page/compression/tuple.h" #include "stasis/page/compression/tuple.h"
#include "stasis/operations/lsmIterators.h" #include "stasis/experimental/lsmIterators.h"
#include "stasis/operations/lsmWorkers.h" #include "stasis/experimental/lsmWorkers.h"
#undef end #undef end
#undef begin #undef begin
@ -355,6 +356,7 @@ void run_test(unsigned int inserts, column_number_t column_count,
int num_pages = 0; int num_pages = 0;
stasis_page_impl_register(lsmRootImpl());
Tinit(); Tinit();
recordid tree = NULLRID; recordid tree = NULLRID;

View file

@ -1,7 +1,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <errno.h> #include <errno.h>
#include "stasis/operations/lsmTable.h" #include "stasis/experimental/lsmTable.h"
#include "stasis/transactional.h" #include "stasis/transactional.h"
@ -89,6 +89,7 @@ namespace rose {
// bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; // bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
stasis_page_impl_register(lsmRootImpl());
Tinit(); Tinit();
int xid = Tbegin(); int xid = Tbegin();

View file

@ -1,7 +1,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <errno.h> #include <errno.h>
#include "stasis/operations/lsmTable.h" #include "stasis/experimental/lsmTable.h"
#include "stasis/transactional.h" #include "stasis/transactional.h"
@ -50,6 +50,7 @@ namespace rose {
// bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; // bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
stasis_page_impl_register(lsmRootImpl());
Tinit(); Tinit();
int xid = Tbegin(); int xid = Tbegin();

View file

@ -1,7 +1,7 @@
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/multiplexer.h> #include <stasis/experimental/multiplexer.h>
#include <stasis/graph.h> #include <stasis/experimental/graph.h>
#include <stasis/logger/logMemory.h> #include <stasis/experimental/logMemory.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>

View file

@ -8,16 +8,15 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c concurrentHash.c rw.c doubleLink
page.c bufferPool.c blobManager.c page.c bufferPool.c blobManager.c
recovery2.c truncation.c transactional2.c recovery2.c truncation.c transactional2.c
dirtyPageTable.c transactionTable.c dirtyPageTable.c transactionTable.c
allocationPolicy.c lockManager.c iterator.c allocationPolicy.c iterator.c
consumer.c arrayCollection.c ringbuffer.c fifo.c logger/logEntry.c
multiplexer.c graph.c logger/logEntry.c
logger/safeWrites.c logger/logWriterUtils.c logger/safeWrites.c logger/logWriterUtils.c
logger/filePool.c logger/filePool.c
logger/inMemoryLog.c logger/inMemoryLog.c
logger/logHandle.c logger/logger2.c logger/logHandle.c logger/logger2.c
logger/logMemory.c
logger/reorderingHandle.c logger/reorderingHandle.c
logger/groupForce.c logger/groupForce.c
lockManager.c
page/uninitialized.c page/uninitialized.c
page/slotted.c page/slotted.c
page/fixed.c page/fixed.c
@ -32,13 +31,12 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c concurrentHash.c rw.c doubleLink
operations/arrayList.c operations/arrayList.c
operations/lsnFreeSet.c operations/lsnFreeSet.c
operations/segmentFile.c operations/segmentFile.c
operations/group/logStructured.c
hash.c hash.c
operations/naiveLinearHash.c operations/naiveLinearHash.c
operations/linearHashNTA.c operations/linearHashNTA.c
operations/linkedListNTA.c operations/linkedListNTA.c
operations/pageOrientedListNTA.c operations/pageOrientedListNTA.c
operations/regions.c operations/lsmTree.c operations/regions.c
operations/bTree.c operations/bTree.c
io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/raid1.c io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/raid1.c
io/non_blocking.c io/debug.c io/handle.c io/non_blocking.c io/debug.c io/handle.c
@ -48,5 +46,17 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c concurrentHash.c rw.c doubleLink
util/log2.c util/histogram.c util/hashFunctions.c util/min.c util/ringbuffer.c util/log2.c util/histogram.c util/hashFunctions.c util/min.c util/ringbuffer.c
util/multiset.c util/slab.c util/multiset.c util/slab.c
stlredblack.cpp util/stlslab.cpp) stlredblack.cpp util/stlslab.cpp)
ADD_LIBRARY(stasis_experimental
experimental/consumer.c
experimental/fifo.c
experimental/graph.c
experimental/logMemory.c
experimental/multiplexer.c
experimental/ringbuffer.c
experimental/arrayCollection.c
experimental/lockManagerImpl.c
experimental/group.c
experimental/lsmTree.c
)
INSTALL(TARGETS stasis LIBRARY DESTINATION lib) INSTALL(TARGETS stasis LIBRARY DESTINATION lib)

View file

@ -53,6 +53,7 @@ terms specified in this license.
#endif #endif
#include <stasis/common.h> #include <stasis/common.h>
#include <stasis/constants.h>
#include <assert.h> #include <assert.h>

View file

@ -61,7 +61,16 @@ typedef struct {
int elementCount; int elementCount;
} pointer_array_iterator_t; } pointer_array_iterator_t;
static void noopTupDone(int xid, void * foo) { }
/*lladdIterator_t * ptrArray_iterator(byte * array, int elementCount) { void stasis_arrayCollection_init() {
lladdIterator_def_t array_def = {
}*/ arrayIterator_close,
arrayIterator_next,
arrayIterator_next,
arrayIterator_key,
arrayIterator_value,
noopTupDone,
};
lladdIterator_register(ARRAY_ITERATOR, array_def);
}

View file

@ -1,6 +1,6 @@
#include <stasis/consumer.h> #include <stasis/experimental/consumer.h>
#include <assert.h> #include <assert.h>
#include <stasis/logger/logMemory.h> #include <stasis/experimental/logMemory.h>
static lladdConsumer_def_t consumers[MAX_CONSUMER_TYPES]; static lladdConsumer_def_t consumers[MAX_CONSUMER_TYPES];

View file

@ -1,6 +1,6 @@
#include <stasis/fifo.h> #include <stasis/experimental/fifo.h>
#include <stasis/crc32.h> #include <stasis/crc32.h>
#include <stasis/logger/logMemory.h> #include <stasis/experimental/logMemory.h>
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
@ -240,3 +240,15 @@ lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerC
return pool; return pool;
} }
void stasis_fifo_init() {
lladdIterator_def_t pointer_def = {
lladdFifoPool_iterator_close,
lladdFifoPool_iterator_next,
lladdFifoPool_iterator_tryNext,
lladdFifoPool_iterator_key,
lladdFifoPool_iterator_value,
lladdFifoPool_iterator_tupleDone,
};
lladdIterator_register(POINTER_ITERATOR, pointer_def);
}

View file

@ -7,7 +7,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <stasis/bufferManager.h> #include <stasis/bufferManager.h>
#include <stasis/graph.h> #include <stasis/experimental/graph.h>
#include <stasis/page.h> #include <stasis/page.h>
#include <stasis/crc32.h> #include <stasis/crc32.h>
#include <assert.h> #include <assert.h>

View file

@ -6,6 +6,7 @@
*/ */
#include<stasis/transactional.h> #include<stasis/transactional.h>
#include<stasis/lhtable.h> #include<stasis/lhtable.h>
#include<stasis/experimental/group.h>
#include<string.h> #include<string.h>
typedef struct { typedef struct {
struct LH_ENTRY(table) * table; struct LH_ENTRY(table) * table;

View file

@ -0,0 +1,385 @@
#include <pbl/pbl.h>
#include <stasis/lockManager.h>
#include <stasis/latches.h>
#include <stasis/hash.h>
#include <sys/time.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
static pthread_mutex_t stasis_lock_manager_ht_mut = PTHREAD_MUTEX_INITIALIZER;
static int pblHtInsert_r(pblHashTable_t * h, void * key, size_t keylen, void * val) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
int ret = pblHtInsert(h, key, keylen, val);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static void * pblHtLookup_r(pblHashTable_t * h, void * key, size_t keylen) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
void * ret = pblHtLookup(h, key, keylen);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static int pblHtRemove_r(pblHashTable_t * h, void * key, size_t keylen) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
int ret = pblHtRemove(h, key, keylen);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static void * pblHtFirst_r(pblHashTable_t *h) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
void * ret = pblHtFirst(h);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static void * pblHtNext_r(pblHashTable_t *h) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
void * ret = pblHtNext(h);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static void * pblHtCurrentKey_r(pblHashTable_t *h) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
void * ret = pblHtCurrentKey(h);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
#define MUTEX_COUNT 32
// These next two correspond to MUTEX count, and are the appropriate values to pass into hash().
#define MUTEX_BITS 5
#define MUTEX_EXT 32
static pthread_mutex_t mutexes[MUTEX_COUNT];
static pthread_mutex_t xid_table_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t * getMutex(byte * dat, int datLen) {
return &mutexes[stasis_linear_hash(dat, datLen, MUTEX_BITS, MUTEX_EXT)];
}
static pblHashTable_t * xidLockTable;
static pblHashTable_t * ridLockTable;
typedef struct {
pthread_cond_t writeOK;
pthread_cond_t readOK;
int readers;
int writers;
int waiting;
int active;
} lock;
void lockManagerInitHashed() {
int i = 0;
for(i = 0; i < MUTEX_COUNT; i++) {
pthread_mutex_init(&mutexes[i], NULL);
}
xidLockTable = pblHtCreate();
ridLockTable = pblHtCreate();
}
pblHashTable_t * lockManagerBeginTransactionUnlocked(int xid) {
pblHashTable_t * xidLocks = pblHtCreate();
pblHtInsert_r(xidLockTable, &xid, sizeof(int), xidLocks);
return xidLocks;
}
int lockManagerBeginTransaction(int xid) {
pthread_mutex_lock(&xid_table_mutex);
lockManagerBeginTransactionUnlocked(xid);
pthread_mutex_unlock(&xid_table_mutex);
return 0;
}
lock* createLock(byte * dat, int datLen) {
lock * ret = malloc(sizeof(lock));
if(!ret) { return NULL; }
// pthread_mutex_init(&ret->mut, NULL);
pthread_cond_init(&ret->writeOK, NULL);
pthread_cond_init(&ret->readOK, NULL);
ret->active = 0;
ret->readers = 0;
ret->writers = 0;
ret->waiting = 0;
pblHtInsert_r(ridLockTable, dat, datLen, ret);
return ret;
}
void destroyLock(byte * dat, int datLen, lock * l) {
pthread_cond_destroy(&l->writeOK);
pthread_cond_destroy(&l->readOK);
free (l);
pblHtRemove_r(ridLockTable, dat, datLen);
}
#define LM_READLOCK 1
#define LM_WRITELOCK 2
int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
if(xid == -1) { return 0; }
pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
long currentLockLevel = (long)pblHtLookup_r(xidLocks, dat, datLen);
// printf("xid %d read lock (%d)\n", xid, currentLockLevel);
if(currentLockLevel >= LM_READLOCK) {
pthread_mutex_unlock(&xid_table_mutex);
return 0;
}
assert(!currentLockLevel);
pthread_mutex_unlock(&xid_table_mutex);
pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut);
lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen);
if(!ridLock) {
ridLock = createLock(dat, datLen);
}
ridLock->active++;
if(ridLock->writers || ridLock->waiting) {
struct timeval tv;
int tod_ret = gettimeofday (&tv, NULL);
tv.tv_sec++; // Wait up to one second to obtain a lock before detecting deadlock.
struct timespec ts;
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
if(tod_ret != 0) {
perror("Could not get time of day");
return LLADD_INTERNAL_ERROR;
}
do {
int wait_ret = pthread_cond_timedwait(&ridLock->readOK, mut, &ts);
if(wait_ret == ETIMEDOUT) {
ridLock->active--;
pthread_mutex_unlock(mut);
return LLADD_DEADLOCK;
}
} while(ridLock->writers);
}
if(currentLockLevel < LM_READLOCK) {
ridLock->readers++;
pblHtRemove_r(xidLocks, dat, datLen);
pblHtInsert_r(xidLocks, dat, datLen, (void*)LM_READLOCK);
}
ridLock->active--;
pthread_mutex_unlock(mut);
return 0;
}
int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
if(xid == -1) { return 0; }
pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
long currentLockLevel = (long)pblHtLookup_r(xidLocks, dat, datLen);
// printf("xid %d write lock (%d)\n", xid, currentLockLevel);
int me = 0;
pthread_mutex_unlock(&xid_table_mutex);
if(currentLockLevel >= LM_WRITELOCK) {
return 0;
} else if(currentLockLevel == LM_READLOCK) {
me = 1;
}
pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut);
lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen);
if(!ridLock) {
ridLock = createLock(dat, datLen);
}
ridLock->active++;
ridLock->waiting++;
if(ridLock->writers || (ridLock->readers - me)) {
struct timeval tv;
int tod_ret = gettimeofday(&tv, NULL);
tv.tv_sec++;
struct timespec ts;
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
if(tod_ret != 0) {
perror("Could not get time of day");
return LLADD_INTERNAL_ERROR;
}
while(ridLock->writers || (ridLock->readers - me)) {
int lockret = pthread_cond_timedwait(&ridLock->writeOK, mut, &ts);
if(lockret == ETIMEDOUT) {
ridLock->waiting--;
ridLock->active--;
pthread_mutex_unlock(mut);
return LLADD_DEADLOCK;
}
}
}
ridLock->waiting--;
if(currentLockLevel == 0) {
ridLock->readers++;
ridLock->writers++;
} else if (currentLockLevel == LM_READLOCK) {
ridLock->writers++;
pblHtRemove_r(xidLocks, dat, datLen);
}
if(currentLockLevel != LM_WRITELOCK) {
pblHtInsert_r(xidLocks, dat, datLen, (void*)LM_WRITELOCK);
}
ridLock->active--;
pthread_mutex_unlock(mut);
return 0;
}
static int decrementLock(void * dat, int datLen, int currentLevel) {
// pthread_mutex_unlock(&xid_table_mutex);
pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut);
lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen);
assert(ridLock);
ridLock->active++;
if(currentLevel == LM_WRITELOCK) {
ridLock->writers--;
ridLock->readers--;
} else if(currentLevel == LM_READLOCK) {
ridLock->readers--;
} else if(currentLevel == 0) {
assert(0); // Someone tried to release a lock they didn't own!
} else {
fprintf(stderr, "Unknown lock type encountered!");
ridLock->active--;
pthread_mutex_unlock(mut);
return LLADD_INTERNAL_ERROR;
}
ridLock->active--;
if(!(ridLock->active || ridLock->waiting || ridLock->readers || ridLock->writers)) {
// printf("destroyed lock");
destroyLock(dat, datLen, ridLock);
} else {
// printf("(%d %d %d %d)", ridLock->active, ridLock->waiting, ridLock->readers, ridLock->writers);
}
pthread_mutex_unlock(mut);
return 0;
}
int lockManagerUnlockHashed(int xid, byte * dat, int datLen) {
if(xid == -1) { return 0; }
// printf("xid %d unlock\n", xid);
pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
pthread_mutex_unlock(&xid_table_mutex);
long currentLevel = (long)pblHtLookup_r(xidLocks, dat, datLen);
assert(currentLevel);
pblHtRemove_r(xidLocks, dat, datLen);
decrementLock(dat, datLen, currentLevel);
return 0;
}
int lockManagerCommitHashed(int xid, int datLen) {
if(xid == -1) { return 0; }
pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int));
pblHtRemove_r(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
pthread_mutex_unlock(&xid_table_mutex);
long currentLevel;
int ret = 0;
for(currentLevel = (long)pblHtFirst_r(xidLocks); currentLevel; currentLevel = (long)pblHtNext_r(xidLocks)) {
void * currentKey = pblHtCurrentKey_r(xidLocks);
int tmpret = decrementLock(currentKey, datLen, currentLevel);
// Pass any error(s) up to the user.
// (This logic relies on the fact that currently it only returns 0 and LLADD_INTERNAL_ERROR)
if(tmpret) {
ret = tmpret;
}
}
pblHtDelete(xidLocks);
return ret;
}
int lockManagerReadLockRecord(int xid, recordid rid) {
return lockManagerReadLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerWriteLockRecord(int xid, recordid rid) {
return lockManagerWriteLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerUnlockRecord(int xid, recordid rid) {
return lockManagerUnlockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerCommitRecords(int xid) {
return lockManagerCommitHashed(xid, sizeof(recordid));
}
int lockManagerReadLockPage(int xid, pageid_t p) {
return lockManagerReadLockHashed(xid, (byte*)&p, sizeof(p));
}
int lockManagerWriteLockPage(int xid, pageid_t p) {
return lockManagerWriteLockHashed(xid, (byte*)&p, sizeof(p));
}
int lockManagerUnlockPage(int xid, pageid_t p) {
return lockManagerUnlockHashed(xid, (byte*)&p, sizeof(p));
}
int lockManagerCommitPages(int xid) {
return lockManagerCommitHashed(xid, sizeof(pageid_t));
}
void setupLockManagerCallbacksPage() {
globalLockManager.init = &lockManagerInitHashed;
globalLockManager.readLockPage = &lockManagerReadLockPage;
globalLockManager.writeLockPage = &lockManagerWriteLockPage;
globalLockManager.unlockPage = &lockManagerUnlockPage;
globalLockManager.readLockRecord = NULL;
globalLockManager.writeLockRecord = NULL;
globalLockManager.unlockRecord = NULL;
globalLockManager.commit = &lockManagerCommitPages;
globalLockManager.abort = &lockManagerCommitPages;
globalLockManager.begin = &lockManagerBeginTransaction;
globalLockManager.init();
}
void setupLockManagerCallbacksRecord () {
globalLockManager.init = &lockManagerInitHashed;
globalLockManager.readLockPage = NULL;
globalLockManager.writeLockPage = NULL;
globalLockManager.unlockPage = NULL;
globalLockManager.readLockRecord = &lockManagerReadLockRecord;
globalLockManager.writeLockRecord = &lockManagerWriteLockRecord;
globalLockManager.unlockRecord = &lockManagerUnlockRecord;
globalLockManager.commit = &lockManagerCommitRecords;
globalLockManager.abort = &lockManagerCommitRecords;
globalLockManager.begin = &lockManagerBeginTransaction;
globalLockManager.init();
}

View file

@ -2,7 +2,7 @@
NOTE: Person who's using the consumer interface calls close first, (for now). NOTE: Person who's using the consumer interface calls close first, (for now).
*/ */
#include <stasis/logger/logMemory.h> #include <stasis/experimental/logMemory.h>
#include <assert.h> #include <assert.h>
typedef struct { typedef struct {
@ -17,8 +17,16 @@ typedef struct {
lsn_t eof; lsn_t eof;
} logMemory_fifo_t; } logMemory_fifo_t;
void logMemory_init() { void stasis_logMemory_init() {
/* NO-OP */ lladdIterator_def_t logMemory_def = {
logMemory_Iterator_close,
logMemory_Iterator_next,
logMemory_Iterator_tryNext,
logMemory_Iterator_key,
logMemory_Iterator_value,
logMemory_Iterator_releaseTuple,
};
lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def);
} }
lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) { lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) {

View file

@ -1,4 +1,4 @@
#include <stasis/operations/lsmTree.h> #include <stasis/experimental/lsmTree.h>
#include <stasis/truncation.h> #include <stasis/truncation.h>
#include <stasis/constants.h> #include <stasis/constants.h>
#include <stasis/bufferManager.h> #include <stasis/bufferManager.h>

View file

@ -1,8 +1,8 @@
#include <stasis/multiplexer.h> #include <stasis/experimental/multiplexer.h>
#include <stasis/crc32.h> #include <stasis/crc32.h>
#include <stasis/operations/linearHashNTA.h> #include <stasis/operations/linearHashNTA.h>
#include <stasis/logger/logMemory.h> #include <stasis/experimental/logMemory.h>
lladdMultiplexer_t * lladdMultiplexer_alloc( lladdMultiplexer_t * lladdMultiplexer_alloc(
int xid, lladdIterator_t * it, int xid, lladdIterator_t * it,

View file

@ -1,5 +1,5 @@
#include <stasis/common.h> #include <stasis/common.h>
#include <stasis/ringbuffer.h> #include <stasis/experimental/ringbuffer.h>
//#define TRACK_OFFSETS //#define TRACK_OFFSETS

View file

@ -1,8 +1,7 @@
#include <stasis/iterator.h> #include <stasis/iterator.h>
#include <stasis/operations.h> #include <stasis/operations.h>
#include <stasis/arrayCollection.h> //#include <stasis/arrayCollection.h>
#include <stasis/logger/logMemory.h> #include <stasis/experimental/group.h>
#include <stasis/operations/group.h>
#include <assert.h> #include <assert.h>
@ -12,38 +11,9 @@ void lladdIterator_register(int type, lladdIterator_def_t info) {
assert(type < MAX_ITERATOR_TYPES); assert(type < MAX_ITERATOR_TYPES);
iterators[type] = info; iterators[type] = info;
} }
static void noopTupDone(int xid, void * foo) { }
void iterator_init() { void iterator_init() {
lladdIterator_def_t array_def = { /* no-op */
arrayIterator_close,
arrayIterator_next,
arrayIterator_next,
arrayIterator_key,
arrayIterator_value,
noopTupDone,
};
lladdIterator_register(ARRAY_ITERATOR, array_def);
lladdIterator_def_t logMemory_def = {
logMemory_Iterator_close,
logMemory_Iterator_next,
logMemory_Iterator_tryNext,
logMemory_Iterator_key,
logMemory_Iterator_value,
logMemory_Iterator_releaseTuple,
};
lladdIterator_register(LOG_MEMORY_ITERATOR, logMemory_def);
lladdIterator_def_t pointer_def = {
lladdFifoPool_iterator_close,
lladdFifoPool_iterator_next,
lladdFifoPool_iterator_tryNext,
lladdFifoPool_iterator_key,
lladdFifoPool_iterator_value,
lladdFifoPool_iterator_tupleDone,
};
lladdIterator_register(POINTER_ITERATOR, pointer_def);
stasis_log_structured_group_init();
} }

View file

@ -1,392 +1,14 @@
#include <pbl/pbl.h> /*
* lockManager.c
*
* Created on: Jun 12, 2011
* Author: sears
*/
#include <stasis/lockManager.h> #include <stasis/lockManager.h>
#include <stasis/latches.h>
#include <stasis/hash.h>
#include <sys/time.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
static pthread_mutex_t stasis_lock_manager_ht_mut = PTHREAD_MUTEX_INITIALIZER;
static int pblHtInsert_r(pblHashTable_t * h, void * key, size_t keylen, void * val) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
int ret = pblHtInsert(h, key, keylen, val);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static void * pblHtLookup_r(pblHashTable_t * h, void * key, size_t keylen) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
void * ret = pblHtLookup(h, key, keylen);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static int pblHtRemove_r(pblHashTable_t * h, void * key, size_t keylen) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
int ret = pblHtRemove(h, key, keylen);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static void * pblHtFirst_r(pblHashTable_t *h) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
void * ret = pblHtFirst(h);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static void * pblHtNext_r(pblHashTable_t *h) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
void * ret = pblHtNext(h);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
static void * pblHtCurrentKey_r(pblHashTable_t *h) {
pthread_mutex_lock(&stasis_lock_manager_ht_mut);
void * ret = pblHtCurrentKey(h);
pthread_mutex_unlock(&stasis_lock_manager_ht_mut);
return ret;
}
#define MUTEX_COUNT 32
// These next two correspond to MUTEX count, and are the appropriate values to pass into hash().
#define MUTEX_BITS 5
#define MUTEX_EXT 32
static pthread_mutex_t mutexes[MUTEX_COUNT];
static pthread_mutex_t xid_table_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t * getMutex(byte * dat, int datLen) {
return &mutexes[stasis_linear_hash(dat, datLen, MUTEX_BITS, MUTEX_EXT)];
}
static pblHashTable_t * xidLockTable;
static pblHashTable_t * ridLockTable;
typedef struct {
pthread_cond_t writeOK;
pthread_cond_t readOK;
int readers;
int writers;
int waiting;
int active;
} lock;
void lockManagerInitHashed() {
int i = 0;
for(i = 0; i < MUTEX_COUNT; i++) {
pthread_mutex_init(&mutexes[i], NULL);
}
xidLockTable = pblHtCreate();
ridLockTable = pblHtCreate();
}
pblHashTable_t * lockManagerBeginTransactionUnlocked(int xid) {
pblHashTable_t * xidLocks = pblHtCreate();
pblHtInsert_r(xidLockTable, &xid, sizeof(int), xidLocks);
return xidLocks;
}
int lockManagerBeginTransaction(int xid) {
pthread_mutex_lock(&xid_table_mutex);
lockManagerBeginTransactionUnlocked(xid);
pthread_mutex_unlock(&xid_table_mutex);
return 0;
}
lock* createLock(byte * dat, int datLen) {
lock * ret = malloc(sizeof(lock));
if(!ret) { return NULL; }
// pthread_mutex_init(&ret->mut, NULL);
pthread_cond_init(&ret->writeOK, NULL);
pthread_cond_init(&ret->readOK, NULL);
ret->active = 0;
ret->readers = 0;
ret->writers = 0;
ret->waiting = 0;
pblHtInsert_r(ridLockTable, dat, datLen, ret);
return ret;
}
void destroyLock(byte * dat, int datLen, lock * l) {
pthread_cond_destroy(&l->writeOK);
pthread_cond_destroy(&l->readOK);
free (l);
pblHtRemove_r(ridLockTable, dat, datLen);
}
#define LM_READLOCK 1
#define LM_WRITELOCK 2
int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
if(xid == -1) { return 0; }
pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
long currentLockLevel = (long)pblHtLookup_r(xidLocks, dat, datLen);
// printf("xid %d read lock (%d)\n", xid, currentLockLevel);
if(currentLockLevel >= LM_READLOCK) {
pthread_mutex_unlock(&xid_table_mutex);
return 0;
}
assert(!currentLockLevel);
pthread_mutex_unlock(&xid_table_mutex);
pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut);
lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen);
if(!ridLock) {
ridLock = createLock(dat, datLen);
}
ridLock->active++;
if(ridLock->writers || ridLock->waiting) {
struct timeval tv;
int tod_ret = gettimeofday (&tv, NULL);
tv.tv_sec++; // Wait up to one second to obtain a lock before detecting deadlock.
struct timespec ts;
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
if(tod_ret != 0) {
perror("Could not get time of day");
return LLADD_INTERNAL_ERROR;
}
do {
int wait_ret = pthread_cond_timedwait(&ridLock->readOK, mut, &ts);
if(wait_ret == ETIMEDOUT) {
ridLock->active--;
pthread_mutex_unlock(mut);
return LLADD_DEADLOCK;
}
} while(ridLock->writers);
}
if(currentLockLevel < LM_READLOCK) {
ridLock->readers++;
pblHtRemove_r(xidLocks, dat, datLen);
pblHtInsert_r(xidLocks, dat, datLen, (void*)LM_READLOCK);
}
ridLock->active--;
pthread_mutex_unlock(mut);
return 0;
}
int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
if(xid == -1) { return 0; }
pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
long currentLockLevel = (long)pblHtLookup_r(xidLocks, dat, datLen);
// printf("xid %d write lock (%d)\n", xid, currentLockLevel);
int me = 0;
pthread_mutex_unlock(&xid_table_mutex);
if(currentLockLevel >= LM_WRITELOCK) {
return 0;
} else if(currentLockLevel == LM_READLOCK) {
me = 1;
}
pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut);
lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen);
if(!ridLock) {
ridLock = createLock(dat, datLen);
}
ridLock->active++;
ridLock->waiting++;
if(ridLock->writers || (ridLock->readers - me)) {
struct timeval tv;
int tod_ret = gettimeofday(&tv, NULL);
tv.tv_sec++;
struct timespec ts;
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
if(tod_ret != 0) {
perror("Could not get time of day");
return LLADD_INTERNAL_ERROR;
}
while(ridLock->writers || (ridLock->readers - me)) {
int lockret = pthread_cond_timedwait(&ridLock->writeOK, mut, &ts);
if(lockret == ETIMEDOUT) {
ridLock->waiting--;
ridLock->active--;
pthread_mutex_unlock(mut);
return LLADD_DEADLOCK;
}
}
}
ridLock->waiting--;
if(currentLockLevel == 0) {
ridLock->readers++;
ridLock->writers++;
} else if (currentLockLevel == LM_READLOCK) {
ridLock->writers++;
pblHtRemove_r(xidLocks, dat, datLen);
}
if(currentLockLevel != LM_WRITELOCK) {
pblHtInsert_r(xidLocks, dat, datLen, (void*)LM_WRITELOCK);
}
ridLock->active--;
pthread_mutex_unlock(mut);
return 0;
}
static int decrementLock(void * dat, int datLen, int currentLevel) {
// pthread_mutex_unlock(&xid_table_mutex);
pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut);
lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen);
assert(ridLock);
ridLock->active++;
if(currentLevel == LM_WRITELOCK) {
ridLock->writers--;
ridLock->readers--;
} else if(currentLevel == LM_READLOCK) {
ridLock->readers--;
} else if(currentLevel == 0) {
assert(0); // Someone tried to release a lock they didn't own!
} else {
fprintf(stderr, "Unknown lock type encountered!");
ridLock->active--;
pthread_mutex_unlock(mut);
return LLADD_INTERNAL_ERROR;
}
ridLock->active--;
if(!(ridLock->active || ridLock->waiting || ridLock->readers || ridLock->writers)) {
// printf("destroyed lock");
destroyLock(dat, datLen, ridLock);
} else {
// printf("(%d %d %d %d)", ridLock->active, ridLock->waiting, ridLock->readers, ridLock->writers);
}
pthread_mutex_unlock(mut);
return 0;
}
int lockManagerUnlockHashed(int xid, byte * dat, int datLen) {
if(xid == -1) { return 0; }
// printf("xid %d unlock\n", xid);
pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
pthread_mutex_unlock(&xid_table_mutex);
long currentLevel = (long)pblHtLookup_r(xidLocks, dat, datLen);
assert(currentLevel);
pblHtRemove_r(xidLocks, dat, datLen);
decrementLock(dat, datLen, currentLevel);
return 0;
}
int lockManagerCommitHashed(int xid, int datLen) {
if(xid == -1) { return 0; }
pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup_r(xidLockTable, &xid, sizeof(int));
pblHtRemove_r(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
pthread_mutex_unlock(&xid_table_mutex);
long currentLevel;
int ret = 0;
for(currentLevel = (long)pblHtFirst_r(xidLocks); currentLevel; currentLevel = (long)pblHtNext_r(xidLocks)) {
void * currentKey = pblHtCurrentKey_r(xidLocks);
int tmpret = decrementLock(currentKey, datLen, currentLevel);
// Pass any error(s) up to the user.
// (This logic relies on the fact that currently it only returns 0 and LLADD_INTERNAL_ERROR)
if(tmpret) {
ret = tmpret;
}
}
pblHtDelete(xidLocks);
return ret;
}
int lockManagerReadLockRecord(int xid, recordid rid) {
return lockManagerReadLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerWriteLockRecord(int xid, recordid rid) {
return lockManagerWriteLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerUnlockRecord(int xid, recordid rid) {
return lockManagerUnlockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerCommitRecords(int xid) {
return lockManagerCommitHashed(xid, sizeof(recordid));
}
int lockManagerReadLockPage(int xid, pageid_t p) {
return lockManagerReadLockHashed(xid, (byte*)&p, sizeof(p));
}
int lockManagerWriteLockPage(int xid, pageid_t p) {
return lockManagerWriteLockHashed(xid, (byte*)&p, sizeof(p));
}
int lockManagerUnlockPage(int xid, pageid_t p) {
return lockManagerUnlockHashed(xid, (byte*)&p, sizeof(p));
}
int lockManagerCommitPages(int xid) {
return lockManagerCommitHashed(xid, sizeof(pageid_t));
}
LockManagerSetup globalLockManager; LockManagerSetup globalLockManager;
void setupLockManagerCallbacksPage() {
globalLockManager.init = &lockManagerInitHashed;
globalLockManager.readLockPage = &lockManagerReadLockPage;
globalLockManager.writeLockPage = &lockManagerWriteLockPage;
globalLockManager.unlockPage = &lockManagerUnlockPage;
globalLockManager.readLockRecord = NULL;
globalLockManager.writeLockRecord = NULL;
globalLockManager.unlockRecord = NULL;
globalLockManager.commit = &lockManagerCommitPages;
globalLockManager.abort = &lockManagerCommitPages;
globalLockManager.begin = &lockManagerBeginTransaction;
globalLockManager.init();
}
void setupLockManagerCallbacksRecord () {
globalLockManager.init = &lockManagerInitHashed;
globalLockManager.readLockPage = NULL;
globalLockManager.writeLockPage = NULL;
globalLockManager.unlockPage = NULL;
globalLockManager.readLockRecord = &lockManagerReadLockRecord;
globalLockManager.writeLockRecord = &lockManagerWriteLockRecord;
globalLockManager.unlockRecord = &lockManagerUnlockRecord;
globalLockManager.commit = &lockManagerCommitRecords;
globalLockManager.abort = &lockManagerCommitRecords;
globalLockManager.begin = &lockManagerBeginTransaction;
globalLockManager.init();
}
void setupLockManagerCallbacksNil () { void setupLockManagerCallbacksNil () {
globalLockManager.init = NULL; globalLockManager.init = NULL;
globalLockManager.readLockPage = NULL; globalLockManager.readLockPage = NULL;
@ -399,3 +21,4 @@ void setupLockManagerCallbacksNil () {
globalLockManager.abort = NULL; globalLockManager.abort = NULL;
globalLockManager.begin = NULL; globalLockManager.begin = NULL;
} }

View file

@ -6,9 +6,9 @@
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
// The next two #includes are for deprecated code. // The next two #includes are for deprecated code.
#include <stasis/fifo.h> //#include <stasis/fifo.h>
#include <stasis/multiplexer.h> //#include <stasis/multiplexer.h>
#include <stasis/logger/logMemory.h> //#include <stasis/logger/logMemory.h>
/** /**
re-entrant implementation of a linear hash hable, using nested top actions. re-entrant implementation of a linear hash hable, using nested top actions.
@ -515,7 +515,8 @@ static int linearHashNTAIterator_value(int xid, void * impl, byte ** value) {
//--------------------------------- async hash operations happen below here //--------------------------------- async hash operations happen below here
typedef struct { /*
typedef struct {
int value_len; int value_len;
int key_len; int key_len;
} asyncHashInsert_t; } asyncHashInsert_t;
@ -594,3 +595,4 @@ lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads,
return mainFifo->consumer; return mainFifo->consumer;
} }
*/

View file

@ -117,7 +117,6 @@ void stasis_page_init(stasis_dirty_page_table_t * dpt) {
stasis_page_impl_register(stasis_page_boundary_tag_impl()); stasis_page_impl_register(stasis_page_boundary_tag_impl());
stasis_page_impl_register(arrayListImpl()); stasis_page_impl_register(arrayListImpl());
stasis_page_impl_register(stasis_page_blob_impl()); stasis_page_impl_register(stasis_page_blob_impl());
stasis_page_impl_register(lsmRootImpl());
stasis_page_impl_register(slottedLsnFreeImpl()); stasis_page_impl_register(slottedLsnFreeImpl());
stasis_page_impl_register(segmentImpl()); stasis_page_impl_register(segmentImpl());
stasis_page_impl_register(stasis_page_slotted_latch_free_impl()); stasis_page_impl_register(stasis_page_slotted_latch_free_impl());

View file

@ -5,14 +5,20 @@
* Author: sears * Author: sears
*/ */
#include <config.h>
#include <stasis/common.h> #include <stasis/common.h>
#include <stasis/page.h> #include <stasis/page.h>
#include <stasis/page/latchFree/lfSlotted.h> #include <stasis/page/latchFree/lfSlotted.h>
#ifdef HAVE_GCC_ATOMICS
#define CAS(_a,_o,_n) __sync_bool_compare_and_swap(_a,_o,_n) #define CAS(_a,_o,_n) __sync_bool_compare_and_swap(_a,_o,_n)
#define BARRIER() __sync_synchronize() #define BARRIER() __sync_synchronize()
#define FETCH_AND_ADD(_a, _i) __sync_fetch_and_add(_a, _i) #define FETCH_AND_ADD(_a, _i) __sync_fetch_and_add(_a, _i)
#else
#define CAS(_a,_o,_n) 1
#define BARRIER() abort()
#define FETCH_AND_ADD(_a, _i) 1
#endif
static int notSupported(int xid, Page * p) { return 0; } static int notSupported(int xid, Page * p) { return 0; }
static const byte* lfSlottedRead (int xid, Page *p, recordid rid) { static const byte* lfSlottedRead (int xid, Page *p, recordid rid) {

View file

@ -14,7 +14,7 @@
struct stasis_transaction_table_t { struct stasis_transaction_table_t {
int active_count; int active_count;
#ifndef HAVE_GCC_ATMOICS #ifndef HAVE_GCC_ATOMICS
/** /**
This mutex protects the rest of the struct This mutex protects the rest of the struct
xidCount. xidCount.

View file

@ -2,7 +2,6 @@
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/recovery.h> #include <stasis/recovery.h>
#include <stasis/bufferManager.h> #include <stasis/bufferManager.h>
#include <stasis/consumer.h>
#include <stasis/lockManager.h> #include <stasis/lockManager.h>
#include <stasis/pageHandle.h> #include <stasis/pageHandle.h>
#include <stasis/page.h> #include <stasis/page.h>
@ -97,7 +96,6 @@ int Tinit() {
BtreeInit(); BtreeInit();
TlinkedListNTAInit(); TlinkedListNTAInit();
iterator_init(); iterator_init();
consumer_init();
setupLockManagerCallbacksNil(); setupLockManagerCallbacksNil();
stasis_recovery_initiate(stasis_log_file, stasis_transaction_table, stasis_alloc); stasis_recovery_initiate(stasis_log_file, stasis_transaction_table, stasis_alloc);

View file

@ -6,3 +6,5 @@ void arrayIterator_close(int xid, void * impl);
int arrayIterator_next (int xid, void * impl); int arrayIterator_next (int xid, void * impl);
int arrayIterator_key (int xid, void * impl, byte ** key); int arrayIterator_key (int xid, void * impl, byte ** key);
int arrayIterator_value(int xid, void * impl, byte ** value); int arrayIterator_value(int xid, void * impl, byte ** value);
void stasis_arrayCollection_init();

View file

@ -1,6 +1,6 @@
#include <stasis/iterator.h> #include <stasis/iterator.h>
#include <stasis/consumer.h> #include <stasis/experimental/consumer.h>
#ifndef __FIFO_H #ifndef __FIFO_H
#define __FIFO_H #define __FIFO_H
@ -41,4 +41,7 @@ int lladdFifoPool_iterator_key (int xid, void * it, byte ** key);
int lladdFifoPool_iterator_value (int xid, void * it, byte ** val); int lladdFifoPool_iterator_value (int xid, void * it, byte ** val);
void lladdFifoPool_iterator_tupleDone(int xid, void * it); void lladdFifoPool_iterator_tupleDone(int xid, void * it);
void lladdFifoPool_iterator_releaseLock(int xid, void * it); void lladdFifoPool_iterator_releaseLock(int xid, void * it);
void stasis_fifo_init();
#endif // __FIFO_H #endif // __FIFO_H

View file

@ -1,5 +1,5 @@
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/fifo.h> #include <stasis/experimental/fifo.h>
#ifndef __LLADD_GRAPH_H #ifndef __LLADD_GRAPH_H
#define __LLADD_GRAPH_H #define __LLADD_GRAPH_H

View file

@ -1,8 +1,8 @@
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/ringbuffer.h> #include <stasis/experimental/ringbuffer.h>
#include <stasis/consumer.h> #include <stasis/experimental/consumer.h>
#include <stasis/iterator.h> #include <stasis/iterator.h>
#include <stasis/fifo.h> #include <stasis/experimental/fifo.h>
#ifndef __LOGMEMORY_H #ifndef __LOGMEMORY_H
#define __LOGMEMORY_H #define __LOGMEMORY_H
@ -28,4 +28,6 @@ int logMemory_Iterator_key (int xid, void * impl, byte ** key);
int logMemory_Iterator_value (int xid, void * impl, byte ** value); int logMemory_Iterator_value (int xid, void * impl, byte ** value);
void logMemory_Iterator_releaseTuple(int xid, void *it); void logMemory_Iterator_releaseTuple(int xid, void *it);
void logMemory_Iterator_releaseLock (int xid, void * impl); void logMemory_Iterator_releaseLock (int xid, void * impl);
void stasis_logMemory_init();
#endif #endif

View file

@ -1,11 +1,13 @@
#ifndef _LSMITERATORS_H__ #ifndef _LSMITERATORS_H__
#define _LSMITERATORS_H__ #define _LSMITERATORS_H__
#include "stasis/page.h" #include <stasis/page.h>
#include "stasis/bufferManager.h" #include <stasis/bufferManager.h>
#include "stasis/page/compression/compression.h" #include <stasis/page/compression/compression.h>
#include "stasis/page/compression/tuple.h" #include <stasis/page/compression/tuple.h>
#include "stasis/operations.h" #include <stasis/operations.h>
#include "lsmTree.h"
/** /**
@file @file

View file

@ -18,6 +18,9 @@
#include <stasis/common.h> #include <stasis/common.h>
#include <stasis/iterator.h> #include <stasis/iterator.h>
#include <assert.h> #include <assert.h>
BEGIN_C_DECLS
typedef struct { typedef struct {
recordid treeRoot; recordid treeRoot;
recordid pos; recordid pos;
@ -153,4 +156,8 @@ static inline int lsmTreeIterator_value(int xid, lladdIterator_t *it,
} }
static inline void lsmTreeIterator_tupleDone(int xid, void *it) { } static inline void lsmTreeIterator_tupleDone(int xid, void *it) { }
static inline void lsmTreeIterator_releaseLock(int xid, void *it) { } static inline void lsmTreeIterator_releaseLock(int xid, void *it) { }
page_impl lsmRootImpl();
END_C_DECLS
#endif // _LSMTREE_H__ #endif // _LSMTREE_H__

View file

@ -1,4 +1,4 @@
#include "iterator.h" #include <stasis/iterator.h>
#include "consumer.h" #include "consumer.h"
#include "fifo.h" #include "fifo.h"

View file

@ -155,9 +155,7 @@ typedef struct {
#include "operations/linearHashNTA.h" #include "operations/linearHashNTA.h"
#include "operations/bTree.h" #include "operations/bTree.h"
#include "operations/regions.h" #include "operations/regions.h"
#include "operations/lsmTree.h"
#include "operations/lsnFreeSet.h" #include "operations/lsnFreeSet.h"
#include "operations/group.h"
#include "operations/segmentFile.h" #include "operations/segmentFile.h"
/** /**

View file

@ -36,6 +36,4 @@ void fixedPageInit();
void fixedPageDeinit(); void fixedPageDeinit();
page_impl fixedImpl(); page_impl fixedImpl();
page_impl arrayListImpl(); page_impl arrayListImpl();
// @todo move lsmTreeImpl() to lsmTree.h (but first, move page.h...)
page_impl lsmRootImpl();
#endif #endif

View file

@ -19,9 +19,9 @@ CREATE_CHECK(check_header)
CREATE_CHECK(check_linkedListNTA) CREATE_CHECK(check_linkedListNTA)
CREATE_CHECK(check_linearHashNTA) CREATE_CHECK(check_linearHashNTA)
CREATE_CHECK(check_pageOrientedList) CREATE_CHECK(check_pageOrientedList)
CREATE_CHECK(check_ringbuffer) CREATE_EXPERIMENTAL_CHECK(check_ringbuffer)
CREATE_CHECK(check_iterator) CREATE_EXPERIMENTAL_CHECK(check_iterator)
CREATE_CHECK(check_multiplexer) CREATE_EXPERIMENTAL_CHECK(check_multiplexer)
CREATE_CHECK(check_bTree) CREATE_CHECK(check_bTree)
CREATE_CHECK(check_regions) CREATE_CHECK(check_regions)
CREATE_CHECK(check_allocationPolicy) CREATE_CHECK(check_allocationPolicy)
@ -29,6 +29,6 @@ CREATE_CHECK(check_dirtyPageTable)
CREATE_CHECK(check_io) CREATE_CHECK(check_io)
CREATE_CHECK(check_rangeTracker) CREATE_CHECK(check_rangeTracker)
CREATE_CHECK(check_replacementPolicy) CREATE_CHECK(check_replacementPolicy)
CREATE_CHECK(check_lsmTree) CREATE_EXPERIMENTAL_CHECK(check_lsmTree)
CREATE_CHECK(check_groupBy) CREATE_EXPERIMENTAL_CHECK(check_groupBy)
CREATE_CHECK(check_boundedLog) CREATE_CHECK(check_boundedLog)

View file

@ -43,6 +43,7 @@ terms specified in this license.
#include "../check_includes.h" #include "../check_includes.h"
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/experimental/group.h>
#include <assert.h> #include <assert.h>
#include <limits.h> #include <limits.h>
@ -55,8 +56,10 @@ terms specified in this license.
START_TEST(groupBySmokeTest) { START_TEST(groupBySmokeTest) {
Tinit(); Tinit();
stasis_log_structured_group_init();
int xid = Tbegin(); int xid = Tbegin();
stasis_group_t * handle = TlogStructuredGroup(xid, 1024*1024*40);
stasis_group_t * handle = TlogStructuredGroup(xid, 1024*1024*40);
for(int i =0; i < 10000; i++) { for(int i =0; i < 10000; i++) {
for(int j = 0; j < 100; j++) { for(int j = 0; j < 100; j++) {
int val = i * 100 + j; int val = i * 100 + j;

View file

@ -42,7 +42,7 @@ terms specified in this license.
#include "../check_includes.h" #include "../check_includes.h"
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/arrayCollection.h> #include <stasis/experimental/arrayCollection.h>
#include <pbl/pbl.h> #include <pbl/pbl.h>
#include <stdlib.h> #include <stdlib.h>
@ -111,6 +111,7 @@ static void iterator_test(int xid,
START_TEST(iteratorTest) START_TEST(iteratorTest)
{ {
Tinit(); Tinit();
stasis_arrayCollection_init();
int xid = Tbegin(); int xid = Tbegin();
unsigned int keyArray[NUM_ENTRIES]; unsigned int keyArray[NUM_ENTRIES];
byte valueArray[NUM_ENTRIES]; byte valueArray[NUM_ENTRIES];

View file

@ -2,6 +2,8 @@
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/experimental/lsmTree.h>
#include <assert.h> #include <assert.h>
#include <limits.h> #include <limits.h>
#include <math.h> #include <math.h>
@ -31,6 +33,7 @@ void insertProbeIter(lsmkey_t NUM_ENTRIES) {
lsmTreeRegisterComparator(intcmp,cmp); lsmTreeRegisterComparator(intcmp,cmp);
TlsmRegionAllocConf_t alloc_conf = LSM_REGION_ALLOC_STATIC_INITIALIZER; TlsmRegionAllocConf_t alloc_conf = LSM_REGION_ALLOC_STATIC_INITIALIZER;
stasis_page_impl_register(lsmRootImpl());
Tinit(); Tinit();
int xid = Tbegin(); int xid = Tbegin();
recordid tree = TlsmCreate(xid, intcmp, recordid tree = TlsmCreate(xid, intcmp,

View file

@ -42,8 +42,10 @@ terms specified in this license.
#include "../check_includes.h" #include "../check_includes.h"
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/multiplexer.h> #include <stasis/experimental/multiplexer.h>
#include <stasis/logger/logMemory.h> #include <stasis/experimental/logMemory.h>
#include <stasis/experimental/group.h>
#include <stasis/experimental/arrayCollection.h>
#include <pbl/pbl.h> #include <pbl/pbl.h>
#include <sys/time.h> #include <sys/time.h>
@ -165,6 +167,11 @@ static void * go2( void * arg) {
START_TEST(multiplexTest) { START_TEST(multiplexTest) {
Tinit(); Tinit();
stasis_fifo_init();
stasis_log_structured_group_init();
stasis_arrayCollection_init();
stasis_logMemory_init();
consumer_init();
int xid = Tbegin(); int xid = Tbegin();
recordid hash = ThashCreate(xid, sizeof(lsn_t), VARIABLE_LENGTH); recordid hash = ThashCreate(xid, sizeof(lsn_t), VARIABLE_LENGTH);

View file

@ -41,7 +41,7 @@ terms specified in this license.
---*/ ---*/
#include "../check_includes.h" #include "../check_includes.h"
#include <stasis/ringbuffer.h> #include <stasis/experimental/ringbuffer.h>
#include <assert.h> #include <assert.h>
#include <sys/time.h> #include <sys/time.h>