add paranoid checks to concurrentHash and concurrentBufferManager; tightened up invariants maintained by concurrentHash
This commit is contained in:
parent
00cf4eb4ce
commit
6525949f56
3 changed files with 94 additions and 8 deletions
|
@ -183,7 +183,7 @@ static void deinitTLS(void *tlsp) {
|
||||||
stasis_buffer_concurrent_hash_t *ch = tls->bm->impl;
|
stasis_buffer_concurrent_hash_t *ch = tls->bm->impl;
|
||||||
|
|
||||||
Page * p = tls->p;
|
Page * p = tls->p;
|
||||||
p->id = -1;
|
p->id = -2;
|
||||||
while(hashtable_test_and_set(ch->ht,p->id, p)) {
|
while(hashtable_test_and_set(ch->ht,p->id, p)) {
|
||||||
p->id --;
|
p->id --;
|
||||||
}
|
}
|
||||||
|
@ -240,7 +240,12 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma
|
||||||
|| (!tls->p->dirty)
|
|| (!tls->p->dirty)
|
||||||
)) {
|
)) {
|
||||||
// The getStaleAndRemove was not atomic with the hashtable remove, which is OK (but we can't trust tmp anymore...)
|
// The getStaleAndRemove was not atomic with the hashtable remove, which is OK (but we can't trust tmp anymore...)
|
||||||
assert(tmp == tls->p);
|
if(tmp != tls->p) {
|
||||||
|
int copy_count = hashtable_debug_number_of_key_copies(ch->ht, tmp->id);
|
||||||
|
assert(copy_count == 1);
|
||||||
|
assert(tmp == tls->p);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
// note that we'd like to assert that the page is unpinned here. However, we can't simply look at p->queue, since another thread could be inside the "spooky" quote below.
|
// note that we'd like to assert that the page is unpinned here. However, we can't simply look at p->queue, since another thread could be inside the "spooky" quote below.
|
||||||
tmp = 0;
|
tmp = 0;
|
||||||
if(tls->p->id >= 0) {
|
if(tls->p->id >= 0) {
|
||||||
|
@ -444,7 +449,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_
|
||||||
|
|
||||||
for(pageid_t i = 0; i < stasis_buffer_manager_size; i++) {
|
for(pageid_t i = 0; i < stasis_buffer_manager_size; i++) {
|
||||||
Page *p = stasis_buffer_pool_malloc_page(ch->buffer_pool);
|
Page *p = stasis_buffer_pool_malloc_page(ch->buffer_pool);
|
||||||
stasis_buffer_pool_free_page(ch->buffer_pool, p,-1*i);
|
stasis_buffer_pool_free_page(ch->buffer_pool, p,(-1*i)-2);
|
||||||
pageSetNode(p,0,0);
|
pageSetNode(p,0,0);
|
||||||
(*pagePinCountPtr(p)) = 1;
|
(*pagePinCountPtr(p)) = 1;
|
||||||
ch->lru->insert(ch->lru, p); // decrements pin count ptr (setting it to zero)
|
ch->lru->insert(ch->lru, p); // decrements pin count ptr (setting it to zero)
|
||||||
|
|
|
@ -10,6 +10,8 @@
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
|
||||||
|
//#define STASIS_HASHTABLE_FSCK_THREAD
|
||||||
|
|
||||||
struct bucket_t {
|
struct bucket_t {
|
||||||
pageid_t key;
|
pageid_t key;
|
||||||
pthread_mutex_t mut;
|
pthread_mutex_t mut;
|
||||||
|
@ -19,6 +21,10 @@ struct bucket_t {
|
||||||
struct hashtable_t {
|
struct hashtable_t {
|
||||||
bucket_t* buckets;
|
bucket_t* buckets;
|
||||||
pageid_t maxbucketid;
|
pageid_t maxbucketid;
|
||||||
|
#ifdef STASIS_HASHTABLE_FSCK_THREAD
|
||||||
|
int is_open;
|
||||||
|
pthread_t fsck_thread;
|
||||||
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
static inline pageid_t hashtable_wrap(hashtable_t *ht, pageid_t p) {
|
static inline pageid_t hashtable_wrap(hashtable_t *ht, pageid_t p) {
|
||||||
|
@ -55,6 +61,10 @@ static inline pageid_t hashtable_func(hashtable_t *ht, pageid_t key) {
|
||||||
return hashtable_wrap(ht, hash6432shift(key));
|
return hashtable_wrap(ht, hash6432shift(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef STASIS_HASHTABLE_FSCK_THREAD
|
||||||
|
void * hashtable_fsck_worker(void * htp);
|
||||||
|
#endif
|
||||||
|
|
||||||
hashtable_t * hashtable_init(pageid_t size) {
|
hashtable_t * hashtable_init(pageid_t size) {
|
||||||
pageid_t newsize = 1;
|
pageid_t newsize = 1;
|
||||||
for(int i = 0; size; i++) {
|
for(int i = 0; size; i++) {
|
||||||
|
@ -65,22 +75,72 @@ hashtable_t * hashtable_init(pageid_t size) {
|
||||||
|
|
||||||
ht->maxbucketid = (newsize) - 1;
|
ht->maxbucketid = (newsize) - 1;
|
||||||
ht->buckets = calloc(ht->maxbucketid+1, sizeof(bucket_t));
|
ht->buckets = calloc(ht->maxbucketid+1, sizeof(bucket_t));
|
||||||
|
for(int i = 0; i <= ht->maxbucketid; i++) {
|
||||||
|
ht->buckets[i].key = -1;
|
||||||
|
}
|
||||||
pthread_mutexattr_t attr;
|
pthread_mutexattr_t attr;
|
||||||
pthread_mutexattr_init(&attr);
|
pthread_mutexattr_init(&attr);
|
||||||
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
|
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
|
||||||
for(pageid_t i = 0; i <= ht->maxbucketid; i++) {
|
for(pageid_t i = 0; i <= ht->maxbucketid; i++) {
|
||||||
pthread_mutex_init(&(ht->buckets[i].mut), &attr);
|
pthread_mutex_init(&(ht->buckets[i].mut), &attr);
|
||||||
}
|
}
|
||||||
|
#ifdef STASIS_HASHTABLE_FSCK_THREAD
|
||||||
|
ht->is_open = 1;
|
||||||
|
pthread_create(&ht->fsck_thread,0, hashtable_fsck_worker, ht);
|
||||||
|
#endif
|
||||||
return ht;
|
return ht;
|
||||||
}
|
}
|
||||||
void hashtable_deinit(hashtable_t * ht) {
|
void hashtable_deinit(hashtable_t * ht) {
|
||||||
for(pageid_t i = 0; i < ht->maxbucketid; i++) {
|
#ifdef STASIS_HASHTABLE_FSCK_THREAD
|
||||||
|
ht->is_open = 0;
|
||||||
|
pthread_join(ht->fsck_thread, 0);
|
||||||
|
#endif
|
||||||
|
for(pageid_t i = 0; i <= ht->maxbucketid; i++) {
|
||||||
pthread_mutex_destroy(&ht->buckets[i].mut);
|
pthread_mutex_destroy(&ht->buckets[i].mut);
|
||||||
}
|
}
|
||||||
free(ht->buckets);
|
free(ht->buckets);
|
||||||
free(ht);
|
free(ht);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int hashtable_debug_number_of_key_copies(hashtable_t *ht, pageid_t pageid) {
|
||||||
|
int count = 0;
|
||||||
|
for(int i = 0; i <= ht->maxbucketid; i++) {
|
||||||
|
if(ht->buckets[i].key == pageid) { count ++; }
|
||||||
|
}
|
||||||
|
if(count > 0) { fprintf(stderr, "%d copies of key %lld in hashtable!", count, (unsigned long long) pageid); }
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
void hashtable_fsck(hashtable_t *ht) {
|
||||||
|
pthread_mutex_lock(&ht->buckets[0].mut);
|
||||||
|
for(int i = 1; i <= ht->maxbucketid; i++) {
|
||||||
|
pthread_mutex_lock(&ht->buckets[i].mut);
|
||||||
|
if(ht->buckets[i].key != -1) {
|
||||||
|
pageid_t this_hash_code = hashtable_func(ht, ht->buckets[i].key);
|
||||||
|
if(this_hash_code != i) {
|
||||||
|
assert(ht->buckets[i-1].key != -1);
|
||||||
|
assert(ht->buckets[i-1].val != 0);
|
||||||
|
assert(this_hash_code < i || (this_hash_code > i + (ht->maxbucketid/2)));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
assert(ht->buckets[i].val == NULL);
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&ht->buckets[i-1].mut);
|
||||||
|
}
|
||||||
|
pthread_mutex_lock(&ht->buckets[0].mut);
|
||||||
|
if(ht->buckets[0].key != -1) {
|
||||||
|
pageid_t this_hash_code = hashtable_func(ht, ht->buckets[0].key);
|
||||||
|
if(this_hash_code != 0) {
|
||||||
|
assert(ht->buckets[ht->maxbucketid].key != -1);
|
||||||
|
assert(ht->buckets[ht->maxbucketid].val != 0);
|
||||||
|
assert(this_hash_code < 0 || (this_hash_code > 0 + (ht->maxbucketid/2)));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
assert(ht->buckets[ht->maxbucketid].val == NULL);
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&ht->buckets[ht->maxbucketid].mut);
|
||||||
|
pthread_mutex_unlock(&ht->buckets[0].mut);
|
||||||
|
}
|
||||||
typedef enum {
|
typedef enum {
|
||||||
LOOKUP,
|
LOOKUP,
|
||||||
INSERT,
|
INSERT,
|
||||||
|
@ -89,6 +149,7 @@ typedef enum {
|
||||||
} hashtable_mode;
|
} hashtable_mode;
|
||||||
static inline void * hashtable_begin_op(hashtable_mode mode, hashtable_t *ht, pageid_t p, void *val, hashtable_bucket_handle_t *h) {
|
static inline void * hashtable_begin_op(hashtable_mode mode, hashtable_t *ht, pageid_t p, void *val, hashtable_bucket_handle_t *h) {
|
||||||
static int warned = 0;
|
static int warned = 0;
|
||||||
|
assert(p != -1);
|
||||||
pageid_t idx = hashtable_func(ht, p);
|
pageid_t idx = hashtable_func(ht, p);
|
||||||
void * ret;
|
void * ret;
|
||||||
bucket_t *b1 = &ht->buckets[idx], *b2 = NULL;
|
bucket_t *b1 = &ht->buckets[idx], *b2 = NULL;
|
||||||
|
@ -105,8 +166,8 @@ static inline void * hashtable_begin_op(hashtable_mode mode, hashtable_t *ht, pa
|
||||||
}
|
}
|
||||||
assert(num_incrs < (ht->maxbucketid/4));
|
assert(num_incrs < (ht->maxbucketid/4));
|
||||||
num_incrs++;
|
num_incrs++;
|
||||||
if(b1->key == p) { ret = b1->val; break; }
|
if(b1->key == p) { assert(b1->val); ret = b1->val; break; }
|
||||||
if(b1->val == NULL) { ret = NULL; break; }
|
if(b1->val == NULL) { assert(b1->key == -1); ret = NULL; break; }
|
||||||
idx = hashtable_wrap(ht, idx+1);
|
idx = hashtable_wrap(ht, idx+1);
|
||||||
b2 = b1;
|
b2 = b1;
|
||||||
b1 = &ht->buckets[idx];
|
b1 = &ht->buckets[idx];
|
||||||
|
@ -120,6 +181,20 @@ static inline void * hashtable_begin_op(hashtable_mode mode, hashtable_t *ht, pa
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef STASIS_HASHTABLE_FSCK_THREAD
|
||||||
|
void * hashtable_fsck_worker(void * htp) {
|
||||||
|
hashtable_t * ht = htp;
|
||||||
|
while(ht->is_open) {
|
||||||
|
fprintf(stderr, "Scanning hashtable %x", (unsigned int)ht);
|
||||||
|
sleep(1);
|
||||||
|
hashtable_fsck(ht);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
void hashtable_end_op(hashtable_mode mode, hashtable_t *ht, void *val, hashtable_bucket_handle_t *h) {
|
void hashtable_end_op(hashtable_mode mode, hashtable_t *ht, void *val, hashtable_bucket_handle_t *h) {
|
||||||
pageid_t idx = h->idx;
|
pageid_t idx = h->idx;
|
||||||
bucket_t * b1 = h->b1;
|
bucket_t * b1 = h->b1;
|
||||||
|
@ -142,7 +217,8 @@ void hashtable_end_op(hashtable_mode mode, hashtable_t *ht, void *val, hashtable
|
||||||
// Case 1: It is null, we win.
|
// Case 1: It is null, we win.
|
||||||
if(b1->val == NULL) {
|
if(b1->val == NULL) {
|
||||||
// printf("d\n"); fflush(0);
|
// printf("d\n"); fflush(0);
|
||||||
b2->key = 0;
|
assert(b1->key == -1);
|
||||||
|
b2->key = -1;
|
||||||
b2->val = NULL;
|
b2->val = NULL;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -36,6 +36,11 @@ void * hashtable_remove_begin(hashtable_t *ht, pageid_t p, hashtable_bucket_hand
|
||||||
void hashtable_remove_finish(hashtable_t *ht, hashtable_bucket_handle_t *h);
|
void hashtable_remove_finish(hashtable_t *ht, hashtable_bucket_handle_t *h);
|
||||||
void hashtable_remove_cancel(hashtable_t *ht, hashtable_bucket_handle_t *h);
|
void hashtable_remove_cancel(hashtable_t *ht, hashtable_bucket_handle_t *h);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return -0 if key not found, 1 if the key exists, >1 if the hashtable is corrupt, and the key appears multiple times..
|
||||||
|
*/
|
||||||
|
int hashtable_debug_number_of_key_copies(hashtable_t *ht, pageid_t pageied);
|
||||||
|
|
||||||
void hashtable_unlock(hashtable_bucket_handle_t *h);
|
void hashtable_unlock(hashtable_bucket_handle_t *h);
|
||||||
|
|
||||||
#endif /* CONCURRENTHASH_H_ */
|
#endif /* CONCURRENTHASH_H_ */
|
||||||
|
|
Loading…
Reference in a new issue