diff --git a/src/stasis/bufferManager/concurrentBufferManager.c b/src/stasis/bufferManager/concurrentBufferManager.c index 291c562..b8f131e 100644 --- a/src/stasis/bufferManager/concurrentBufferManager.c +++ b/src/stasis/bufferManager/concurrentBufferManager.c @@ -183,7 +183,7 @@ static void deinitTLS(void *tlsp) { stasis_buffer_concurrent_hash_t *ch = tls->bm->impl; Page * p = tls->p; - p->id = -1; + p->id = -2; while(hashtable_test_and_set(ch->ht,p->id, p)) { p->id --; } @@ -240,7 +240,12 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma || (!tls->p->dirty) )) { // The getStaleAndRemove was not atomic with the hashtable remove, which is OK (but we can't trust tmp anymore...) - assert(tmp == tls->p); + if(tmp != tls->p) { + int copy_count = hashtable_debug_number_of_key_copies(ch->ht, tmp->id); + assert(copy_count == 1); + assert(tmp == tls->p); + abort(); + } // note that we'd like to assert that the page is unpinned here. However, we can't simply look at p->queue, since another thread could be inside the "spooky" quote below. tmp = 0; if(tls->p->id >= 0) { @@ -444,7 +449,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_ for(pageid_t i = 0; i < stasis_buffer_manager_size; i++) { Page *p = stasis_buffer_pool_malloc_page(ch->buffer_pool); - stasis_buffer_pool_free_page(ch->buffer_pool, p,-1*i); + stasis_buffer_pool_free_page(ch->buffer_pool, p,(-1*i)-2); pageSetNode(p,0,0); (*pagePinCountPtr(p)) = 1; ch->lru->insert(ch->lru, p); // decrements pin count ptr (setting it to zero) diff --git a/src/stasis/concurrentHash.c b/src/stasis/concurrentHash.c index dd3e16f..abbc1f5 100644 --- a/src/stasis/concurrentHash.c +++ b/src/stasis/concurrentHash.c @@ -10,6 +10,8 @@ #include #include +//#define STASIS_HASHTABLE_FSCK_THREAD + struct bucket_t { pageid_t key; pthread_mutex_t mut; @@ -19,6 +21,10 @@ struct bucket_t { struct hashtable_t { bucket_t* buckets; pageid_t maxbucketid; +#ifdef STASIS_HASHTABLE_FSCK_THREAD + int is_open; + pthread_t fsck_thread; +#endif }; static inline pageid_t hashtable_wrap(hashtable_t *ht, pageid_t p) { @@ -55,6 +61,10 @@ static inline pageid_t hashtable_func(hashtable_t *ht, pageid_t key) { return hashtable_wrap(ht, hash6432shift(key)); } +#ifdef STASIS_HASHTABLE_FSCK_THREAD +void * hashtable_fsck_worker(void * htp); +#endif + hashtable_t * hashtable_init(pageid_t size) { pageid_t newsize = 1; for(int i = 0; size; i++) { @@ -65,22 +75,72 @@ hashtable_t * hashtable_init(pageid_t size) { ht->maxbucketid = (newsize) - 1; ht->buckets = calloc(ht->maxbucketid+1, sizeof(bucket_t)); + for(int i = 0; i <= ht->maxbucketid; i++) { + ht->buckets[i].key = -1; + } pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); for(pageid_t i = 0; i <= ht->maxbucketid; i++) { pthread_mutex_init(&(ht->buckets[i].mut), &attr); } - +#ifdef STASIS_HASHTABLE_FSCK_THREAD + ht->is_open = 1; + pthread_create(&ht->fsck_thread,0, hashtable_fsck_worker, ht); +#endif return ht; } void hashtable_deinit(hashtable_t * ht) { - for(pageid_t i = 0; i < ht->maxbucketid; i++) { +#ifdef STASIS_HASHTABLE_FSCK_THREAD + ht->is_open = 0; + pthread_join(ht->fsck_thread, 0); +#endif + for(pageid_t i = 0; i <= ht->maxbucketid; i++) { pthread_mutex_destroy(&ht->buckets[i].mut); } free(ht->buckets); free(ht); } + +int hashtable_debug_number_of_key_copies(hashtable_t *ht, pageid_t pageid) { + int count = 0; + for(int i = 0; i <= ht->maxbucketid; i++) { + if(ht->buckets[i].key == pageid) { count ++; } + } + if(count > 0) { fprintf(stderr, "%d copies of key %lld in hashtable!", count, (unsigned long long) pageid); } + return count; +} + +void hashtable_fsck(hashtable_t *ht) { + pthread_mutex_lock(&ht->buckets[0].mut); + for(int i = 1; i <= ht->maxbucketid; i++) { + pthread_mutex_lock(&ht->buckets[i].mut); + if(ht->buckets[i].key != -1) { + pageid_t this_hash_code = hashtable_func(ht, ht->buckets[i].key); + if(this_hash_code != i) { + assert(ht->buckets[i-1].key != -1); + assert(ht->buckets[i-1].val != 0); + assert(this_hash_code < i || (this_hash_code > i + (ht->maxbucketid/2))); + } + } else { + assert(ht->buckets[i].val == NULL); + } + pthread_mutex_unlock(&ht->buckets[i-1].mut); + } + pthread_mutex_lock(&ht->buckets[0].mut); + if(ht->buckets[0].key != -1) { + pageid_t this_hash_code = hashtable_func(ht, ht->buckets[0].key); + if(this_hash_code != 0) { + assert(ht->buckets[ht->maxbucketid].key != -1); + assert(ht->buckets[ht->maxbucketid].val != 0); + assert(this_hash_code < 0 || (this_hash_code > 0 + (ht->maxbucketid/2))); + } + } else { + assert(ht->buckets[ht->maxbucketid].val == NULL); + } + pthread_mutex_unlock(&ht->buckets[ht->maxbucketid].mut); + pthread_mutex_unlock(&ht->buckets[0].mut); +} typedef enum { LOOKUP, INSERT, @@ -89,6 +149,7 @@ typedef enum { } hashtable_mode; static inline void * hashtable_begin_op(hashtable_mode mode, hashtable_t *ht, pageid_t p, void *val, hashtable_bucket_handle_t *h) { static int warned = 0; + assert(p != -1); pageid_t idx = hashtable_func(ht, p); void * ret; bucket_t *b1 = &ht->buckets[idx], *b2 = NULL; @@ -105,8 +166,8 @@ static inline void * hashtable_begin_op(hashtable_mode mode, hashtable_t *ht, pa } assert(num_incrs < (ht->maxbucketid/4)); num_incrs++; - if(b1->key == p) { ret = b1->val; break; } - if(b1->val == NULL) { ret = NULL; break; } + if(b1->key == p) { assert(b1->val); ret = b1->val; break; } + if(b1->val == NULL) { assert(b1->key == -1); ret = NULL; break; } idx = hashtable_wrap(ht, idx+1); b2 = b1; b1 = &ht->buckets[idx]; @@ -120,6 +181,20 @@ static inline void * hashtable_begin_op(hashtable_mode mode, hashtable_t *ht, pa return ret; } + +#ifdef STASIS_HASHTABLE_FSCK_THREAD +void * hashtable_fsck_worker(void * htp) { + hashtable_t * ht = htp; + while(ht->is_open) { + fprintf(stderr, "Scanning hashtable %x", (unsigned int)ht); + sleep(1); + hashtable_fsck(ht); + } + return 0; +} +#endif + + void hashtable_end_op(hashtable_mode mode, hashtable_t *ht, void *val, hashtable_bucket_handle_t *h) { pageid_t idx = h->idx; bucket_t * b1 = h->b1; @@ -142,7 +217,8 @@ void hashtable_end_op(hashtable_mode mode, hashtable_t *ht, void *val, hashtable // Case 1: It is null, we win. if(b1->val == NULL) { // printf("d\n"); fflush(0); - b2->key = 0; + assert(b1->key == -1); + b2->key = -1; b2->val = NULL; break; } else { diff --git a/stasis/concurrentHash.h b/stasis/concurrentHash.h index 494e7c4..49db084 100644 --- a/stasis/concurrentHash.h +++ b/stasis/concurrentHash.h @@ -36,6 +36,11 @@ void * hashtable_remove_begin(hashtable_t *ht, pageid_t p, hashtable_bucket_hand void hashtable_remove_finish(hashtable_t *ht, hashtable_bucket_handle_t *h); void hashtable_remove_cancel(hashtable_t *ht, hashtable_bucket_handle_t *h); +/** + * @return -0 if key not found, 1 if the key exists, >1 if the hashtable is corrupt, and the key appears multiple times.. + */ +int hashtable_debug_number_of_key_copies(hashtable_t *ht, pageid_t pageied); + void hashtable_unlock(hashtable_bucket_handle_t *h); #endif /* CONCURRENTHASH_H_ */