remove compensations.h (This is a big change, and impacts lots of whitespace. I have left some indentation breakage in place in order to minimize the size of the diff)

This commit is contained in:
Sears Russell 2011-06-12 08:13:34 +00:00
parent bb89c5a0cf
commit c5a36c032d
35 changed files with 411 additions and 750 deletions

View file

@ -143,7 +143,7 @@ INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR}
LINK_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR}/src/stasis)
IF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" )
SET(COMMON_LIBRARIES stasis m pthread stdc++ ${DBUG})
SET(COMMON_LIBRARIES stasis m pthread stdc++ ${DBUG}) # profiler)
SET(CMAKE_C_FLAGS "-g -Wall -pedantic -std=gnu99 -DPBL_COMPAT -D_FILE_OFFSET_BITS=64 ${CMAKE_C_FLAGS}")
SET(CMAKE_CXX_FLAGS "-g -Wall -Wno-long-long -pedantic -DPBL_COMPAT -D_FILE_OFFSET_BITS=64 ${CMAKE_CXX_FLAGS}")
ELSEIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "SunPro" )

View file

@ -25,7 +25,6 @@ ADD_LIBRARY(stasis crc32.c redblack.c tsearchcompat.c lhtable.c concurrentHash.c
page/lsnFree.c
page/segment.c
page/latchFree/lfSlotted.c
compensations.c
operations/pageOperations.c
operations/decrement.c operations/increment.c
operations/prepare.c operations/set.c

View file

@ -20,9 +20,9 @@ static pthread_key_t lastPage;
#define RW 1
static void bufManBufDeinit();
static compensated_function Page *bufManLoadPage(stasis_buffer_manager_t *ignored, stasis_buffer_manager_handle_t* h, int xid, pageid_t pageid, pagetype_t type);
static compensated_function Page *bufManGetCachedPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid);
static compensated_function Page *bufManLoadUninitPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid);
static Page *bufManLoadPage(stasis_buffer_manager_t *ignored, stasis_buffer_manager_handle_t* h, int xid, pageid_t pageid, pagetype_t type);
static Page *bufManGetCachedPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid);
static Page *bufManLoadUninitPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid);
static void bufManReleasePage (stasis_buffer_manager_t *ignored, Page * p);
static void bufManSimulateBufferManagerCrash();
@ -349,7 +349,7 @@ static Page* bufManGetPage(int xid, pageid_t pageid, int locktype, int uninitial
return ret;
}
static compensated_function Page *bufManLoadPage(stasis_buffer_manager_t *ignored, stasis_buffer_manager_handle_t * hignored, int xid, const pageid_t pageid, pagetype_t type) {
static Page *bufManLoadPage(stasis_buffer_manager_t *ignored, stasis_buffer_manager_handle_t * hignored, int xid, const pageid_t pageid, pagetype_t type) {
Page * ret = pthread_getspecific(lastPage);
@ -385,7 +385,7 @@ static Page* bufManGetCachedPage(stasis_buffer_manager_t *ignored, int xid, cons
return bufManLoadPage(ignored, NULL, xid, pageid, UNKNOWN_TYPE_PAGE);
}
static compensated_function Page *bufManLoadUninitPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid) {
static Page *bufManLoadUninitPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid) {
Page * ret = pthread_getspecific(lastPage);

View file

@ -1,37 +0,0 @@
#include <stasis/compensations.h>
int ___compensation_count___ = 0;
#include <assert.h>
#include <stdio.h>
static pthread_key_t error_key;
void compensations_init () {
int ret = pthread_key_create(&error_key, NULL);
assert(!ret);
pthread_setspecific(error_key, NULL);
}
void compensations_deinit() {
int ret = pthread_key_delete(error_key);
assert(!ret);
}
long compensation_error() {
long error = (long) pthread_getspecific(error_key);
return error;
}
void compensation_clear_error() {
compensation_set_error(0);
}
void compensation_set_error(long error) {
int ret = pthread_setspecific(error_key, (void *)error);
if(ret) {
printf("Unhandled error: %s\n", strerror(ret));
abort();
}
assert(!ret);
}

View file

@ -23,9 +23,9 @@ void consumer_init() {
}
compensated_function int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t valSize) {
int Tconsumer_push(int xid, lladdConsumer_t *it, byte *key, size_t keySize, byte *val, size_t valSize) {
return consumers[it->type].push(xid, it->impl, key, keySize, val, valSize);
}
compensated_function void Tconsumer_close(int xid, lladdConsumer_t *it) {
void Tconsumer_close(int xid, lladdConsumer_t *it) {
consumers[it->type].close(xid, it->impl);
}

View file

@ -1,6 +1,5 @@
#include <pbl/pbl.h>
#include <stasis/lockManager.h>
#include <stasis/compensations.h>
#include <stasis/latches.h>
#include <stasis/hash.h>
@ -162,7 +161,6 @@ int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
if(wait_ret == ETIMEDOUT) {
ridLock->active--;
pthread_mutex_unlock(mut);
compensation_set_error(LLADD_DEADLOCK);
return LLADD_DEADLOCK;
}
} while(ridLock->writers);
@ -218,7 +216,6 @@ int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
ts.tv_nsec = tv.tv_usec * 1000;
if(tod_ret != 0) {
perror("Could not get time of day");
compensation_set_error(LLADD_INTERNAL_ERROR);
return LLADD_INTERNAL_ERROR;
}
while(ridLock->writers || (ridLock->readers - me)) {
@ -227,7 +224,6 @@ int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
ridLock->waiting--;
ridLock->active--;
pthread_mutex_unlock(mut);
compensation_set_error(LLADD_DEADLOCK);
return LLADD_DEADLOCK;
}
}
@ -333,10 +329,10 @@ int lockManagerCommitHashed(int xid, int datLen) {
return ret;
}
compensated_function int lockManagerReadLockRecord(int xid, recordid rid) {
int lockManagerReadLockRecord(int xid, recordid rid) {
return lockManagerReadLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
compensated_function int lockManagerWriteLockRecord(int xid, recordid rid) {
int lockManagerWriteLockRecord(int xid, recordid rid) {
return lockManagerWriteLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerUnlockRecord(int xid, recordid rid) {
@ -346,10 +342,10 @@ int lockManagerCommitRecords(int xid) {
return lockManagerCommitHashed(xid, sizeof(recordid));
}
compensated_function int lockManagerReadLockPage(int xid, pageid_t p) {
int lockManagerReadLockPage(int xid, pageid_t p) {
return lockManagerReadLockHashed(xid, (byte*)&p, sizeof(p));
}
compensated_function int lockManagerWriteLockPage(int xid, pageid_t p) {
int lockManagerWriteLockPage(int xid, pageid_t p) {
return lockManagerWriteLockHashed(xid, (byte*)&p, sizeof(p));
}
int lockManagerUnlockPage(int xid, pageid_t p) {

View file

@ -3,8 +3,6 @@
*/
#include <stasis/logger/logMemory.h>
#include <stasis/compensations.h>
#include <assert.h>
typedef struct {
@ -66,7 +64,7 @@ void logMemory_Iterator_close(int xid, void * impl) {
free(impl);
}
compensated_function int logMemory_Iterator_next (int xid, void * impl) {
int logMemory_Iterator_next (int xid, void * impl) {
logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl;
pthread_mutex_lock(&(fifo->readerMutex));
pthread_mutex_lock(&(fifo->mutex));
@ -91,7 +89,6 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
}
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
@ -103,7 +100,6 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
tmp = realloc(fifo->cached_value, size);
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
@ -116,7 +112,6 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
@ -137,7 +132,7 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
.._next. The functionality should be broken into modules and
reused... */
compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
int logMemory_Iterator_tryNext (int xid, void * impl) {
logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl;
if(EBUSY == pthread_mutex_trylock(&(fifo->readerMutex))) {
return 0;
@ -163,7 +158,6 @@ compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
@ -175,7 +169,6 @@ compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
tmp = realloc(fifo->cached_value, size);
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
@ -188,7 +181,6 @@ compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
@ -205,7 +197,7 @@ compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
}
compensated_function void logMemory_Iterator_releaseLock (int xid, void * impl) {
void logMemory_Iterator_releaseLock (int xid, void * impl) {
logMemory_fifo_t * fifo = (logMemory_fifo_t *) impl;
pthread_mutex_unlock(&(fifo->mutex));
@ -227,7 +219,7 @@ compensated_function void logMemory_Iterator_releaseLock (int xid, void * impl)
.._next. The functionality should be broken into modules and
reused... */
compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
logMemory_fifo_t *fifo = (logMemory_fifo_t *) impl;
pthread_mutex_lock(&(fifo->readerMutex));
pthread_mutex_lock(&(fifo->mutex));
@ -255,7 +247,6 @@ compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
@ -267,7 +258,6 @@ compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
tmp = realloc(fifo->cached_value, size);
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
@ -280,7 +270,6 @@ compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
@ -342,7 +331,6 @@ int logMemory_consumer_push(int xid, /*lladdConsumer_t * cons*/ void * it, byte
pthread_cond_wait(&(fifo->writeReady), &(fifo->mutex));
}
if(ret == -1) { // asked to append something longer than the buffer!
compensation_set_error(LLADD_INTERNAL_ERROR);
return LLADD_INTERNAL_ERROR;
}
while(-2 == ringBufferAppend( (fifo)->ringBuffer, val, valSize)) {
@ -350,7 +338,6 @@ int logMemory_consumer_push(int xid, /*lladdConsumer_t * cons*/ void * it, byte
}
if(ret == -1) { // asked to append something longer than the buffer!
compensation_set_error(LLADD_INTERNAL_ERROR);
return LLADD_INTERNAL_ERROR;
}

View file

@ -72,7 +72,7 @@ void * lladdMultiplexer_flush(lladdMultiplexer_t * m) {
Tconsumer_close(m->xid, m->fifoPool->dirtyPoolFifo->consumer);
}
*/
return (void*)compensation_error();
return (void*)0;
}
@ -112,7 +112,7 @@ void * multiplexer_worker(void * arg) {
Tconsumer_close(m->xid, m->fifoPool->dirtyPoolFifo->consumer);
}
return (void*)compensation_error();
return (void*)0;
}
/* ****************** END OF MULTIXPLEXER IMPLEMENTATION **************

View file

@ -195,11 +195,8 @@ void TarrayListDealloc(int xid, recordid rid) {
@todo this function calls pow(), which is horribly inefficient.
*/
compensated_function int TarrayListExtend(int xid, recordid rid, int slots) {
Page * p;
try_ret(compensation_error()) {
p = loadPage(xid, rid.page);
} end_ret(compensation_error());
int TarrayListExtend(int xid, recordid rid, int slots) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch, 0);
array_list_parameter_t alp
= array_list_read_parameter(xid, p);
@ -225,37 +222,37 @@ compensated_function int TarrayListExtend(int xid, recordid rid, int slots) {
tmp2.slot = 0;
tmp2.size = alp.size;
/* Iterate over the (small number) of indirection blocks that need to be updated */
try_ret(compensation_error()) {
for(pageid_t i = lastCurrentBlock+1; i <= lastNewBlock; i++) {
/* Alloc block i */
for(pageid_t i = lastCurrentBlock+1; i <= lastNewBlock; i++) {
/* Alloc block i */
#ifdef HAVE_POWL
pageid_t blockSize = alp.initialSize * powl(alp.multiplier, i);
pageid_t blockSize = alp.initialSize * powl(alp.multiplier, i);
#else
pageid_t blockSize = alp.initialSize * powf(alp.multiplier, i);
pageid_t blockSize = alp.initialSize * powf(alp.multiplier, i);
#endif
pageid_t newFirstPage = TpageAllocMany(xid, blockSize);
DEBUG("block %lld %lld %lld\n", (long long)i, (long long)newFirstPage, (long long)blockSize);
tmp.slot = i + FIRST_DATA_PAGE_OFFSET;
/* Iterate over the (large number) of new blocks, clearing their contents */
pageid_t newFirstPage = TpageAllocMany(xid, blockSize);
DEBUG("block %lld %lld %lld\n", (long long)i, (long long)newFirstPage, (long long)blockSize);
tmp.slot = i + FIRST_DATA_PAGE_OFFSET;
/* Iterate over the (large number) of new blocks, clearing their contents */
#ifdef ARRAY_LIST_OLD_ALLOC
// old way
{
for(pageid_t i = newFirstPage; i < newFirstPage + blockSize; i++) {
TinitializeFixedPage(xid, i, alp.size);
}
// old way
{
for(pageid_t i = newFirstPage; i < newFirstPage + blockSize; i++) {
TinitializeFixedPage(xid, i, alp.size);
}
#else
TinitializeFixedPageRange(xid, newFirstPage, blockSize, alp.size);
#endif
TsetRaw(xid,tmp,&newFirstPage);
DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage);
}
#else
TinitializeFixedPageRange(xid, newFirstPage, blockSize, alp.size);
#endif
TsetRaw(xid,tmp,&newFirstPage);
DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage);
}
tmp.slot = MAX_OFFSET_POSITION;
tmp.slot = MAX_OFFSET_POSITION;
pageid_t newMaxOffset = alp.maxOffset+slots;
TsetRaw(xid, tmp, &newMaxOffset);
pageid_t newMaxOffset = alp.maxOffset+slots;
TsetRaw(xid, tmp, &newMaxOffset);
} end_ret(compensation_error());
return 0;
}

View file

@ -59,39 +59,33 @@ void LinearHashNTADeinit() {
}
/* private methods... */
compensated_function static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * lhh);
static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * lhh);
/** @todo Remove defined HASH_INIT_ARRAY_LIST_COUNT */
#define HASH_INIT_ARRAY_LIST_COUNT (stasis_util_two_to_the(HASH_INIT_BITS))
#define HASH_INIT_ARRAY_LIST_MULT 2
compensated_function recordid ThashCreate(int xid, int keySize, int valueSize) {
recordid ThashCreate(int xid, int keySize, int valueSize) {
recordid hashHeader;
lladd_hash_header lhh;
memset(&lhh,0,sizeof(lhh));
try_ret(NULLRID) {
hashHeader = Talloc(xid, sizeof(lladd_hash_header));
if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(recordid));
} else {
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(stasis_linkedList_entry) + keySize + valueSize);
}
} end_ret(NULLRID);
try_ret(NULLRID) {
TarrayListExtend(xid, lhh.buckets, HASH_INIT_ARRAY_LIST_COUNT);
} end_ret(NULLRID);
hashHeader = Talloc(xid, sizeof(lladd_hash_header));
if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(recordid));
} else {
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(stasis_linkedList_entry) + keySize + valueSize);
}
TarrayListExtend(xid, lhh.buckets, HASH_INIT_ARRAY_LIST_COUNT);
int i;
recordid bucket = lhh.buckets;
if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
for(i = 0; i < HASH_INIT_ARRAY_LIST_COUNT; i++) {
try_ret(NULLRID) {
recordid rid = TpagedListAlloc(xid);
bucket.slot = i;
Tset(xid, bucket, &rid);
// printf("paged list alloced at rid {%d %d %d}\n", rid.page, rid.slot, rid.size);
} end_ret(NULLRID);
recordid rid = TpagedListAlloc(xid);
bucket.slot = i;
Tset(xid, bucket, &rid);
// printf("paged list alloced at rid {%d %d %d}\n", rid.page, rid.slot, rid.size);
}
} else {
@ -111,18 +105,16 @@ compensated_function recordid ThashCreate(int xid, int keySize, int valueSize) {
lhh.nextSplit = 0;
lhh.bits = HASH_INIT_BITS;
lhh.numEntries = 0;
try_ret(NULLRID) {
Tset(xid, hashHeader, &lhh);
} end_ret(NULLRID);
Tset(xid, hashHeader, &lhh);
return hashHeader;
}
compensated_function void ThashDelete(int xid, recordid hash) {
void ThashDelete(int xid, recordid hash) {
abort();
}
compensated_function static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize);
compensated_function static int __ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize);
static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize);
static int __ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize);
/*typedef struct {
recordid hashHeader;
@ -135,8 +127,7 @@ typedef struct {
int valueSize;
} linearHash_remove_arg;*/
//compensated_function static int operateInsert(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
compensated_function static int op_linear_hash_insert(const LogEntry* e, Page* p) {
static int op_linear_hash_insert(const LogEntry* e, Page* p) {
const linearHash_remove_arg * args = stasis_log_entry_update_args_cptr(e);
recordid hashHeader = args->hashHeader;
int keySize = args->keySize;
@ -146,22 +137,21 @@ compensated_function static int op_linear_hash_insert(const LogEntry* e, Page* p
byte * key = (byte*)(args+1);
byte * value = ((byte*)(args+1))+ keySize;
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
pthread_mutex_lock(&linear_hash_mutex);
__ThashInsert(e->xid, hashHeader, key, keySize, value, valueSize);
} compensate_ret(compensation_error());
pthread_mutex_lock(&linear_hash_mutex);
__ThashInsert(e->xid, hashHeader, key, keySize, value, valueSize);
pthread_mutex_unlock(&linear_hash_mutex);
return 0;
}
compensated_function static int op_linear_hash_remove(const LogEntry* e, Page* p) {
static int op_linear_hash_remove(const LogEntry* e, Page* p) {
const linearHash_insert_arg * args = stasis_log_entry_update_args_cptr(e);
recordid hashHeader = args->hashHeader;
int keySize = args->keySize;
byte * key = (byte*)(args + 1);
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
pthread_mutex_lock(&linear_hash_mutex);
__ThashRemove(e->xid, hashHeader, key, keySize);
} compensate_ret(compensation_error());
pthread_mutex_lock(&linear_hash_mutex);
__ThashRemove(e->xid, hashHeader, key, keySize);
pthread_mutex_unlock(&linear_hash_mutex);
return 0;
}
@ -186,7 +176,7 @@ stasis_operation_impl stasis_op_impl_linear_hash_remove() {
return o;
}
compensated_function int ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize) {
int ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize) {
/* XXX slow, but doesn't generate any log entries unless the key exists */
int ret = ThashRemove(xid, hashHeader, key, keySize);
hashHeader.size = sizeof(lladd_hash_header);
@ -199,11 +189,10 @@ compensated_function int ThashInsert(int xid, recordid hashHeader, const byte* k
/** @todo MEMORY LEAK arg, handle on pthread_cancel.. */
void * handle;
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
handle = TbeginNestedTopAction(xid, OPERATION_LINEAR_HASH_INSERT, (byte*)arg, argSize);
free(arg);
__ThashInsert(xid, hashHeader, key, keySize, value, valueSize);
} end_action_ret(compensation_error());
handle = TbeginNestedTopAction(xid, OPERATION_LINEAR_HASH_INSERT, (byte*)arg, argSize);
free(arg);
__ThashInsert(xid, hashHeader, key, keySize, value, valueSize);
TendNestedTopAction(xid, handle);
@ -211,13 +200,12 @@ compensated_function int ThashInsert(int xid, recordid hashHeader, const byte* k
return ret;
}
compensated_function static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize) {
static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize) {
lladd_hash_header lhh;
try_ret(compensation_error()) {
Tread(xid, hashHeader, &lhh);
} end_ret(compensation_error());
Tread(xid, hashHeader, &lhh);
lhh.numEntries ++;
try_ret(compensation_error()) {
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
if(lhh.numEntries > (int)((double)(lhh.nextSplit
+ stasis_util_two_to_the(lhh.bits-1)) * (HASH_FILL_FACTOR))) {
@ -229,135 +217,122 @@ compensated_function static int __ThashInsert(int xid, recordid hashHeader, cons
ThashSplitBucket(xid, hashHeader, &lhh);
}
}
} end_ret(compensation_error());
recordid bucket = lhh.buckets;
bucket.slot = stasis_linear_hash(key, keySize, lhh.bits, lhh.nextSplit);
int ret;
try_ret(compensation_error()) {
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
recordid bucketList;
Tread(xid, bucket, &bucketList);
Tread(xid, bucket, &bucketList);
// int before = TpagedListSpansPages(xid, bucketList);
ret = TpagedListRemove(xid, bucketList, key, keySize);
TpagedListInsert(xid, bucketList, key, keySize, value, valueSize);
// int before = TpagedListSpansPages(xid, bucketList);
ret = TpagedListRemove(xid, bucketList, key, keySize);
TpagedListInsert(xid, bucketList, key, keySize, value, valueSize);
// int after = TpagedListSpansPages(xid, bucketList);
// if(before != after) { // Page overflowed...
// T hashSplitBucket(xid, hashHeader, &lhh);
// T hashSplitBucket(xid, hashHeader, &lhh);
// }
// int after = TpagedListSpansPages(xid, bucketList);
// if(before != after) { // Page overflowed...
// T hashSplitBucket(xid, hashHeader, &lhh);
// T hashSplitBucket(xid, hashHeader, &lhh);
// }
} else {
assert(lhh.keySize == keySize); assert(lhh.valueSize == valueSize);
ret = TlinkedListRemove(xid, bucket, key, keySize);
TlinkedListInsert(xid, bucket, key, keySize, value, valueSize);
}
if(ret) { lhh.numEntries--; }
Tset(xid, hashHeader, &lhh);
} end_ret(compensation_error());
} else {
assert(lhh.keySize == keySize); assert(lhh.valueSize == valueSize);
ret = TlinkedListRemove(xid, bucket, key, keySize);
TlinkedListInsert(xid, bucket, key, keySize, value, valueSize);
}
if(ret) { lhh.numEntries--; }
Tset(xid, hashHeader, &lhh);
return ret;
}
compensated_function int ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize) {
int ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize) {
hashHeader.size = sizeof(lladd_hash_header);
byte * value;
int valueSize;
int ret;
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
pthread_mutex_lock(&linear_hash_mutex);
valueSize = ThashLookup(xid, hashHeader, key, keySize, &value);
} end_action_ret(compensation_error());
pthread_mutex_lock(&linear_hash_mutex);
valueSize = ThashLookup(xid, hashHeader, key, keySize, &value);
if(valueSize == -1) {
pthread_mutex_unlock(&linear_hash_mutex);
return 0;
}
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
int argSize = sizeof(linearHash_remove_arg) + keySize + valueSize;
linearHash_remove_arg * arg = calloc(1,argSize);
arg->hashHeader = hashHeader;
arg->keySize = keySize;
arg->valueSize = valueSize;
memcpy(arg+1, key, keySize);
memcpy((byte*)(arg+1)+keySize, value, valueSize);
void * handle;
int argSize = sizeof(linearHash_remove_arg) + keySize + valueSize;
linearHash_remove_arg * arg = calloc(1,argSize);
arg->hashHeader = hashHeader;
arg->keySize = keySize;
arg->valueSize = valueSize;
memcpy(arg+1, key, keySize);
memcpy((byte*)(arg+1)+keySize, value, valueSize);
void * handle;
handle = TbeginNestedTopAction(xid, OPERATION_LINEAR_HASH_REMOVE, (byte*)arg, argSize);
free(arg);
free(value);
handle = TbeginNestedTopAction(xid, OPERATION_LINEAR_HASH_REMOVE, (byte*)arg, argSize);
free(arg);
free(value);
ret = __ThashRemove(xid, hashHeader, key, keySize);
TendNestedTopAction(xid, handle);
} compensate_ret(compensation_error());
ret = __ThashRemove(xid, hashHeader, key, keySize);
TendNestedTopAction(xid, handle);
pthread_mutex_unlock(&linear_hash_mutex);
return ret;
}
compensated_function static int __ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize) {
static int __ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize) {
int ret;
try_ret(compensation_error()) {
lladd_hash_header lhh;
Tread(xid, hashHeader, &lhh);
lhh.numEntries--;
Tset(xid, hashHeader, &lhh);
lladd_hash_header lhh;
Tread(xid, hashHeader, &lhh);
lhh.numEntries--;
Tset(xid, hashHeader, &lhh);
recordid bucket = lhh.buckets;
bucket.slot = stasis_linear_hash(key, keySize, lhh.bits, lhh.nextSplit);
recordid bucket = lhh.buckets;
bucket.slot = stasis_linear_hash(key, keySize, lhh.bits, lhh.nextSplit);
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &bucketList);
ret = TpagedListRemove(xid, bucketList, key, keySize);
} else {
if(lhh.keySize != keySize) { compensation_set_error(LLADD_INTERNAL_ERROR); }
assert(lhh.keySize == keySize);
ret = TlinkedListRemove(xid, bucket, key, keySize);
}
} end_ret(compensation_error());
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &bucketList);
ret = TpagedListRemove(xid, bucketList, key, keySize);
} else {
assert(lhh.keySize == keySize);
ret = TlinkedListRemove(xid, bucket, key, keySize);
}
return ret;
}
compensated_function int ThashLookup(int xid, recordid hashHeader, const byte * key, int keySize, byte ** value) {
int ThashLookup(int xid, recordid hashHeader, const byte * key, int keySize, byte ** value) {
lladd_hash_header lhh;
hashHeader.size = sizeof(lladd_hash_header);
int ret;
// This whole thing is safe since the callee's do not modify global state...
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
pthread_mutex_lock(&linear_hash_mutex);
Tread(xid, hashHeader, &lhh);
pthread_mutex_lock(&linear_hash_mutex);
Tread(xid, hashHeader, &lhh);
recordid bucket = lhh.buckets;
bucket.slot = stasis_linear_hash(key, keySize, lhh.bits, lhh.nextSplit);
recordid bucket = lhh.buckets;
bucket.slot = stasis_linear_hash(key, keySize, lhh.bits, lhh.nextSplit);
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &bucketList);
ret = TpagedListFind(xid, bucketList, key, keySize, value);
} else {
assert(lhh.keySize == keySize);
ret = TlinkedListFind(xid, bucket, key, keySize, value);
}
} compensate_ret(compensation_error());
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &bucketList);
ret = TpagedListFind(xid, bucketList, key, keySize, value);
} else {
assert(lhh.keySize == keySize);
ret = TlinkedListFind(xid, bucket, key, keySize, value);
}
pthread_mutex_unlock(&linear_hash_mutex);
return ret;
}
compensated_function static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * lhh) {
static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * lhh) {
try {
long old_bucket = lhh->nextSplit;
long new_bucket = old_bucket + stasis_util_two_to_the(lhh->bits-1);
recordid old_bucket_rid = lhh->buckets;
@ -419,76 +394,70 @@ compensated_function static void ThashSplitBucket(int xid, recordid hashHeader,
}
TlinkedListClose(xid, it);
}
} end;
return;
}
lladd_hash_iterator * ThashIterator(int xid, recordid hashHeader, int keySize, int valueSize) {
hashHeader.size = sizeof(lladd_hash_header);
lladd_hash_iterator * it = calloc(1,sizeof(lladd_hash_iterator));
begin_action_ret(free, it, NULL) {
it->hashHeader = hashHeader;
lladd_hash_header lhh;
Tread(xid, hashHeader, &lhh);
it->bucket = lhh.buckets;
it->numBuckets = lhh.nextSplit + stasis_util_two_to_the(lhh.bits-1);
it->bucket.slot = 0;
/* it->keySize = keySize;
it->valueSize = valueSize;
assert(keySize == lhh.keySize);
assert(valueSize == lhh.valueSize); */
keySize = lhh.keySize;
it->keySize = lhh.keySize;
valueSize = lhh.valueSize;
it->valueSize = lhh.valueSize;
if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
it->it = NULL;
recordid bucketList;
assert(it->bucket.size == sizeof(bucketList));
Tread(xid, it->bucket, &bucketList);
it->pit= TpagedListIterator(xid, bucketList);
} else {
it->pit = NULL;
it->it = TlinkedListIterator(xid, it->bucket, it->keySize, it->valueSize);
}
} end_action_ret(NULL);
it->hashHeader = hashHeader;
lladd_hash_header lhh;
Tread(xid, hashHeader, &lhh);
it->bucket = lhh.buckets;
it->numBuckets = lhh.nextSplit + stasis_util_two_to_the(lhh.bits-1);
it->bucket.slot = 0;
/* it->keySize = keySize;
it->valueSize = valueSize;
assert(keySize == lhh.keySize);
assert(valueSize == lhh.valueSize); */
keySize = lhh.keySize;
it->keySize = lhh.keySize;
valueSize = lhh.valueSize;
it->valueSize = lhh.valueSize;
if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
it->it = NULL;
recordid bucketList;
assert(it->bucket.size == sizeof(bucketList));
Tread(xid, it->bucket, &bucketList);
it->pit= TpagedListIterator(xid, bucketList);
} else {
it->pit = NULL;
it->it = TlinkedListIterator(xid, it->bucket, it->keySize, it->valueSize);
}
return it;
}
int ThashNext(int xid, lladd_hash_iterator * it, byte ** key, int * keySize, byte** value, int * valueSize) {
try_ret(0) {
if(it->it) {
assert(!it->pit);
while(!TlinkedListNext(xid, it->it, key, keySize, value, valueSize)) {
if(compensation_error()) { return 0; }
it->bucket.slot++;
if(it->bucket.slot < it->numBuckets) {
TlinkedListClose(xid, it->it);
it->it = TlinkedListIterator(xid, it->bucket, it->keySize, it->valueSize);
} else {
TlinkedListClose(xid, it->it);
it->it = 0;
return 0;
}
}
} else {
assert(it->pit);
while(!TpagedListNext(xid, it->pit, key, keySize, value, valueSize)) {
if(compensation_error()) { return 0; }
it->bucket.slot++;
if(it->bucket.slot < it->numBuckets) {
recordid bucketList;
Tread(xid, it->bucket, &bucketList);
TpagedListClose(xid,it->pit);
it->pit = TpagedListIterator(xid, bucketList);
} else {
TpagedListClose(xid,it->pit);
it->pit = 0;
return 0;
}
}
if(it->it) {
assert(!it->pit);
while(!TlinkedListNext(xid, it->it, key, keySize, value, valueSize)) {
it->bucket.slot++;
if(it->bucket.slot < it->numBuckets) {
TlinkedListClose(xid, it->it);
it->it = TlinkedListIterator(xid, it->bucket, it->keySize, it->valueSize);
} else {
TlinkedListClose(xid, it->it);
it->it = 0;
return 0;
}
}
} end_ret(0);
} else {
assert(it->pit);
while(!TpagedListNext(xid, it->pit, key, keySize, value, valueSize)) {
it->bucket.slot++;
if(it->bucket.slot < it->numBuckets) {
recordid bucketList;
Tread(xid, it->bucket, &bucketList);
TpagedListClose(xid,it->pit);
it->pit = TpagedListIterator(xid, bucketList);
} else {
TpagedListClose(xid,it->pit);
it->pit = 0;
return 0;
}
}
}
return 1;
}

View file

@ -52,8 +52,8 @@ void TlinkedListNTADeinit() {
}
compensated_function static void stasis_linked_list_insert_helper(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
compensated_function static int stasis_linked_list_remove_helper(int xid, recordid list, const byte * key, int keySize);
static void stasis_linked_list_insert_helper(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
static int stasis_linked_list_remove_helper(int xid, recordid list, const byte * key, int keySize);
typedef struct {
recordid list;
int keySize;
@ -64,7 +64,7 @@ typedef struct {
int valueSize;
} stasis_linked_list_remove_log;
compensated_function static int op_linked_list_nta_insert(const LogEntry* e, Page* p) {
static int op_linked_list_nta_insert(const LogEntry* e, Page* p) {
assert(!p);
const stasis_linked_list_remove_log * log = stasis_log_entry_update_args_cptr(e);;
@ -76,18 +76,17 @@ compensated_function static int op_linked_list_nta_insert(const LogEntry* e, Pag
valueSize = log->valueSize;
key = (byte*)(log+1);
value = ((byte*)(log+1))+keySize;
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
pthread_mutex_lock(&stasis_linked_list_mutex);
// printf("Operate insert called: rid.page = %d keysize = %d valuesize = %d %d {%d %d %d}\n", rid.page, log->keySize, log->valueSize, *(int*)key, value->page, value->slot, value->size);
// Skip writing the undo! Recovery will write a CLR after we're done, effectively
// wrapping this in a nested top action, so we needn't worry about that either.
stasis_linked_list_insert_helper(e->xid, log->list, key, keySize, value, valueSize);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
stasis_linked_list_insert_helper(e->xid, log->list, key, keySize, value, valueSize);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return 0;
}
compensated_function static int op_linked_list_nta_remove(const LogEntry *e, Page* p) {
static int op_linked_list_nta_remove(const LogEntry *e, Page* p) {
assert(!p);
const stasis_linked_list_remove_log * log = stasis_log_entry_update_args_cptr(e);
@ -96,17 +95,16 @@ compensated_function static int op_linked_list_nta_remove(const LogEntry *e, Pag
keySize = log->keySize;
key = (byte*)(log+1);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
// printf("Operate remove called: %d\n", *(int*)key);
// Don't call the version that writes an undo entry!
stasis_linked_list_remove_helper(e->xid, log->list, key, keySize);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
pthread_mutex_lock(&stasis_linked_list_mutex);
// printf("Operate remove called: %d\n", *(int*)key);
// Don't call the version that writes an undo entry!
stasis_linked_list_remove_helper(e->xid, log->list, key, keySize);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return 0;
}
compensated_function int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
int ret = 0;
/* try_ret(compensation_error()) {
ret = TlinkedListRemove(xid, list, key, keySize);
@ -117,16 +115,16 @@ compensated_function int TlinkedListInsert(int xid, recordid list, const byte *
undoLog->list = list;
undoLog->keySize = keySize;
memcpy(undoLog+1, key, keySize);
pthread_mutex_lock(&stasis_linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_INSERT,
(byte*)undoLog, sizeof(stasis_linked_list_insert_log) + keySize);
free(undoLog);
stasis_linked_list_insert_helper(xid, list, key, keySize, value, valueSize);
TendNestedTopAction(xid, handle);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
pthread_mutex_lock(&stasis_linked_list_mutex);
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_INSERT,
(byte*)undoLog, sizeof(stasis_linked_list_insert_log) + keySize);
free(undoLog);
stasis_linked_list_insert_helper(xid, list, key, keySize, value, valueSize);
TendNestedTopAction(xid, handle);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
}
@ -151,11 +149,9 @@ stasis_operation_impl stasis_op_impl_linked_list_remove() {
};
return o;
}
compensated_function static void stasis_linked_list_insert_helper(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
static void stasis_linked_list_insert_helper(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
//int ret = Tli nkedListRemove(xid, list, key, keySize);
try {
stasis_linkedList_entry * entry = malloc(sizeof(stasis_linkedList_entry) + keySize + valueSize);
Tread(xid, list, entry);
@ -178,17 +174,14 @@ compensated_function static void stasis_linked_list_insert_helper(int xid, recor
free(newEntry);
}
free(entry);
} end;
}
compensated_function int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
stasis_linkedList_entry * entry = malloc(list.size);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, -2) {
pthread_mutex_lock(&stasis_linked_list_mutex);
Tread(xid, list, entry);
} end_action_ret(-2);
if(!entry->next.size) {
free(entry);
@ -198,26 +191,27 @@ compensated_function int TlinkedListFind(int xid, recordid list, const byte * ke
int done = 0;
int ret = -1;
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, -2) {
while(!done) {
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
int valueSize = list.size - (sizeof(stasis_linkedList_entry) + keySize);
*value = malloc(valueSize);
memcpy(*value, ((byte*)(entry+1))+keySize, valueSize);
done = 1;
ret = valueSize;
}
if(entry->next.size != -1) {
assert(entry->next.size == list.size); // Don't handle lists with variable length records for now
Tread(xid, entry->next, entry);
} else {
done = 1;
}
while(!done) {
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
int valueSize = list.size - (sizeof(stasis_linkedList_entry) + keySize);
*value = malloc(valueSize);
memcpy(*value, ((byte*)(entry+1))+keySize, valueSize);
done = 1;
ret = valueSize;
}
free(entry);
} compensate_ret(-2);
if(entry->next.size != -1) {
assert(entry->next.size == list.size); // Don't handle lists with variable length records for now
Tread(xid, entry->next, entry);
} else {
done = 1;
}
}
free(entry);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
}
@ -225,51 +219,49 @@ compensated_function int TlinkedListFind(int xid, recordid list, const byte * ke
compensated_function int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
byte * value;
int valueSize;
pthread_mutex_lock(&stasis_linked_list_mutex);
int ret;
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
ret = TlinkedListFind(xid, list, key, keySize, &value);
} end_action_ret(compensation_error());
ret = TlinkedListFind(xid, list, key, keySize, &value);
if(ret != -1) {
valueSize = ret;
} else {
pthread_mutex_unlock(&stasis_linked_list_mutex);
return 0;
}
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
int entrySize = sizeof(stasis_linked_list_remove_log) + keySize + valueSize;
stasis_linked_list_remove_log * undoLog = malloc(entrySize);
undoLog->list = list;
undoLog->keySize = keySize;
undoLog->valueSize = valueSize;
int entrySize = sizeof(stasis_linked_list_remove_log) + keySize + valueSize;
stasis_linked_list_remove_log * undoLog = malloc(entrySize);
memcpy(undoLog+1, key, keySize);
memcpy(((byte*)(undoLog+1))+keySize, value, valueSize);
// printf("entry size %d sizeof(remove_log)%d keysize %d valuesize %d sizeof(rid) %d key %d value {%d %d %ld}\n",
// entrySize, sizeof(stasis_linked_list_remove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_REMOVE,
(byte*)undoLog, entrySize);
free(value);
free(undoLog);
stasis_linked_list_remove_helper(xid, list, key, keySize);
undoLog->list = list;
undoLog->keySize = keySize;
undoLog->valueSize = valueSize;
TendNestedTopAction(xid, handle);
} compensate_ret(compensation_error());
memcpy(undoLog+1, key, keySize);
memcpy(((byte*)(undoLog+1))+keySize, value, valueSize);
// printf("entry size %d sizeof(remove_log)%d keysize %d valuesize %d sizeof(rid) %d key %d value {%d %d %ld}\n",
// entrySize, sizeof(stasis_linked_list_remove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_REMOVE,
(byte*)undoLog, entrySize);
free(value);
free(undoLog);
stasis_linked_list_remove_helper(xid, list, key, keySize);
TendNestedTopAction(xid, handle);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return 1;
}
compensated_function static int stasis_linked_list_remove_helper(int xid, recordid list, const byte * key, int keySize) {
static int stasis_linked_list_remove_helper(int xid, recordid list, const byte * key, int keySize) {
stasis_linkedList_entry * entry = malloc(list.size);
pthread_mutex_lock(&stasis_linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
Tread(xid, list, entry);
} end_action_ret(compensation_error());
Tread(xid, list, entry);
if(entry->next.size == 0) {
//Empty List.
@ -283,10 +275,7 @@ compensated_function static int stasis_linked_list_remove_helper(int xid, record
oldLastRead.size = -2;
int ret = 0;
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
while(1) {
if(compensation_error()) { break; }
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
if(listRoot) {
@ -328,15 +317,14 @@ compensated_function static int stasis_linked_list_remove_helper(int xid, record
}
}
free(entry);
} compensate_ret(compensation_error());
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
}
/*** @todo TlinkedListMove could be much faster, but this is good enough for a first pass */
compensated_function int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize) {
int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize) {
byte * value = 0;
int ret;
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
int valueSize = TlinkedListFind(xid, start_list, key, keySize, &value);
if(valueSize != -1) {
@ -351,22 +339,21 @@ compensated_function int TlinkedListMove(int xid, recordid start_list, recordid
ret = 1;
}
if(value) { free(value); }
} compensate_ret(compensation_error());
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
}
compensated_function recordid TlinkedListCreate(int xid, int keySize, int valueSize) {
recordid TlinkedListCreate(int xid, int keySize, int valueSize) {
recordid ret;
try_ret(NULLRID) {
ret = Talloc(xid, sizeof(stasis_linkedList_entry) + keySize + valueSize);
byte * cleared = calloc(sizeof(stasis_linkedList_entry) + keySize + valueSize, sizeof(byte));
Tset(xid, ret, cleared);
free(cleared);
} end_ret(NULLRID);
ret = Talloc(xid, sizeof(stasis_linkedList_entry) + keySize + valueSize);
byte * cleared = calloc(sizeof(stasis_linkedList_entry) + keySize + valueSize, sizeof(byte));
Tset(xid, ret, cleared);
free(cleared);
return ret;
}
compensated_function void TlinkedListDelete(int xid, recordid list) {
try {
void TlinkedListDelete(int xid, recordid list) {
stasis_linkedList_entry * entry = malloc(list.size);
Tread(xid, list, entry);
@ -377,7 +364,6 @@ compensated_function void TlinkedListDelete(int xid, recordid list) {
}
while(entry->next.size != -1) {
if(compensation_error()) { break; }
recordid nextEntry;
Tread(xid, nextEntry, entry);
assert(!memcmp(&nextEntry, &(entry->next), sizeof(recordid)));
@ -385,10 +371,9 @@ compensated_function void TlinkedListDelete(int xid, recordid list) {
}
free(entry);
} end;
}
compensated_function stasis_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize) {
stasis_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize) {
stasis_linkedList_iterator * it = malloc(sizeof(stasis_linkedList_iterator));
it->keySize = keySize;
it->valueSize = valueSize;
@ -400,7 +385,7 @@ compensated_function stasis_linkedList_iterator * TlinkedListIterator(int xid, r
void TlinkedListClose(int xid, stasis_linkedList_iterator * it) {
free(it);
}
compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * it, byte ** key, int * keySize, byte **value, int * valueSize) {
int TlinkedListNext(int xid, stasis_linkedList_iterator * it, byte ** key, int * keySize, byte **value, int * valueSize) {
if(it->next.size == -1) {
return 0;
@ -409,8 +394,8 @@ compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * i
int done = 0;
int ret = 0;
stasis_linkedList_entry * entry;
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
pthread_mutex_lock(&stasis_linked_list_mutex);
if(it->first == -1) {
it->first = 1;
@ -433,20 +418,16 @@ compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * i
it->first = 0;
}
}
} end_action_ret(compensation_error());
if(done) {
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
}
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
assert(it->keySize + it->valueSize + sizeof(stasis_linkedList_entry) == it->next.size);
entry = malloc(it->next.size);
Tread(xid, it->next, entry);
} end_action_ret(compensation_error());
if(entry->next.size) {
*keySize = it->keySize;
*valueSize = it->valueSize;

View file

@ -43,29 +43,27 @@ static int findInBucket(int xid, recordid hashRid, int bucket_number, const void
static int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize) {
int found;
try_ret(compensation_error()) {
hashEntry * e = malloc(sizeof(hashEntry) + keySize + valSize);
recordid nextEntry;
hashEntry * e = malloc(sizeof(hashEntry) + keySize + valSize);
hashRid.slot = bucket_number;
nextEntry = hashRid;
recordid nextEntry;
found = 0;
hashRid.slot = bucket_number;
nextEntry = hashRid;
while(nextEntry.size != -1 && nextEntry.size != 0) {
if(compensation_error()) { break; }
assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, nextEntry, e);
if(!memcmp(key, e+1, keySize) && e->next.size != 0) {
memcpy(val, ((byte*)(e+1))+keySize, valSize);
found = 1;
break;
}
nextEntry = e->next;
found = 0;
while(nextEntry.size != -1 && nextEntry.size != 0) {
assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, nextEntry, e);
if(!memcmp(key, e+1, keySize) && e->next.size != 0) {
memcpy(val, ((byte*)(e+1))+keySize, valSize);
found = 1;
break;
}
free(e);
} end_ret(compensation_error());
nextEntry = e->next;
}
free(e);
return found;
}
@ -79,44 +77,38 @@ static void expand(int xid, recordid hash, int next_split, int i, int keySize, i
#define AMORTIZE 1000
#define FF_AM 750
if(count <= 0 && !(count * -1) % FF_AM) {
try {
recordid * headerRidB = pblHtLookup(openHashes, &(hash.page), sizeof(hash.page));
int j;
TarrayListExtend(xid, hash, AMORTIZE);
for(j = 0; j < AMORTIZE; j++) {
if(compensation_error()) { break; }
if(next_split >= stasis_util_two_to_the(i-1)+2) {
i++;
next_split = 2;
}
rehash(xid, hash, next_split, i, keySize, valSize);
next_split++;
headerNextSplit = next_split;
headerHashBits = i;
recordid * headerRidB = pblHtLookup(openHashes, &(hash.page), sizeof(hash.page));
int j;
TarrayListExtend(xid, hash, AMORTIZE);
for(j = 0; j < AMORTIZE; j++) {
if(next_split >= stasis_util_two_to_the(i-1)+2) {
i++;
next_split = 2;
}
update_hash_header(xid, hash, i, next_split);
} end;
rehash(xid, hash, next_split, i, keySize, valSize);
next_split++;
headerNextSplit = next_split;
headerHashBits = i;
}
update_hash_header(xid, hash, i, next_split);
}
}
static void update_hash_header(int xid, recordid hash, pageid_t i, pageid_t next_split) {
try {
hashEntry * he = pblHtLookup(openHashes, &(hash.page), sizeof(hash.page));
assert(he);
recordid * headerRidB = &he->next;
hashEntry * he = pblHtLookup(openHashes, &(hash.page), sizeof(hash.page));
assert(he);
recordid * headerRidB = &he->next;
assert(headerRidB);
assert(headerRidB);
headerHashBits = i;
headerNextSplit = next_split;
hash.slot = 1;
headerHashBits = i;
headerNextSplit = next_split;
hash.slot = 1;
Tset(xid, hash, headerRidB);
} end;
Tset(xid, hash, headerRidB);
}
static void rehash(int xid, recordid hashRid, pageid_t next_split, pageid_t i, unsigned int keySize, unsigned int valSize) {
try {
int firstA = 1; // Is 'A' the recordid of a bucket?
int firstD = 1; // What about 'D'?
@ -250,7 +242,6 @@ static void rehash(int xid, recordid hashRid, pageid_t next_split, pageid_t i, u
free(D_contents);
free(A_contents);
free(B_contents);
} end;
}
static void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete) {
@ -326,7 +317,6 @@ static int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry
memcpy(B, bucket_contents, sizeof(hashEntry) + keySize + valSize);
Baddr = this;
while(B->next.size != -1) {
if(compensation_error()) { break; } // guard the asserts below.
hashEntry * tmp = A;
A = B;
Aaddr = Baddr;
@ -340,7 +330,7 @@ static int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry
assert(Aaddr.size == sizeof(hashEntry) + keySize + valSize);
Tset(xid, Aaddr, A);
if(deletedEntry) {
*deletedEntry = Baddr;
*deletedEntry = Baddr;
}
found = 1;
break;

View file

@ -31,19 +31,14 @@ static int op_page_set_range_inverse(const LogEntry* e, Page* p) {
return 0;
}
compensated_function int TpageGet(int xid, pageid_t page, void *memAddr) {
Page * q = 0;
try_ret(compensation_error()) {
q = loadPage(xid, page);
memcpy(memAddr, q->memAddr, PAGE_SIZE);
} end_ret(compensation_error());
try_ret(compensation_error()) {
releasePage(q);
} end_ret(compensation_error());
int TpageGet(int xid, pageid_t page, void *memAddr) {
Page * q = loadPage(xid, page);
memcpy(memAddr, q->memAddr, PAGE_SIZE);
releasePage(q);
return 0;
}
compensated_function int TpageSet(int xid, pageid_t page, const void * memAddr) {
int TpageSet(int xid, pageid_t page, const void * memAddr) {
return TpageSetRange(xid, page, 0, memAddr, PAGE_SIZE);
}
@ -59,9 +54,7 @@ int TpageSetRange(int xid, pageid_t page, int offset, const void * memAddr, int
releasePage(p);
try_ret(compensation_error()) {
Tupdate(xid,page,logArg,sizeof(int)+len*2,OPERATION_PAGE_SET_RANGE);
} end_ret(compensation_error());
Tupdate(xid,page,logArg,sizeof(int)+len*2,OPERATION_PAGE_SET_RANGE);
free(logArg);
return 0;
@ -74,7 +67,7 @@ int TpageSetRange(int xid, pageid_t page, int offset, const void * memAddr, int
This calls loadPage and releasePage directly, and bypasses the
logger.
*/
compensated_function void pageOperationsInit(stasis_log_t *log) {
void pageOperationsInit(stasis_log_t *log) {
regionsInit(log);
@ -85,13 +78,12 @@ compensated_function void pageOperationsInit(stasis_log_t *log) {
}
compensated_function int TpageDealloc(int xid, pageid_t page) {
int TpageDealloc(int xid, pageid_t page) {
TregionDealloc(xid, page); // @todo inefficient hack!
return 0;
}
compensated_function pageid_t TpageAlloc(int xid) {
pageid_t TpageAlloc(int xid) {
return TregionAlloc(xid, 1, STORAGE_MANAGER_NAIVE_PAGE_ALLOC);
}

View file

@ -7,22 +7,20 @@ typedef struct {
short keySize;
} pagedListEntry;
compensated_function recordid TpagedListAlloc(int xid) {
recordid TpagedListAlloc(int xid) {
recordid ret;
try_ret(NULLRID) {
ret = Talloc(xid, sizeof(pagedListHeader));
pagedListHeader header;
memset(&header,0,sizeof(header));
header.thisPage = 0;
header.nextPage = NULLRID;
Tset(xid, ret, &header);
} end_ret(NULLRID);
ret = Talloc(xid, sizeof(pagedListHeader));
pagedListHeader header;
memset(&header,0,sizeof(header));
header.thisPage = 0;
header.nextPage = NULLRID;
Tset(xid, ret, &header);
return ret;
}
compensated_function int TpagedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
int TpagedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
int ret;
try_ret(compensation_error()) {
pagedListHeader header;
Tread(xid, list, &header);
recordid headerRid = list;
@ -38,7 +36,6 @@ compensated_function int TpagedListInsert(int xid, recordid list, const byte * k
// When the loop completes, header will contain the contents of the page header the entry will be inserted into,
// headerrid will contain the rid of that header, and rid will contain the newly allocated recordid
while(rid.size == -1) {
if(compensation_error()) { break; }
if(header.nextPage.size == -1) {
// We're at the end of the list
@ -76,12 +73,10 @@ compensated_function int TpagedListInsert(int xid, recordid list, const byte * k
Tset(xid, headerRid, &header);
free(dat);
} end_ret(compensation_error());
return ret;
}
compensated_function int TpagedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
try_ret(compensation_error()) {
int TpagedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
pagedListHeader header;
Tread(xid, list, &header);
@ -91,12 +86,10 @@ compensated_function int TpagedListFind(int xid, recordid list, const byte * key
rid.size = 0;
while(rid.slot || header.nextPage.size != -1) {
if(compensation_error()) { break; }
if(rid.slot) {
rid.size = TrecordSize(xid, rid);
pagedListEntry * dat;
if(compensation_error()) { break; }
dat = malloc(rid.size);
Tread(xid, rid, dat);
@ -115,14 +108,12 @@ compensated_function int TpagedListFind(int xid, recordid list, const byte * key
rid.slot = header.thisPage;
}
}
} end_ret(compensation_error());
return -1;
}
compensated_function int TpagedListRemove(int xid, recordid list, const byte * key, int keySize) {
int TpagedListRemove(int xid, recordid list, const byte * key, int keySize) {
pagedListHeader header;
int ret = 0;
try_ret(compensation_error()) {
Tread(xid, list, &header);
recordid headerRid;
@ -132,10 +123,8 @@ compensated_function int TpagedListRemove(int xid, recordid list, const byte * k
short lastSlot = -1;
headerRid = list;
while(rid.slot || header.nextPage.size != -1) {
if(compensation_error()) { break; }
if(rid.slot) {
rid.size = TrecordSize(xid, rid);
if(compensation_error()) { break; };
pagedListEntry * dat = malloc(rid.size);
Tread(xid, rid, dat);
@ -145,7 +134,6 @@ compensated_function int TpagedListRemove(int xid, recordid list, const byte * k
recordid lastRid = rid;
lastRid.slot = lastSlot;
lastRid.size = TrecordSize(xid, lastRid);
if(compensation_error()) { free(dat); break; }
pagedListEntry * lastRidBuf = malloc(lastRid.size);
Tread(xid, lastRid, lastRidBuf);
lastRidBuf->nextEntry = dat->nextEntry;
@ -171,14 +159,12 @@ compensated_function int TpagedListRemove(int xid, recordid list, const byte * k
rid.slot = header.thisPage;
}
}
} end_ret(compensation_error());
return ret;
}
compensated_function int TpagedListMove(int xid, recordid start_list, recordid end_list, const byte * key, int keySize) {
int TpagedListMove(int xid, recordid start_list, recordid end_list, const byte * key, int keySize) {
byte * value = NULL;
int ret;
try_ret(compensation_error()) {
int valueSize = TpagedListFind(xid, start_list, key, keySize, &value);
if(valueSize != -1) {
ret = TpagedListRemove(xid, start_list, key, keySize);
@ -190,16 +176,13 @@ compensated_function int TpagedListMove(int xid, recordid start_list, recordid e
} else {
ret = 0;
}
} end_ret(compensation_error());
return ret;
}
compensated_function lladd_pagedList_iterator * TpagedListIterator(int xid, recordid list) {
lladd_pagedList_iterator * TpagedListIterator(int xid, recordid list) {
pagedListHeader header;
assert(list.size == sizeof(pagedListHeader));
try_ret(NULL) {
Tread(xid, list, &header);
} end_ret(NULL);
lladd_pagedList_iterator * it = malloc(sizeof(lladd_pagedList_iterator));
@ -213,20 +196,16 @@ compensated_function lladd_pagedList_iterator * TpagedListIterator(int xid, reco
void TpagedListClose(int xid, lladd_pagedList_iterator * it) {
free(it);
}
compensated_function int TpagedListNext(int xid, lladd_pagedList_iterator * it,
int TpagedListNext(int xid, lladd_pagedList_iterator * it,
byte ** key, int * keySize,
byte ** value, int * valueSize) {
while(it->entryRid.slot || it->headerRid.size != -1) {
if(it->entryRid.slot) {
try_ret(compensation_error()) {
it->entryRid.size = TrecordSize(xid, it->entryRid);
} end_ret(compensation_error());
it->entryRid.size = TrecordSize(xid, it->entryRid);
assert(it->entryRid.size != -1);
pagedListEntry * entry = malloc(it->entryRid.size);
begin_action_ret(free, entry, compensation_error()) {
Tread(xid, it->entryRid, entry);
} end_action_ret(compensation_error());
Tread(xid, it->entryRid, entry);
*keySize = entry->keySize;
*valueSize = it->entryRid.size - *keySize - sizeof(pagedListEntry);
@ -245,9 +224,7 @@ compensated_function int TpagedListNext(int xid, lladd_pagedList_iterator * it,
} else { // move to next page.
pagedListHeader header;
try_ret(compensation_error()) {
Tread(xid, it->headerRid, &header);
} end_ret(compensation_error());
Tread(xid, it->headerRid, &header);
it->entryRid.page = it->headerRid.page;
it->headerRid = header.nextPage;
it->entryRid.slot = header.thisPage;

View file

@ -192,12 +192,8 @@ static int op_set_range_inverse(const LogEntry* e, Page* p) {
free(tmp);
return 0;
}
compensated_function void TsetRange(int xid, recordid rid, int offset, int length, const void * dat) {
Page * p;
try {
p = loadPage(xid, rid.page);
} end;
void TsetRange(int xid, recordid rid, int offset, int length, const void * dat) {
Page * p = loadPage(xid, rid.page);
/// XXX rewrite without malloc (use read_begin, read_done)
set_range_t * range = malloc(sizeof(set_range_t) + 2 * length);

View file

@ -73,7 +73,6 @@ terms specified in this license.
#include <stasis/constants.h>
#include <stasis/blobManager.h>
#include <stasis/lockManager.h>
#include <stasis/compensations.h>
#include <stasis/page/slotted.h>
#include <stasis/page/fixed.h>
#include <stasis/page/uninitialized.h>

View file

@ -4,7 +4,6 @@
#include <stasis/bufferManager.h>
#include <stasis/consumer.h>
#include <stasis/lockManager.h>
#include <stasis/compensations.h>
#include <stasis/pageHandle.h>
#include <stasis/page.h>
#include <stasis/transactionTable.h>
@ -76,8 +75,6 @@ stasis_log_t* stasis_log_default_factory() {
int Tinit() {
stasis_initted = 1;
compensations_init();
stasis_operation_table_init();
stasis_transaction_table = stasis_transaction_table_init();
@ -215,11 +212,10 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page,
assert(stasis_transaction_table_is_active(stasis_transaction_table, xid));
Page * p = loadPage(xid, page);
assert(p);
try {
if(globalLockManager.writeLockPage) {
globalLockManager.writeLockPage(xid, p->id);
}
} end;
if(globalLockManager.writeLockPage) {
globalLockManager.writeLockPage(xid, p->id);
}
pthread_mutex_lock(&h->mut);
@ -299,20 +295,15 @@ Page * TreadWithPage(int xid, recordid rid, Page *p, void * dat) {
return p;
}
compensated_function void Tread(int xid, recordid rid, void * dat) {
void Tread(int xid, recordid rid, void * dat) {
Page * p;
try {
p = loadPage(xid, rid.page);
} end;
p = loadPage(xid, rid.page);
releasePage( TreadWithPage(xid, rid, p, dat) );
}
compensated_function void TreadRaw(int xid, recordid rid, void * dat) {
Page * p;
try {
p = loadPage(xid, rid.page);
} end;
void TreadRaw(int xid, recordid rid, void * dat) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
stasis_record_read(xid, p, rid, dat);
unlock(p->rwlatch);

View file

@ -29,10 +29,10 @@ lsn_t __real_TwritebackUpdate(int xid, pageid_t page,
void __real_TreorderableWritebackUpdate(int xid, void* h,
pageid_t page, const void * dat,
size_t datlen, int op)LINKER_STUB
compensated_function void __real_Tread(int xid, recordid rid, void *dat)LINKER_STUB
void __real_Tread(int xid, recordid rid, void *dat)LINKER_STUB
Page * __real_TreadWithPage(int xid, recordid rid, Page *p, void *dat)LINKER_STUB
compensated_function void __real_TreadRaw(int xid, recordid rid, void *dat)LINKER_STUB
compensated_function void __real_TreadStr(int xid, recordid rid, char *dat)LINKER_STUB
void __real_TreadRaw(int xid, recordid rid, void *dat)LINKER_STUB
void __real_TreadStr(int xid, recordid rid, char *dat)LINKER_STUB
int __real_Tcommit(int xid)LINKER_STUB
int __real_TsoftCommit(int xid)LINKER_STUB
void __real_TforceCommits(void)LINKER_STUB

View file

@ -21,10 +21,10 @@ lsn_t __real_TwritebackUpdate(int xid, pageid_t page,
void __real_TreorderableWritebackUpdate(int xid, void* h,
pageid_t page, const void * dat,
size_t datlen, int op);
compensated_function void __real_Tread(int xid, recordid rid, void *dat);
void __real_Tread(int xid, recordid rid, void *dat);
Page * __real_TreadWithPage(int xid, recordid rid, Page *p, void *dat);
compensated_function void __real_TreadRaw(int xid, recordid rid, void *dat);
compensated_function void __real_TreadStr(int xid, recordid rid, char *dat);
void __real_TreadRaw(int xid, recordid rid, void *dat);
void __real_TreadStr(int xid, recordid rid, char *dat);
int __real_Tcommit(int xid);
int __real_TsoftCommit(int xid);
void __real_TforceCommits(void);
@ -87,7 +87,7 @@ void __wrap_TreorderableWritebackUpdate(int xid, void* h,
__real_TreorderableWritebackUpdate(xid, h, page, dat, datlen, op);
printf("ret_TreorderableWritebackUpdate(%lld)\n", (long long)stasis_timestamp);
}
compensated_function void __wrap_Tread(int xid, recordid rid, void *dat) {
void __wrap_Tread(int xid, recordid rid, void *dat) {
printf("call_Tread(%lld, %d, %lld, %d, %lld)\n", (long long)stasis_timestamp, xid, rid.page, rid.slot, (long long)rid.size);
__real_Tread(xid, rid, dat);
printf("ret_Tread(%lld, %d, %lld, %d, %lld)\n", (long long)stasis_timestamp, xid, rid.page, rid.slot, (long long)rid.size);
@ -98,12 +98,12 @@ Page * __wrap_TreadWithPage(int xid, recordid rid, Page *p, void *dat) {
printf("ret_Tread(%lld, %d, %lld, %d, %lld)\n", (long long)stasis_timestamp, xid, rid.page, rid.slot, (long long)rid.size);
return ret;
}
compensated_function void __wrap_TreadRaw(int xid, recordid rid, void *dat) {
void __wrap_TreadRaw(int xid, recordid rid, void *dat) {
printf("call_Tread(%lld, %d, %lld, %d, %lld)\n", (long long)stasis_timestamp, xid, rid.page, rid.slot, (long long)rid.size); // XXX due to interposition artifacts, this printf will rarely be called.
__real_TreadRaw(xid, rid, dat);
printf("ret_Tread(%lld, %d, %lld, %d, %lld)\n", (long long)stasis_timestamp, xid, rid.page, rid.slot, (long long)rid.size);
}
compensated_function void __wrap_TreadStr(int xid, recordid rid, char *dat) {
void __wrap_TreadStr(int xid, recordid rid, char *dat) {
printf("call_Tread(%lld, %d, %lld, %d, %lld)\n", (long long)stasis_timestamp, xid, rid.page, rid.slot, (long long)rid.size);
__real_TreadStr(xid, rid, dat);
printf("ret_Tread(%lld, %d, %lld, %d, %lld)\n", (long long)stasis_timestamp, xid, rid.page, rid.slot, (long long)rid.size);

View file

@ -60,8 +60,8 @@ void stasis_blob_read(int xid, Page * p, recordid rid, void * buf);
*/
void stasis_blob_write(int xid, Page * p, recordid rid, const void * buf);
compensated_function recordid preAllocBlob(int xid, long blobsize);
compensated_function recordid preAllocBlobFromPage(int xid, long page, long blobsize);
recordid preAllocBlob(int xid, long blobsize);
recordid preAllocBlobFromPage(int xid, long page, long blobsize);
/**
Allocate a blob of size blobSize.

View file

@ -172,6 +172,6 @@ extern long long *stasis_dbug_timestamp;
#define STLSEARCH
#include "compensations.h"
#include <pthread.h>
#endif /* __stasis_common_h */

View file

@ -1,171 +0,0 @@
#include <pthread.h>
#include <stasis/common.h>
#ifndef __COMPENSATIONS_H
#define __COMPENSATIONS_H
/**
@file
An incomplete implementation of compensations for C (deprecated)
@deprecated Don't use compensations in new code. For now, abort()
the process on error.
Rants about cpp:
There seems to be no way to add this syntax:
foo() {
lock * l = hashLookup(foo);
// stuff
compensate_(l) {
lock(l);
// blah blah
} with {
unlock(l);
}
// more stuff
}
=>
foo() {
push_compensation_stack(lock_c_line_1231, l);
lock(l);
pop_compensation_stack(); // remove top stack entry and execute it.
}
void lock_c_line_1231(lock * l) {
unlock(l);
}
(note that this syntax doesn't require closures!)
There are a few problems:
1: 'compensate' and 'with' need to know the name of the
compensation's implementation function.
2: the 'with' block needs to move its code to the outside of the
enclosing function's scope, since nested functions cannot be called
after the function they are declared in returns.
You could try \#defining a temporary variable, and reading from it in
the 'with' macro, but you seem to need a stack in order to support
that.
Here is the syntax that I've settled on:
lock_t * l = foo();
begin_action(unlock, l) {
lock(l);
// ...
} compensate;
// Or: (not recommended)
begin_action(unlock, l) {
lock(l);
// ...
unlock(l);
} end_action;
// This is a useful variant, however:
lock(l);
// while loop , complicated stuff, etc
{
begin_action(unlock, l) {
// something that can cause an error.
} end_action;
}
unlock(l);
In all cases, an error can be raised using:
compensation_set_error(int i);
If an error is set, then each instance of begin_action and
end_action will cause the function to return, as appropriate.
Currently, nesting begin_actions within each other in the same
function will not work. This could probably be partially fixed by
replacing return statements with 'break' statements, or a GOTO to
the proper enclosing end_action/compensate. There may be a way to
\#define/\#undefine a variable in a way that would handle this
properly.
Also, begin_action(NULL, NULL) is supported, and is useful for
checking the return value of a called function, but, for
efficiency, try{ } end; is recommended
*/
BEGIN_C_DECLS
void compensations_init();
void compensations_deinit();
long compensation_error();
void compensation_clear_error();
void compensation_set_error(long code);
#define try do { if(compensation_error()) return; do
#define try_ret(x) do { if(compensation_error()) return (x); do
#define end while(0); if(compensation_error()) return; }while(0)
#define end_ret(x) while(0); if(compensation_error()) return (x); }while(0)
extern int ___compensation_count___;
#define begin_action(func, var) \
if(compensation_error()) return; \
do{ \
/* void (*_func_)(void*); */ \
/* assert(func); */ \
pthread_cleanup_push(/*_func_=*/(void(*)(void*))(func), (void*)(var));\
/* assert(_func_); */ \
do
/** @todo compensation variables don't need _func_ anymore. */
#define end_action \
while(0); \
pthread_cleanup_pop(/*_func_ &&*/compensation_error()); \
if(compensation_error()) return; \
} while(0)
#define compensate \
while(0); \
pthread_cleanup_pop(1/*(int)_func_*/); \
if(compensation_error()) return; \
} while(0)
#define begin_action_ret(func, var, ret) \
if(compensation_error()) return (ret); \
do{ \
/* void (*_func_)(void*); */ \
pthread_cleanup_push(/*_func_=*/(void(*)(void*))(func), (void*)(var));\
do
#define end_action_ret(ret) \
while(0); \
pthread_cleanup_pop(/*_func_ &&*/compensation_error()); \
if(compensation_error()) return (ret); \
} while(0)
#define compensate_ret(ret) \
while(0); \
pthread_cleanup_pop(1/*(int)_func*/); \
if(compensation_error()) return (ret); \
} while(0)
#define compensated_function
#endif
END_C_DECLS

View file

@ -17,8 +17,8 @@ extern LockManagerSetup globalLockManager;
void lockManagerInit();
compensated_function int lockManagerReadLockRecord(int xid, recordid rid);
compensated_function int lockManagerWriteLockRecord(int xid, recordid rid);
int lockManagerReadLockRecord(int xid, recordid rid);
int lockManagerWriteLockRecord(int xid, recordid rid);
int lockManagerUnlockRecord(int xid, recordid rid);
int lockManagerCommit(int xid);

View file

@ -22,10 +22,10 @@ lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset);
void logMemory_consumer_close(int xid, void *it);
int logMemory_consumer_push (int xid, void * it, byte * key, size_t keySize, byte * val, size_t valSize);
void logMemory_Iterator_close(int xid, void * impl);
compensated_function int logMemory_Iterator_next(int xid, void * impl);
compensated_function int logMemory_Iterator_tryNext(int xid, void * impl);
compensated_function int logMemory_Iterator_key (int xid, void * impl, byte ** key);
compensated_function int logMemory_Iterator_value (int xid, void * impl, byte ** value);
compensated_function void logMemory_Iterator_releaseTuple(int xid, void *it);
compensated_function void logMemory_Iterator_releaseLock (int xid, void * impl);
int logMemory_Iterator_next(int xid, void * impl);
int logMemory_Iterator_tryNext(int xid, void * impl);
int logMemory_Iterator_key (int xid, void * impl, byte ** key);
int logMemory_Iterator_value (int xid, void * impl, byte ** value);
void logMemory_Iterator_releaseTuple(int xid, void *it);
void logMemory_Iterator_releaseLock (int xid, void * impl);
#endif

View file

@ -32,14 +32,14 @@ void stasis_alloc_deinit(stasis_alloc_t* alloc);
@return the recordid of the new record.
*/
compensated_function recordid Talloc(int xid, unsigned long size);
recordid Talloc(int xid, unsigned long size);
compensated_function recordid TallocFromPage(int xid, pageid_t page, unsigned long size);
recordid TallocFromPage(int xid, pageid_t page, unsigned long size);
/**
Free a record.
*/
compensated_function void Tdealloc(int xid, recordid rid);
void Tdealloc(int xid, recordid rid);
/**
Obtain the type of a record, as returned by getRecordType.
@ -54,7 +54,7 @@ compensated_function void Tdealloc(int xid, recordid rid);
@see getRecordType
*/
compensated_function int TrecordType(int xid, recordid rid);
int TrecordType(int xid, recordid rid);
/**
Obtain the length of the data stored in a record.
@ -66,9 +66,9 @@ compensated_function int TrecordType(int xid, recordid rid);
@return -1 if the record does not exist, the size of the record otherwise.
*/
compensated_function int TrecordSize(int xid, recordid rid);
int TrecordSize(int xid, recordid rid);
/** Return the number of records stored in page pageid */
compensated_function int TrecordsInPage(int xid, pageid_t page);
int TrecordsInPage(int xid, pageid_t page);
#endif

View file

@ -102,7 +102,7 @@ void TarrayListDealloc(int xid, recordid rid);
@param rid the recordid pointing to the ArrayList.
@param slots the number of slots to end to the end of the ArrayList.
*/
compensated_function int TarrayListExtend(int xid, recordid rid, int slots);
int TarrayListExtend(int xid, recordid rid, int slots);
/**
Get the length of an ArrayList.
@ -110,7 +110,7 @@ compensated_function int TarrayListExtend(int xid, recordid rid, int slots);
@param rid the recordid pointing to the ArrayList.
@return The number of items stored in the ArrayList.
*/
compensated_function int TarrayListLength(int xid, recordid rid);
int TarrayListLength(int xid, recordid rid);
/** Used by Tread() and Tset() to map from arrayList index to recordid. */
recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset);

View file

@ -41,9 +41,8 @@
/** Aim to keep 0.7 items in each bucket */
#define HASH_FILL_FACTOR 0.7
compensated_function recordid ThashCreate(int xid, int keySize, int valSize);
compensated_function void ThashDelete(int xid, recordid hash);
recordid ThashCreate(int xid, int keySize, int valSize);
void ThashDelete(int xid, recordid hash);
/**
Insert key, value pair into hash, overwriting the existing value,
if any.
@ -56,7 +55,7 @@ compensated_function void ThashDelete(int xid, recordid hash);
@param valueSize length of key in bytes
@return 1 if the key was defined, 0 otherwise
*/
compensated_function int ThashInsert(int xid, recordid hash,
int ThashInsert(int xid, recordid hash,
const byte* key, int keySize,
const byte* value, int valueSize);
/**
@ -68,13 +67,13 @@ compensated_function int ThashInsert(int xid, recordid hash,
@param keySize length of key in bytes
@return 1 if the key was defined, 0 otherwise
*/
compensated_function int ThashRemove(int xid, recordid hash,
int ThashRemove(int xid, recordid hash,
const byte* key, int keySize);
/** @return size of the value associated with key, or -1 if key not found.
(a return value of zero means the key is associated with an
empty value.) */
compensated_function int ThashLookup(int xid, recordid hash, const byte* key, int keySize, byte ** value);
int ThashLookup(int xid, recordid hash, const byte* key, int keySize, byte ** value);
/**
Iterator that complies with the standard Stasis iterator interface.

View file

@ -29,24 +29,24 @@ typedef struct {
recordid listRoot;
} stasis_linkedList_iterator;
compensated_function int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
compensated_function int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value);
compensated_function int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize);
compensated_function int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize);
int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value);
int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize);
int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize);
/** The linked list iterator can tolerate the concurrent removal of values that
it has already returned. In the presence of such removals, the iterator
will return the keys and values present in the list as it existed when next()
was first called.
@return a new iterator initialized to the head of the list. */
compensated_function stasis_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize);
stasis_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize);
void TlinkedListClose(int xid, stasis_linkedList_iterator * it);
/** @return 1 if there was another entry to be iterated over. 0 otherwise.
If this function returns 1, the caller must free() the malloced memory
returned via the key and value arguments.*/
compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize);
compensated_function recordid TlinkedListCreate(int xid, int keySize, int ValueSize);
compensated_function void TlinkedListDelete(int xid, recordid list);
int TlinkedListNext(int xid, stasis_linkedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize);
recordid TlinkedListCreate(int xid, int keySize, int ValueSize);
void TlinkedListDelete(int xid, recordid list);
void TlinkedListNTAInit();
void TlinkedListNTADeinit();

View file

@ -105,6 +105,6 @@ stasis_operation_impl stasis_op_impl_multipage_initialize();
stasis_operation_impl stasis_op_impl_fixed_page_alloc();
compensated_function void pageOperationsInit(stasis_log_t *log);
void pageOperationsInit(stasis_log_t *log);
#endif

View file

@ -89,10 +89,10 @@ typedef struct {
//recordid dereferencePagedListRID(int xid, recordid rid);
/** @return 1 if the key was already in the list. */
compensated_function int TpagedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
compensated_function int TpagedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value);
compensated_function int TpagedListRemove(int xid, recordid list, const byte * key, int keySize);
compensated_function int TpagedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize);
int TpagedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
int TpagedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value);
int TpagedListRemove(int xid, recordid list, const byte * key, int keySize);
int TpagedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize);
/** The linked list iterator can tolerate the concurrent removal of values that
it has already returned. In the presence of such removals, the iterator
will return the keys and values present in the list as it existed when next()
@ -104,10 +104,10 @@ void TpagedListClose(int xid, lladd_pagedList_iterator *it);
/** @return 1 if there was another entry to be iterated over. 0 otherwise.
If this function returns 1, the caller must free() the malloced memory
returned via the key and value arguments.*/
compensated_function int TpagedListNext(int xid, lladd_pagedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize);
compensated_function recordid TpagedListAlloc(int xid);
compensated_function void TpagedListDelete(int xid, recordid list);
compensated_function int TpagedListSpansPages(int xid, recordid list);
int TpagedListNext(int xid, lladd_pagedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize);
recordid TpagedListAlloc(int xid);
void TpagedListDelete(int xid, recordid list);
int TpagedListSpansPages(int xid, recordid list);
stasis_operation_impl getPagedListInsert();
stasis_operation_impl getPagedListRemove();
#endif

View file

@ -79,7 +79,6 @@ stasis_operation_impl stasis_op_impl_set_range_inverse();
efficiently, it performs a number of extra memcpy() calls over the
entire record.
*/
compensated_function void TsetRange(int xid, recordid rid, int offset, int length, const void * dat);
void TsetRange(int xid, recordid rid, int offset, int length, const void * dat);
#endif

View file

@ -632,7 +632,7 @@ void TreorderableWritebackUpdate(int xid, void* h,
* @param rid reference to page/slot
* @param dat buffer into which data goes
*/
compensated_function void Tread(int xid, recordid rid, void *dat);
void Tread(int xid, recordid rid, void *dat);
Page * TreadWithPage(int xid, recordid rid, Page *p, void *dat);
/**
* Read a value of a record without first dereferencing the record.
@ -642,8 +642,8 @@ Page * TreadWithPage(int xid, recordid rid, Page *p, void *dat);
* @see arrayList for a data structure that uses recordid
* dereferencing to transparently provide records to its callers.
*/
compensated_function void TreadRaw(int xid, recordid rid, void *dat);
compensated_function void TreadStr(int xid, recordid rid, char *dat);
void TreadRaw(int xid, recordid rid, void *dat);
void TreadStr(int xid, recordid rid, char *dat);
/**
* Commit an active transaction. Each transaction should be completed

View file

@ -19,9 +19,6 @@ CREATE_CHECK(check_header)
CREATE_CHECK(check_linkedListNTA)
CREATE_CHECK(check_linearHashNTA)
CREATE_CHECK(check_pageOrientedList)
CREATE_CHECK(check_lockManager)
CREATE_CHECK(check_compensations)
CREATE_CHECK(check_errorHandling)
CREATE_CHECK(check_ringbuffer)
CREATE_CHECK(check_iterator)
CREATE_CHECK(check_multiplexer)

View file

@ -3,5 +3,5 @@
int main(void) {
Tinit();
Tdeinit();
return compensation_error();
return 0;
}

View file

@ -5,5 +5,5 @@ int main(void) {
TtruncateLog();
Tdeinit();
return compensation_error();
return 0;
}