whitespace fixup after removal of compensations

Sears Russell 2011-06-12 09:20:58 +00:00
parent aee5cca000
commit 4fb44b6f5f
14 changed files with 559 additions and 657 deletions


@ -53,83 +53,81 @@ static int bufManCloseHandle(stasis_buffer_manager_t *bm, stasis_buffer_manager_
}
stasis_buffer_manager_t* stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) {
page_handle = ph;
stasis_buffer_manager_t * bm = malloc(sizeof(*bm));
bm->releasePageImpl = bufManReleasePage;
bm->openHandleImpl = bufManOpenHandle;
bm->closeHandleImpl = bufManCloseHandle;
bm->loadPageImpl = bufManLoadPage;
bm->loadUninitPageImpl = bufManLoadUninitPage;
bm->prefetchPages = NULL;
bm->getCachedPageImpl = bufManGetCachedPage;
bm->writeBackPage = pageWrite_legacyWrapper;
bm->forcePages = forcePageFile_legacyWrapper;
bm->forcePageRange = forceRangePageFile_legacyWrapper;
bm->stasis_buffer_manager_close = bufManBufDeinit;
bm->stasis_buffer_manager_simulate_crash = bufManSimulateBufferManagerCrash;
stasis_buffer_pool = stasis_buffer_pool_init();
pthread_mutex_init(&loadPagePtr_mutex, NULL);
activePages = LH_ENTRY(create)(16);
dummy_page = stasis_buffer_pool_malloc_page(stasis_buffer_pool);
stasis_buffer_pool_free_page(stasis_buffer_pool, dummy_page, -1);
Page *first;
first = stasis_buffer_pool_malloc_page(stasis_buffer_pool);
stasis_buffer_pool_free_page(stasis_buffer_pool, first, 0);
LH_ENTRY(insert)(activePages, &first->id, sizeof(first->id), first);
page_handle->read(page_handle, first, UNKNOWN_TYPE_PAGE);
pageCacheInit(first);
int err = pthread_key_create(&lastPage, 0);
assert(!err);
assert(activePages);
#ifdef PROFILE_LATCHES_WRITE_ONLY
profile_load_hash = LH_ENTRY(create)(10);
profile_load_pins_hash = LH_ENTRY(create)(10);
#endif
bm->impl = 0; // XXX hack, but this module is deprecated
return bm;
}
static void bufManBufDeinit() {
DEBUG("pageCacheDeinit()");
DEBUG("pageCacheDeinit()");
struct LH_ENTRY(list) iter;
const struct LH_ENTRY(pair_t) * next;
LH_ENTRY(openlist(activePages, &iter));
while((next = LH_ENTRY(readlist)(&iter))) {
page_handle->write(page_handle, (Page*)next->value);
DEBUG("+");
}
LH_ENTRY(destroy)(activePages);
pthread_mutex_destroy(&loadPagePtr_mutex);
pageCacheDeinit();
stasis_buffer_pool_deinit(stasis_buffer_pool);
page_handle->close(page_handle);
#ifdef PIN_COUNT
if(pinCount != 0) {
printf("WARNING: At exit, %d pages were still pinned!\n", pinCount);
}
#endif
return;
}
/**
Just close file descriptors, don't do any other clean up. (For
testing.)
@todo buffer manager should never call close_(); it no longer manages pageFile handles
*/
static void bufManSimulateBufferManagerCrash() {
page_handle->close(page_handle);
@ -202,18 +200,18 @@ static Page* bufManGetPage(int xid, pageid_t pageid, int locktype, int uninitial
char * holderD = 0;
int pinsD = 0;
if(holder) {
holderD = strdup(holder);
pinsD = *pins;
}
#endif
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217);
}
#ifdef PROFILE_LATCHES_WRITE_ONLY
if(holderD)
free(holderD);
#endif
}
spin++;
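The hunk above fills in the stasis_buffer_manager_t dispatch table: every buffer-manager operation is a function pointer that the deprecated implementation wires to its bufMan* routines. A minimal standalone sketch of that construction pattern (toy struct and names, not the Stasis definitions):

#include <stdio.h>
#include <stdlib.h>

/* Toy vtable in the style of stasis_buffer_manager_t: open() allocates the
   struct, wires the function pointers, and callers dispatch through them. */
typedef struct toy_bm {
  int  (*loadPage)(struct toy_bm *bm, long pageid);
  void (*close)(struct toy_bm *bm);
} toy_bm;

static int toy_loadPage(toy_bm *bm, long pageid) {
  (void)bm;
  printf("load page %ld\n", pageid);
  return 0;
}
static void toy_close(toy_bm *bm) { free(bm); }

toy_bm * toy_bm_open(void) {
  toy_bm * bm = malloc(sizeof(*bm));
  bm->loadPage = toy_loadPage;
  bm->close    = toy_close;
  return bm;
}

int main(void) {
  toy_bm * bm = toy_bm_open();
  bm->loadPage(bm, 42);  /* always dispatch through the vtable */
  bm->close(bm);
  return 0;
}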


@ -159,9 +159,9 @@ int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
do {
int wait_ret = pthread_cond_timedwait(&ridLock->readOK, mut, &ts);
if(wait_ret == ETIMEDOUT) {
ridLock->active--;
pthread_mutex_unlock(mut);
return LLADD_DEADLOCK;
}
} while(ridLock->writers);
}
@ -197,7 +197,7 @@ int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
me = 1;
}
pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut);
lock * ridLock = pblHtLookup_r(ridLockTable, dat, datLen);
@ -221,10 +221,10 @@ int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
while(ridLock->writers || (ridLock->readers - me)) {
int lockret = pthread_cond_timedwait(&ridLock->writeOK, mut, &ts);
if(lockret == ETIMEDOUT) {
ridLock->waiting--;
ridLock->active--;
pthread_mutex_unlock(mut);
return LLADD_DEADLOCK;
}
}
}
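Both lock-manager hunks show the same idiom: waiters block in pthread_cond_timedwait, and ETIMEDOUT is interpreted as a probable deadlock, so the waiter backs out its bookkeeping (active--, waiting--), drops the mutex, and returns LLADD_DEADLOCK so the caller can abort. A self-contained sketch of the idiom (illustrative names; the caller is assumed to hold mut):

#include <errno.h>
#include <pthread.h>
#include <time.h>

#define MY_DEADLOCK (-1)  /* stand-in for LLADD_DEADLOCK */

/* Wait up to timeout_s seconds for *pred to become nonzero.
   Returns 0 on success, MY_DEADLOCK on timeout. mut must be held. */
static int wait_or_deadlock(pthread_cond_t * cond, pthread_mutex_t * mut,
                            volatile int * pred, int timeout_s) {
  struct timespec ts;
  clock_gettime(CLOCK_REALTIME, &ts);
  ts.tv_sec += timeout_s;
  while(!*pred) {
    int rc = pthread_cond_timedwait(cond, mut, &ts);
    if(rc == ETIMEDOUT) {
      /* Like the lock manager: give up, let the caller undo its
         bookkeeping, unlock, and abort the transaction. */
      return MY_DEADLOCK;
    }
  }
  return 0;
}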


@ -17,41 +17,36 @@ typedef struct {
lsn_t eof;
} logMemory_fifo_t;
void logMemory_init() {
/* NO-OP */
}
lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) {
lladdFifo_t * fifo = (lladdFifo_t *) malloc(sizeof(lladdFifo_t));
lladdIterator_t * iterator = (lladdIterator_t *) malloc(sizeof(lladdIterator_t));
iterator->type = LOG_MEMORY_ITERATOR;
iterator->impl = malloc(sizeof(logMemory_fifo_t));
((logMemory_fifo_t *)iterator->impl)->ringBuffer = openLogRingBuffer(size, initialOffset);
pthread_mutex_init(&(((logMemory_fifo_t *)iterator->impl)->mutex), NULL);
pthread_mutex_init(&(((logMemory_fifo_t *)iterator->impl)->readerMutex), NULL);
pthread_cond_init (&(((logMemory_fifo_t *)iterator->impl)->readReady), NULL);
pthread_cond_init (&(((logMemory_fifo_t *)iterator->impl)->writeReady), NULL);
((logMemory_fifo_t *)iterator->impl)->cached_value = NULL;
((logMemory_fifo_t *)iterator->impl)->eof = -1;
lladdConsumer_t * consumer = (lladdConsumer_t *) malloc(sizeof(lladdConsumer_t));
consumer->type = LOG_MEMORY_CONSUMER;
consumer->impl = iterator->impl;
fifo->iterator = iterator;
fifo->consumer = consumer;
return fifo;
}
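Note how logMemoryFifo allocates one logMemory_fifo_t and points both iterator->impl and consumer->impl at it, so the producing and consuming ends share the ring buffer and its latches. A toy version of that shared-impl construction (illustrative types only):

#include <stdlib.h>

/* Two handles, one shared implementation object, mirroring how the
   fifo's iterator and consumer alias the same logMemory_fifo_t. */
typedef struct { int head, tail, cap; int * buf; } toy_ring;
typedef struct { toy_ring * impl; } toy_reader;
typedef struct { toy_ring * impl; } toy_writer;
typedef struct { toy_reader r; toy_writer w; } toy_fifo;

toy_fifo * toy_fifo_alloc(int cap) {
  toy_fifo * f = malloc(sizeof(*f));
  toy_ring * ring = malloc(sizeof(*ring));
  ring->head = ring->tail = 0;
  ring->cap = cap;
  ring->buf = malloc(cap * sizeof(int));
  f->r.impl = ring;  /* both ends alias the same ring */
  f->w.impl = ring;
  return f;
}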
/*------------- iterator interface implementation --------------------*/
/** This function should not be called until next() or one of its
@ -92,7 +87,6 @@ int logMemory_Iterator_next (int xid, void * impl) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
@ -115,7 +109,6 @@ int logMemory_Iterator_next (int xid, void * impl) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
@ -124,8 +117,8 @@ int logMemory_Iterator_next (int xid, void * impl) {
pthread_cond_broadcast(&(fifo->writeReady));
pthread_mutex_unlock(&(fifo->mutex));
return 1;
}
/** @todo logMemory_Iterator_tryNext is a cut and pasted version of
@ -161,7 +154,6 @@ int logMemory_Iterator_tryNext (int xid, void * impl) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
@ -184,9 +176,7 @@ int logMemory_Iterator_tryNext (int xid, void * impl) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
fifo->cached_lsn = (lsn_t)lsn;
@ -316,10 +306,6 @@ void logMemory_consumer_close(int xid, void *it){
pthread_mutex_unlock(&(fifo->mutex));
}
/*compensated_function void Tconsumer_close(int xid, lladdConsumer_t * cons) {
logMemory_consumer_close(xid, cons);
}*/
int logMemory_consumer_push(int xid, /*lladdConsumer_t * cons*/ void * it, byte * key, size_t keySize, byte * val, size_t valSize) {
int ret;
logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it);
@ -347,12 +333,3 @@ int logMemory_consumer_push(int xid, /*lladdConsumer_t * cons*/ void * it, byte
return ret;
}
/* if(it->type == LOG_MEMORY_CONSUMER) {
return logMemory_consumer_push(xid, it, key, keySize, val, valSize);
}
if(it->type == POINTER_CONSUMER) {
return pointer_consumer_push(xid, it, key, keySize, val, valSize);
}
// always succeeds.
}*/


@ -4,22 +4,19 @@
#include <stasis/logger/logMemory.h>
lladdMultiplexer_t * lladdMultiplexer_alloc(
int xid, lladdIterator_t * it,
void (*multiplexer)(byte * key,
size_t keySize,
byte * value,
size_t valueSize,
byte ** multiplexKey,
size_t * multiplexKeySize),
lladdFifoPool_t * fifoPool) {
lladdMultiplexer_t * ret = malloc(sizeof(lladdMultiplexer_t));
ret->it = it;
ret->multiplexer = multiplexer;
ret->consumerHash = pblHtCreate();
// ret->getConsumer = getConsumer;
ret->fifoPool = fifoPool;
ret->xid = xid;
return ret;
@ -121,11 +118,11 @@ void * multiplexer_worker(void * arg) {
*/
void multiplexHashLogByKey(byte * key,
size_t keySize,
byte * value,
size_t valueSize,
byte ** multiplexKey,
size_t * multiplexKeySize) {
// We don't care what the key is. It's probably an LSN.
const LogEntry * log = (const LogEntry*) value;
const byte * updateArgs = stasis_log_entry_update_args_cptr(log); // assume the log is a logical update entry.
@ -158,7 +155,6 @@ void multiplexHashLogByKey(byte * key,
}
}
void multiplexByValue(byte * key, size_t keySize, byte * value, size_t valueSize, byte **multiplexKey, size_t * multiplexSize) {
*multiplexKey = value;
*multiplexSize = valueSize;
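A multiplexer callback only decides which sub-fifo a (key, value) pair is routed to, by pointing multiplexKey/multiplexKeySize at the bytes to hash on; multiplexByValue above routes on the value bytes. A standalone sketch of an alternative callback that routes on the first key byte (toy typedef for byte; not part of this commit):

#include <stddef.h>

typedef unsigned char byte;

/* Route each record by key[0]; the framework hashes multiplexKey to
   pick the sub-fifo, so the callback just points at the bytes. */
void multiplexByFirstKeyByte(byte * key, size_t keySize,
                             byte * value, size_t valueSize,
                             byte ** multiplexKey,
                             size_t * multiplexKeySize) {
  (void)value; (void)valueSize;
  *multiplexKey = key;             /* point at, don't copy */
  *multiplexKeySize = keySize ? 1 : 0;
}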


@ -15,7 +15,6 @@ typedef struct {
pageid_t maxOffset;
} array_list_parameter_t;
static array_list_parameter_t array_list_read_parameter(int xid, Page * p) {
array_list_parameter_t alp;
@ -60,9 +59,10 @@ static int array_list_op_init_header(const LogEntry* e, Page* p) {
= stasis_log_entry_update_args_cptr(e);
stasis_fixed_initialize_page(p, sizeof(pageid_t),
stasis_fixed_records_per_page(sizeof(pageid_t)));
recordid initialSizeRid, multiplierRid, slotSizeRid, maxOffsetRid, firstDataPageRid;
initialSizeRid.page
= multiplierRid.page = slotSizeRid.page
= maxOffsetRid.page = firstDataPageRid.page = p->id;
@ -190,11 +190,10 @@ void TarrayListDealloc(int xid, recordid rid) {
/** @todo locking for arrayList... this isn't pressing since currently
the only thing that calls arraylist (the hashtable
implementations) serializes bucket list operations anyway...
@todo this function calls pow(), which is horribly inefficient.
*/
int TarrayListExtend(int xid, recordid rid, int slots) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch, 0);


@ -17,7 +17,6 @@
@todo Improve concurrency of linearHashNTA and linkedListNTA by leveraging Page.impl on the data structure header page?
*/
static void linearHashNTAIterator_close(int xid, void * it);
static int linearHashNTAIterator_next (int xid, void * it);
static int linearHashNTAIterator_key (int xid, void * it, byte **key);
@ -45,12 +44,12 @@ void LinearHashNTAInit() {
pthread_mutexattr_destroy(&attr);
lladdIterator_def_t linearHashNTA_def = {
linearHashNTAIterator_close,
linearHashNTAIterator_next,
linearHashNTAIterator_next,
linearHashNTAIterator_key,
linearHashNTAIterator_value,
noopTupDone,
};
lladdIterator_register(LINEAR_HASH_NTA_ITERATOR, linearHashNTA_def);
}
@ -94,7 +93,7 @@ recordid ThashCreate(int xid, int keySize, int valueSize) {
for(i = 0; i < HASH_INIT_ARRAY_LIST_COUNT; i++) {
bucket.slot = i;
begin_action_ret(free, entry, NULLRID) {
Tset(xid, bucket, entry);
} end_action_ret(NULLRID);
}
free (entry);
@ -116,17 +115,6 @@ void ThashDelete(int xid, recordid hash) {
static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize);
static int __ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize);
/*typedef struct {
recordid hashHeader;
int keySize;
} linearHash_insert_arg;
typedef struct {
recordid hashHeader;
int keySize;
int valueSize;
} linearHash_remove_arg;*/
static int op_linear_hash_insert(const LogEntry* e, Page* p) {
const linearHash_remove_arg * args = stasis_log_entry_update_args_cptr(e);
recordid hashHeader = args->hashHeader;
@ -187,9 +175,7 @@ int ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, cons
arg->keySize = keySize;
memcpy(arg+1, key, keySize);
/** @todo MEMORY LEAK arg, handle on pthread_cancel.. */
void * handle;
handle = TbeginNestedTopAction(xid, OPERATION_LINEAR_HASH_INSERT, (byte*)arg, argSize);
free(arg);
__ThashInsert(xid, hashHeader, key, keySize, value, valueSize);
@ -206,17 +192,17 @@ static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keyS
lhh.numEntries ++;
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
if(lhh.numEntries > (int)((double)(lhh.nextSplit
+ stasis_util_two_to_the(lhh.bits-1)) * (HASH_FILL_FACTOR))) {
ThashSplitBucket(xid, hashHeader, &lhh);
}
} else {
if(lhh.numEntries > (int)((double)(lhh.nextSplit
+ stasis_util_two_to_the(lhh.bits-1)) * HASH_FILL_FACTOR)) {
ThashSplitBucket(xid, hashHeader, &lhh);
}
}
recordid bucket = lhh.buckets;
bucket.slot = stasis_linear_hash(key, keySize, lhh.bits, lhh.nextSplit);
@ -229,16 +215,9 @@ static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keyS
Tread(xid, bucket, &bucketList);
// int before = TpagedListSpansPages(xid, bucketList);
ret = TpagedListRemove(xid, bucketList, key, keySize);
TpagedListInsert(xid, bucketList, key, keySize, value, valueSize);
// int after = TpagedListSpansPages(xid, bucketList);
// if(before != after) { // Page overflowed...
// T hashSplitBucket(xid, hashHeader, &lhh);
// }
} else {
assert(lhh.keySize == keySize); assert(lhh.valueSize == valueSize);
ret = TlinkedListRemove(xid, bucket, key, keySize);
@ -333,67 +312,67 @@ int ThashLookup(int xid, recordid hashHeader, const byte * key, int keySize, byt
}
static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * lhh) {
long old_bucket = lhh->nextSplit;
long new_bucket = old_bucket + stasis_util_two_to_the(lhh->bits-1);
recordid old_bucket_rid = lhh->buckets;
recordid new_bucket_rid = lhh->buckets;
old_bucket_rid.slot = old_bucket;
new_bucket_rid.slot = new_bucket;
if(!(new_bucket % HASH_INIT_ARRAY_LIST_COUNT)) {
TarrayListExtend(xid, lhh->buckets, HASH_INIT_ARRAY_LIST_COUNT);
}
recordid new_bucket_list; // will be uninitialized if we have fixed length entries.
if(lhh->keySize == VARIABLE_LENGTH || lhh->valueSize == VARIABLE_LENGTH) {
new_bucket_list = TpagedListAlloc(xid);
Tset(xid, new_bucket_rid, &new_bucket_list);
} else {
#ifdef ARRAY_LIST_OLD_ALLOC
byte * entry = calloc(1, lhh->buckets.size);
Tset(xid, new_bucket_rid, entry);
free(entry);
#endif
}
if(lhh->nextSplit < stasis_util_two_to_the(lhh->bits-1)-1) {
lhh->nextSplit++;
} else {
lhh->nextSplit = 0;
lhh->bits++;
}
/** @todo linearHashNTA's split bucket should use the 'move' function call. */
if(lhh->keySize == VARIABLE_LENGTH || lhh->valueSize == VARIABLE_LENGTH) {
recordid old_bucket_list;
Tread(xid, old_bucket_rid, &old_bucket_list);
lladd_pagedList_iterator * pit = TpagedListIterator(xid, old_bucket_list);
byte *key, *value;
int keySize, valueSize;
while(TpagedListNext(xid, pit, &key, &keySize, &value, &valueSize)) {
if(stasis_linear_hash(key, keySize, lhh->bits, lhh->nextSplit) != old_bucket) {
TpagedListRemove(xid, old_bucket_list, key, keySize);
TpagedListInsert(xid, new_bucket_list, key, keySize, value, valueSize);
}
free(key);
free(value);
}
TpagedListClose(xid,pit);
} else {
stasis_linkedList_iterator * it = TlinkedListIterator(xid, old_bucket_rid, lhh->keySize, lhh->valueSize);
byte * key, *value;
int keySize, valueSize;
while(TlinkedListNext(xid, it, &key, &keySize, &value, &valueSize)) {
assert(valueSize == lhh->valueSize);
assert(keySize == lhh->keySize);
if(stasis_linear_hash(key, keySize, lhh->bits, lhh->nextSplit) != old_bucket) {
TlinkedListRemove(xid, old_bucket_rid, key, keySize);
TlinkedListInsert(xid, new_bucket_rid, key, keySize, value, valueSize);
}
free(key);
free(value);
}
TlinkedListClose(xid, it);
}
return;
}
@ -406,10 +385,7 @@ lladd_hash_iterator * ThashIterator(int xid, recordid hashHeader, int keySize, i
it->bucket = lhh.buckets;
it->numBuckets = lhh.nextSplit + stasis_util_two_to_the(lhh.bits-1);
it->bucket.slot = 0;
/* it->keySize = keySize;
it->valueSize = valueSize;
assert(keySize == lhh.keySize);
assert(valueSize == lhh.valueSize); */
keySize = lhh.keySize;
it->keySize = lhh.keySize;
valueSize = lhh.valueSize;
@ -433,29 +409,29 @@ int ThashNext(int xid, lladd_hash_iterator * it, byte ** key, int * keySize, byt
while(!TlinkedListNext(xid, it->it, key, keySize, value, valueSize)) {
it->bucket.slot++;
if(it->bucket.slot < it->numBuckets) {
TlinkedListClose(xid, it->it);
it->it = TlinkedListIterator(xid, it->bucket, it->keySize, it->valueSize);
} else {
TlinkedListClose(xid, it->it);
it->it = 0;
return 0;
}
}
} else {
assert(it->pit);
while(!TpagedListNext(xid, it->pit, key, keySize, value, valueSize)) {
it->bucket.slot++;
if(it->bucket.slot < it->numBuckets) {
recordid bucketList;
Tread(xid, it->bucket, &bucketList);
TpagedListClose(xid,it->pit);
it->pit = TpagedListIterator(xid, bucketList);
} else {
TpagedListClose(xid,it->pit);
it->pit = 0;
return 0;
}
}
}
return 1;
@ -587,11 +563,10 @@ void * ThashAsyncWorker(void * argp) {
return NULL;
}
/*lladdMultiplexer_t **/
lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads,
int mainFifoLen, int numFifos,
int subFifoLen, int dirtyFifoLen,
lladdIterator_t ** dirtyIterator) {
lladdFifo_t * mainFifo = logMemoryFifo(mainFifoLen, 0);
lladdFifo_t * dirtyFifos = logMemoryFifo(dirtyFifoLen, 0);
@ -607,7 +582,6 @@ lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads,
int i = 0;
for(i = 0; i < numWorkerThreads; i++) {
pthread_t thread;
pthread_create(&thread, &attr, ThashAsyncWorker, mux->fifoPool->dirtyPoolFifo->iterator);


@ -7,10 +7,6 @@
#include <assert.h>
/*#ifndef PTHREAD_MUTEX_RECURSIVE
#define PTHREAD_MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE_NP
#endif*/
/** A quick note on the format of linked lists. Each entry consists
of a struct with some variable length data appended to it.
@ -32,9 +28,7 @@
stasis_linkedList_entry next = entry->next;
@file
*/
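The layout described above (a fixed header with the key bytes, then the value bytes, appended in a single allocation) is exactly what stasis_linked_list_insert_helper builds below. The same arithmetic in a standalone toy (the real header holds a recordid next, not these fields):

#include <stdlib.h>
#include <string.h>

typedef struct { long next_page; int next_slot; long next_size; } toy_entry;

/* One malloc: header, then keySize key bytes, then valueSize value bytes. */
toy_entry * toy_entry_alloc(const void * key, int keySize,
                            const void * value, int valueSize) {
  toy_entry * e = malloc(sizeof(toy_entry) + keySize + valueSize);
  memcpy(e + 1, key, keySize);                          /* key after header */
  memcpy(((char*)(e + 1)) + keySize, value, valueSize); /* value after key */
  return e;
}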
static pthread_mutex_t stasis_linked_list_mutex;
@ -150,38 +144,36 @@ stasis_operation_impl stasis_op_impl_linked_list_remove() {
return o;
}
static void stasis_linked_list_insert_helper(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
//int ret = TlinkedListRemove(xid, list, key, keySize);
stasis_linkedList_entry * entry = malloc(sizeof(stasis_linkedList_entry) + keySize + valueSize);
Tread(xid, list, entry);
if(!entry->next.size) {
memcpy(entry+1, key, keySize);
memcpy(((byte*)(entry+1))+keySize, value, valueSize);
entry->next.page = 0;
entry->next.slot = 0;
entry->next.size = -1;
Tset(xid, list, entry);
} else {
stasis_linkedList_entry * newEntry = malloc(sizeof(stasis_linkedList_entry) + keySize + valueSize);
memcpy(newEntry + 1, key, keySize);
memcpy(((byte*)(newEntry+1))+keySize, value, valueSize);
newEntry->next = entry->next;
recordid newRid = Talloc(xid, sizeof(stasis_linkedList_entry) + keySize + valueSize);
Tset(xid, newRid, newEntry);
entry->next = newRid;
Tset(xid, list, entry);
free(newEntry);
}
free(entry);
}
int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
stasis_linkedList_entry * entry = malloc(list.size);
pthread_mutex_lock(&stasis_linked_list_mutex);
Tread(xid, list, entry);
if(!entry->next.size) {
free(entry);
@ -195,18 +187,18 @@ int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte
while(!done) {
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
int valueSize = list.size - (sizeof(stasis_linkedList_entry) + keySize);
*value = malloc(valueSize);
memcpy(*value, ((byte*)(entry+1))+keySize, valueSize);
done = 1;
ret = valueSize;
}
if(entry->next.size != -1) {
assert(entry->next.size == list.size); // Don't handle lists with variable length records for now
Tread(xid, entry->next, entry);
} else {
done = 1;
}
}
free(entry);
@ -216,9 +208,6 @@ int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte
return ret;
}
int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
byte * value;
int valueSize;
@ -275,48 +264,45 @@ static int stasis_linked_list_remove_helper(int xid, recordid list, const byte *
oldLastRead.size = -2;
int ret = 0;
while(1) {
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
if(listRoot) {
if(entry->next.size == -1) {
memset(entry, 0, list.size);
Tset(xid, lastRead, entry);
} else {
assert(entry->next.size == list.size); // Otherwise, something strange is happening, or the list contains entries with variable sizes.
stasis_linkedList_entry * entry2 = malloc(list.size);
Tread(xid, entry->next, entry2);
Tdealloc(xid, entry->next); // could break iterator, since it writes one entry ahead.
Tset(xid, lastRead, entry2);
free(entry2);
}
} else {
stasis_linkedList_entry * entry2 = malloc(list.size);
assert(oldLastRead.size != -2);
Tread(xid, oldLastRead, entry2);
memcpy(&(entry2->next), &(entry->next), sizeof(recordid));
Tset(xid, oldLastRead, entry2);
Tdealloc(xid, lastRead);
free (entry2);
}
ret = 1;
break;
} else { // Entry doesn't match the key we're looking for.
if(entry->next.size != -1) {
assert(entry->next.size == list.size); // Don't handle lists with variable length records for now
oldLastRead = lastRead;
lastRead = entry->next;
Tread(xid, entry->next, entry);
listRoot = 0;
} else {
break;
}
}
}
free(entry);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
@ -325,20 +311,16 @@ static int stasis_linked_list_remove_helper(int xid, recordid list, const byte *
int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize) {
byte * value = 0;
int ret;
pthread_mutex_lock(&stasis_linked_list_mutex);
int valueSize = TlinkedListFind(xid, start_list, key, keySize, &value);
if(valueSize == -1) {
ret = 0;
} else {
TlinkedListRemove(xid, start_list, key, keySize);
TlinkedListInsert(xid, end_list, key, keySize, value, valueSize);
ret = 1;
}
if(value) { free(value); }
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
@ -354,23 +336,23 @@ recordid TlinkedListCreate(int xid, int keySize, int valueSize) {
return ret;
}
void TlinkedListDelete(int xid, recordid list) {
stasis_linkedList_entry * entry = malloc(list.size);
Tread(xid, list, entry);
Tdealloc(xid, list);
if(entry->next.size == 0) {
return;
}
while(entry->next.size != -1) {
recordid nextEntry;
Tread(xid, nextEntry, entry);
assert(!memcmp(&nextEntry, &(entry->next), sizeof(recordid)));
Tdealloc(xid, nextEntry);
}
free(entry);
}
stasis_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize) {
@ -397,36 +379,34 @@ int TlinkedListNext(int xid, stasis_linkedList_iterator * it, byte ** key, int *
pthread_mutex_lock(&stasis_linked_list_mutex);
if(it->first == -1) {
it->first = 1;
} else if(it->first) {
entry = malloc(it->next.size);
Tread(xid, it->listRoot, entry);
int listTouched;
listTouched = memcmp(&(entry->next), &(it->next), sizeof(recordid));
free(entry);
if(listTouched) {
//The root entry was removed. Reset the iterator.
it->first = -1;
it->next = it->listRoot;
ret = TlinkedListNext(xid, it, key, keySize, value, valueSize);
done = 1;
} else {
//continue as normal.
it->first = 0;
}
}
if(done) {
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
}
assert(it->keySize + it->valueSize + sizeof(stasis_linkedList_entry) == it->next.size);
entry = malloc(it->next.size);
Tread(xid, it->next, entry);
if(entry->next.size) {
*keySize = it->keySize;
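TlinkedListNext above revalidates the list root on the first real step so that a concurrent removal of the root resets the iterator instead of corrupting it. The (hedged) client-side pattern for the iterator API in this hunk:

#include <stasis/transactional.h>
#include <stdlib.h>

/* Hedged sketch: walk a list of int->int pairs built with
   TlinkedListInsert, inside an open transaction xid. */
void dump_list(int xid, recordid list) {
  stasis_linkedList_iterator * it =
      TlinkedListIterator(xid, list, sizeof(int), sizeof(int));
  byte * key, * value;
  int keySize, valueSize;
  while(TlinkedListNext(xid, it, &key, &keySize, &value, &valueSize)) {
    /* key/value are malloc'd copies; the caller frees them. */
    free(key);
    free(value);
  }
  TlinkedListClose(xid, it);
}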


@ -35,12 +35,11 @@ static pblHashTable_t * openHashes = NULL;
static void rehash(int xid, recordid hash, pageid_t next_split, pageid_t i, unsigned int keySize, unsigned int valSize);
static void update_hash_header(int xid, recordid hash, pageid_t i, pageid_t next_split);
static int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
void * key, int keySize, int valSize, recordid * deletedEntry);
static void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete);
static int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize);
static int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize) {
int found;
@ -109,142 +108,142 @@ static void update_hash_header(int xid, recordid hash, pageid_t i, pageid_t next
}
static void rehash(int xid, recordid hashRid, pageid_t next_split, pageid_t i, unsigned int keySize, unsigned int valSize) {
int firstA = 1; // Is 'A' the recordid of a bucket?
int firstD = 1; // What about 'D'?
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
recordid ba = hashRid; ba.slot = next_split;
recordid bb = hashRid; bb.slot = next_split + stasis_util_two_to_the(i-1);
hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
Tread(xid, ba, A_contents);
Tread(xid, bb, D_contents);
recordid A = ba; //ba_contents;
recordid D = bb; //bb_contents;
recordid B = A_contents->next;
recordid C;
if(!A_contents->next.size) {
/* Bucket A is empty, so we're done. */
free(D_contents);
free(A_contents);
free(B_contents);
return;
}
uint64_t old_hash;
uint64_t new_hash =
2 + stasis_linear_hash(A_contents+1, keySize, i, UINT_MAX);
while(new_hash != next_split) {
// Need a record in A that belongs in the first bucket...
recordid oldANext = A_contents->next;
A_contents->next = NULLRID;
if(firstD) {
// assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid)));
Tset(xid, D, A_contents);
firstD = 0;
} else {
/* D at end of list => can overwrite next. */
D_contents->next = Talloc(xid, sizeof(hashEntry) + keySize + valSize); /* @todo unfortunate to dealloc A's successor, then alloc.. */
// assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid)));
Tset(xid, D_contents->next, A_contents);
// assert(memcmp(&D, &D_contents->next, sizeof(recordid)));
Tset(xid, D, D_contents);
D = A;
}
hashEntry * swap = D_contents;
D_contents = A_contents;
A_contents = swap;
/* A_contents is now garbage. */
assert(A.size == sizeof(hashEntry) + keySize + valSize);
if(oldANext.size == -1) {
memset(A_contents, 0, sizeof(hashEntry) + keySize + valSize);
// assert(memcmp(&A_contents->next, &A, sizeof(recordid)));
Tset(xid, A, A_contents);
free(D_contents);
free(A_contents);
free(B_contents);
return;
}
assert(oldANext.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, oldANext, A_contents);
// assert(memcmp(&A_contents->next, &A, sizeof(recordid)));
Tset(xid, A, A_contents);
Tdealloc(xid, oldANext);
new_hash = stasis_linear_hash(A_contents+1, keySize, i, UINT_MAX) + 2;
}
B = A_contents->next;
while(B.size != -1) {
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
C = B_contents->next;
old_hash = stasis_linear_hash(B_contents+1, keySize, i-1, UINT_MAX) + 2;
new_hash = stasis_linear_hash(B_contents+1, keySize, i, UINT_MAX) + 2;
assert(next_split == old_hash);
assert(new_hash == old_hash || new_hash == old_hash + stasis_util_two_to_the(i-1));
if(new_hash == old_hash) {
A = B;
B = C;
C.size = -1;
firstA = 0;
} else {
assert(D.size == sizeof(hashEntry) + keySize + valSize);
assert(B.size == -1 || B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, D, D_contents);
D_contents->next = B;
assert(B.size != 0);
Tset(xid, D, D_contents);
// A is somewhere in the first list.
assert(A.size == sizeof(hashEntry) + keySize + valSize);
assert(C.size == -1 || C.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, A, A_contents);
A_contents->next = C;
assert(C.size != 0);
Tset(xid, A, A_contents);
// B _can't_ be a bucket.
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
B_contents->next = NULLRID;
Tset(xid, B, B_contents);
// Update Loop State
D = B;
B = C;
C.size = -1;
firstD = 0;
}
}
free(D_contents);
free(A_contents);
free(B_contents);
}
static void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete) {
recordid deleteMe;
if(!skipDelete) {
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, valSize, &deleteMe)) {
@ -259,9 +258,6 @@ static void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashE
/*@todo consider recovery for insertIntoBucket. */
hashRid.slot = bucket_number;
// Page * p = loadPage(xid, hashRid.page);
// assert(stasis_record_type_to_size(stasis_record_dereference(xid, p, hashRid).size) == sizeof(hashEntry) + valSize + keySize);
// releasePage(p);
Tread(xid, hashRid, bucket_contents);
@ -284,7 +280,7 @@ static void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashE
}
static int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
void * key, int keySize, int valSize, recordid * deletedEntry) {
if(bucket_contents->next.size == 0) { return 0; }
recordid this = hash;
@ -335,7 +331,6 @@ static int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry
found = 1;
break;
}
}
free(A);
@ -403,8 +398,8 @@ void TnaiveHashDeinit() {
}
void TnaiveHashInsert(int xid, recordid hashRid,
void * key, int keySize,
void * val, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(hashRid.page));
@ -429,7 +424,7 @@ void TnaiveHashInsert(int xid, recordid hashRid,
/** @todo hash hable probably should track the number of items in it,
so that expand can be selectively called. */
int TnaiveHashDelete(int xid, recordid hashRid,
void * key, int keySize, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(hashRid.page));
int bucket_number = stasis_linear_hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
@ -464,7 +459,6 @@ int TnaiveHashOpen(int xid, recordid hashRid, int keySize, int valSize) {
void TnaiveHashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) {
TnaiveHashDelete(xid, hashRid, key, keySize, valSize);
TnaiveHashInsert(xid, hashRid, key, keySize, val, valSize);
}
@ -476,7 +470,6 @@ int TnaiveHashClose(int xid, recordid hashRid) {
}
int TnaiveHashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(hashRid.page));
int bucket_number = stasis_linear_hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
int ret = findInBucket(xid, hashRid, bucket_number, key, keySize, buf, valSize);


@ -21,93 +21,93 @@ recordid TpagedListAlloc(int xid) {
int TpagedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
int ret;
pagedListHeader header;
Tread(xid, list, &header);
recordid headerRid = list;
pagedListHeader firstHeader = header;
ret = 0;
int entrySize = sizeof(pagedListEntry) + keySize + valueSize;
recordid rid = TallocFromPage(xid, headerRid.page, entrySize);
DEBUG("Alloced rid: {%d %d %d}", rid.page, rid.slot, rid.size);
// When the loop completes, header will contain the contents of the page header the entry will be inserted into,
// headerrid will contain the rid of that header, and rid will contain the newly allocated recordid
while(rid.size == -1) {
if(header.nextPage.size == -1) {
// We're at the end of the list
recordid newHeadRid = Talloc(xid, sizeof(pagedListHeader));
pagedListHeader newHead;
memset(&newHead,0,sizeof(newHead));
newHead.thisPage = 0;
newHead.nextPage = firstHeader.nextPage;
firstHeader.nextPage = newHeadRid;
Tset(xid, newHeadRid, &newHead);
Tset(xid, list, &firstHeader);
header = newHead;
headerRid = newHeadRid;
} else {
headerRid = header.nextPage;
Tread(xid, header.nextPage, &header);
}
rid = TallocFromPage(xid, headerRid.page, entrySize);
DEBUG("Alloced rid: {%d %d %d}", rid.page, rid.slot, rid.size);
}
pagedListEntry * dat = malloc(entrySize);
dat->keySize = keySize;
dat->nextEntry = header.thisPage;
memcpy(dat+1, key, keySize);
memcpy(((byte*)(dat+1))+keySize, value, valueSize);
Tset(xid, rid, dat);
header.thisPage = rid.slot;
DEBUG("Header.thisPage = %d\n", rid.slot);
Tset(xid, headerRid, &header);
free(dat);
return ret;
}
int TpagedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
pagedListHeader header;
Tread(xid, list, &header);
recordid rid;
rid.page = list.page;
rid.slot = header.thisPage;
rid.size = 0;
while(rid.slot || header.nextPage.size != -1) {
if(rid.slot) {
rid.size = TrecordSize(xid, rid);
pagedListEntry * dat;
dat = malloc(rid.size);
Tread(xid, rid, dat);
if(dat->keySize == keySize && !memcmp(dat+1, key, keySize)) {
int valueSize = rid.size - keySize - sizeof(pagedListEntry);
*value = malloc(valueSize);
memcpy(*value, ((byte*)(dat+1))+keySize, valueSize);
free(dat);
return valueSize;
}
rid.slot = dat->nextEntry; // rid.slot will be zero if this is the last entry
free(dat);
} else if (header.nextPage.size != -1) { // another page
rid.page = header.nextPage.page;
Tread(xid, header.nextPage, &header);
rid.slot = header.thisPage;
}
}
return -1;
}
@ -115,67 +115,66 @@ int TpagedListRemove(int xid, recordid list, const byte * key, int keySize) {
pagedListHeader header;
int ret = 0;
Tread(xid, list, &header);
recordid headerRid;
recordid rid;
rid.page = list.page;
rid.slot = header.thisPage;
short lastSlot = -1;
headerRid = list;
while(rid.slot || header.nextPage.size != -1) {
if(rid.slot) {
rid.size = TrecordSize(xid, rid);
pagedListEntry * dat = malloc(rid.size);
Tread(xid, rid, dat);
if(dat->keySize == keySize && !memcmp(dat+1, key, keySize)) {
if(lastSlot != -1) {
recordid lastRid = rid;
lastRid.slot = lastSlot;
lastRid.size = TrecordSize(xid, lastRid);
pagedListEntry * lastRidBuf = malloc(lastRid.size);
Tread(xid, lastRid, lastRidBuf);
lastRidBuf->nextEntry = dat->nextEntry;
Tset(xid, lastRid, lastRidBuf);
free(lastRidBuf);
} else {
header.thisPage = dat->nextEntry;
Tset(xid, headerRid, &header);
}
Tdealloc(xid, rid);
free(dat);
ret = 1;
break;
}
lastSlot = rid.slot;
rid.slot = dat->nextEntry;
free(dat);
} else if (header.nextPage.size != -1) { // another page
lastSlot = -1;
rid.page = header.nextPage.page;
headerRid = header.nextPage;
Tread(xid, header.nextPage, &header);
rid.slot = header.thisPage;
}
}
return ret;
}
int TpagedListMove(int xid, recordid start_list, recordid end_list, const byte * key, int keySize) {
byte * value = NULL;
int ret;
int valueSize = TpagedListFind(xid, start_list, key, keySize, &value);
if(valueSize != -1) {
ret = TpagedListRemove(xid, start_list, key, keySize);
assert(ret);
ret = TpagedListInsert(xid, end_list, key, keySize, value, valueSize);
assert(!ret);
if(value) { free(value); }
} else {
ret = 0;
}
return ret;
}
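TpagedListMove composes the other operations in this file: find the value, remove the pair from the source list, insert it into the destination. A hedged usage sketch of the API (signatures taken from this diff; error handling omitted):

#include <stasis/transactional.h>
#include <stdlib.h>

void example(int xid) {
  recordid a = TpagedListAlloc(xid);
  recordid b = TpagedListAlloc(xid);
  const char key[] = "k";
  const char val[] = "v";
  TpagedListInsert(xid, a, (const byte*)key, sizeof(key),
                   (const byte*)val, sizeof(val));
  byte * copy = 0;
  int sz = TpagedListFind(xid, a, (const byte*)key, sizeof(key), &copy);
  if(sz != -1) { free(copy); }  /* find returns a malloc'd copy of the value */
  TpagedListMove(xid, a, b, (const byte*)key, sizeof(key));
}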


@ -161,7 +161,6 @@ int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
stasis_record_read_done(xid,p,rid,dat);
return 0;
}
/**
@todo stasis_record_dereference should dispatch via page_impl...


@ -99,7 +99,6 @@ int Tinit() {
iterator_init();
consumer_init();
setupLockManagerCallbacksNil();
//setupLockManagerCallbacksPage();
stasis_recovery_initiate(stasis_log_file, stasis_transaction_table, stasis_alloc);
stasis_truncation = stasis_truncation_init(stasis_dirty_page_table, stasis_transaction_table,
@ -112,7 +111,6 @@ int Tinit() {
return 0;
}
int Tbegin() {
assert(stasis_initted);
@ -351,20 +349,20 @@ int Tabort(int xid) {
assert(t->xid == xid);
if( t->prevLSN != INVALID_LSN ) {
lsn = stasis_log_abort_transaction(stasis_log_file, stasis_transaction_table, t);
/** @todo is the order of the next two calls important? */
undoTrans(stasis_log_file, stasis_transaction_table, *t); // XXX don't really need to pass the whole table in...
if(globalLockManager.abort) { globalLockManager.abort(xid); }
} else {
// This would normally be called by stasis_recovery_undo inside of undoTrans.
// Since we skip the call to undoTrans, we call it here. Note that this is
// different than the API usage in TcommitHelper(). The reason is that
// undoTrans needs to deal with Tprepare().
// @todo pull up all calls to stasis_transaction_table_forget(),
// and move this invocation outside of the if-then-else.
stasis_transaction_table_forget(stasis_transaction_table, xid);
}
return 0;
@ -382,9 +380,9 @@ int Tdeinit() {
for(int i = 0; i < count; i++) {
if(stasis_transaction_table_get(stasis_transaction_table,
active[i])->prevLSN != INVALID_LSN) {
if(!stasis_suppress_unclean_shutdown_warnings) {
fprintf(stderr, "WARNING: Tdeinit() is aborting transaction %d\n", active[i]);
fprintf(stderr, "WARNING: Tdeinit() is aborting transaction %d\n", active[i]);
}
}
Tabort(active[i]);
@ -461,10 +459,8 @@ typedef struct {
int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) {
stasis_transaction_table_entry_t * xact = stasis_transaction_table_get(stasis_transaction_table, xid);
assert(xid >= 0);
void * e = stasis_log_begin_nta(stasis_log_file, xact, op, dat, datSize);
// XXX HACK: breaks encapsulation.
stasis_operation_do(e, NULL);
stasis_log_end_nta(stasis_log_file, xact, e);
@ -489,7 +485,7 @@ lsn_t TendNestedTopAction(int xid, void * handle) {
lsn_t ret = stasis_log_end_nta(stasis_log_file, stasis_transaction_table_get(stasis_transaction_table, xid), handle);
DEBUG("NestedTopAction CLR %d, LSN: %ld type: %ld (undoing: %ld, next to undo: %ld)\n", e->xid,
clrLSN, undoneLSN, *prevLSN);
return ret;
}
@ -524,7 +520,6 @@ int TisActiveTransaction(stasis_transaction_fingerprint_t * fp) {
&& stasis_transaction_table_get(stasis_transaction_table, fp->xid)->recLSN == fp->rec_lsn;
}
void * stasis_log() {
return stasis_log_file;
}
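The Tabort() hunk above splits on whether the transaction ever logged anything: if prevLSN != INVALID_LSN it writes an abort record and runs undoTrans, otherwise it just forgets the transaction-table entry. From the application side that machinery hides behind the usual lifecycle; a hedged sketch:

#include <stasis/transactional.h>

int main(void) {
  Tinit();
  int xid = Tbegin();
  recordid r = Talloc(xid, sizeof(int));
  int x = 1;
  Tset(xid, r, &x);
  Tabort(xid);   /* logged work, so undoTrans rolls the updates back */
  xid = Tbegin();
  Tcommit(xid);  /* nothing logged; the cheap path */
  Tdeinit();
  return 0;
}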


@ -162,9 +162,9 @@ typedef struct {
#pragma pack(pop)
/*
Define Page as an incomplete type to hide its implementation from clients.
Include stasis/page.h for the complete definition.
*/
typedef struct Page_s Page;


@ -69,13 +69,6 @@ terms specified in this license.
#ifndef __pageOrientedListNTA_H
#define __pageOrientedListNTA_H
//typedef struct {
// long page;
/* The slot of the next record to be returned. */
// int slot;
//} lladd_pagedList_iterator;
typedef struct {
recordid headerRid;
recordid entryRid;
@ -87,7 +80,6 @@ typedef struct {
} pagedListHeader;
//recordid dereferencePagedListRID(int xid, recordid rid);
/** @return 1 if the key was already in the list. */
int TpagedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
int TpagedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value);


@ -55,11 +55,11 @@ terms specified in this license.
#define __SET_H__
/**
Changes the value of a record.
@param xid the transaction id
@param rid the recordid of the record to be changed.
@param dat the new value of the record.
*/
int Tset(int xid, recordid rid, const void * dat);
Page * TsetWithPage(int xid, recordid rid, Page *p, const void * dat);
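A hedged round-trip for the call documented above (Talloc and Tread come from the rest of the Stasis API, not this header):

#include <stasis/transactional.h>

void roundtrip(int xid) {
  int in = 42, out = 0;
  recordid rid = Talloc(xid, sizeof(in));
  Tset(xid, rid, &in);   /* log and apply the new record value */
  Tread(xid, rid, &out);
  /* out == 42 */
}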