Implemented recover_split(), which is needed by the linear hash to recover properly from crashes. It hasn't been tested at all, and is not currently called by anything.

Sears Russell 2004-11-29 02:35:37 +00:00
parent 2c23fb8a43
commit 646f9dfca9
4 changed files with 114 additions and 9 deletions


@@ -11,11 +11,11 @@ filter.file.ignore.hidden=0
filter.dir.ignore.hidden=0
[filenumbers]
-0=386
+0=43
-1=28
+1=33
2=340
-3=229
+3=1
-4=14
+4=312
5=510
6=71
7=1


@@ -30,5 +30,5 @@ int ThashOpen(int xid, recordid hashRid, int keySize, int valSize);
int ThashClose(int xid, recordid hashRid) ;
void lockBucket(int bucket);
void unlockBucket(int bucket);
-int lockBucketForKey(byte * key, int keySize, recordid * headerRidB);
+int lockBucketForKey(const byte * key, int keySize, recordid * headerRidB);
#endif


@@ -216,10 +216,10 @@ if(mycount <= 0 && !(mycount * -1) % FF_AM) { */
    next_split = 2;
  }
-  lockBucket(&next_split);
+  lockBucket(next_split);
  int other_bucket = next_split + twoToThe(i-1);
-  lockBucket(&other_bucket);
+  lockBucket(other_bucket);
  pthread_mutex_unlock(&linearHashMutex);
@@ -304,8 +304,106 @@ void instant_update_hash_header(int xid, recordid hash, int i, int next_split) {
   @todo Actually implement recovery for linearHash.
 */
+/**
+   If a bucket split may have been interrupted, call this function to restore
+   logical consistency to the buckets' linked lists. Then, call instant_rehash
+   to complete the split.
+   @todo Need to test recover_split, and figure out where to call it!
+*/
+static void recover_split(int xid, recordid hashRid, int i, int next_split, int keySize, int valSize) {
+  // This function would be simple, except for the ridiculous amount
+  // of state that it must maintain. See above for a description of what it does.
+  recordid * headerRidB;
+  hashRid.slot = 1;
+  Tread(xid, hashRid, headerRidB);
+  hashRid.slot = 0;
+  recordid ba = hashRid; ba.slot = next_split;
+  recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1);
+  recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = -1;
+  if(headerHashBits <= i && headerNextSplit <= next_split) {
+    // The split has not been completed. First, look for a duplicate entry in
+    // the two linked lists.
+    pblHashTable_t * firstList = pblHtCreate();
+    recordid * next = malloc(hashRid.size);
+    recordid current_rid;
+    recordid last_rid;
+    last_rid = NULLRID;
+    current_rid = hashRid;
+    Tread(xid, ba, next);
+    while(next->size) { // size = 0 -> nothing in bucket. size = -1 -> last bucket.
+      // Add an entry to the hash table:
+      // key -> recordid of entry pointing to current entry
+      byte * val = malloc(sizeof(recordid));
+      memcpy(val, &last_rid, sizeof(recordid));
+      pblHtInsert(firstList, next+1, keySize, val);
+      if(next->size == -1)
+        break;
+      last_rid = current_rid;
+      current_rid = *next;
+      Tread(xid, current_rid, next);
+    }
+    // firstList now contains a copy of each entry in the first list.
+    // Find any duplicates.
+    Tread(xid, bb, next);
+    int foundDup = 0;
+    recordid lastRead = bb;
+    recordid * A;
+    while(next->size) {
+      if(NULL != (A = pblHtLookup(firstList, next+1, keySize))) {
+        foundDup = 1;
+        break;
+      }
+      if(next->size == -1)
+        break;
+      lastRead = *next;
+      Tread(xid, *next, next);
+    }
+    if(foundDup) {
+      long new_hash = hash(next+1, keySize, i, ULONG_MAX) + 2;
+      if(new_hash == next_split) {
+        // set B->next = 0
+        *next = NULLRID;
+        TinstantSet(xid, lastRead, next);
+      } else {
+        // set A->next = C
+        if(A->size != 0 && A->size != -1) {
+          hashEntry * A_contents = malloc(hashRid.size);
+          Tread(xid, *A, A_contents);
+          A_contents->next = *next;
+          TinstantSet(xid, *A, A_contents);
+        } else {
+          // B is a bucket. :o
+          void * C_contents = malloc(next->size);
+          // Copy the next item in the list into the bucket (C)
+          Tread(xid, *next, C_contents);
+          TinstantSet(xid, bb, C_contents);
+          // And free the record used by C
+          Tdealloc(xid, *next);
+          free(C_contents);
+        }
+        // set B->next = 0 We don't care if it's a bucket.
+        *next = NULLRID;
+        TinstantSet(xid, lastRead, next);
+      }
+    }
+  }
+}
void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) {
  int firstA = 1; // Is 'A' the recordid of a bucket?
  int firstD = 1; // What about 'D'?
@@ -340,6 +438,13 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz
     fflush(NULL); */
    return;
  }
+  while(D_contents->next.size) {
+    firstD = 0;
+    if(D_contents->next.size != -1) {
+      D = D_contents->next;
+      TreadUnlocked(xid, D, D_contents);
+    }
+  }
  int old_hash;
  int new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2;

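For context, the doc comment above prescribes a two-step recovery: first recover_split() to repair the interrupted split's linked lists, then instant_rehash() to finish the split. Nothing in this commit wires that up yet (per the @todo), so the sketch below is purely hypothetical: recover_linear_hash() is an invented name, it would have to live in the same file since recover_split() is static, and it assumes the hash header has already been read so that headerHashBits and headerNextSplit are valid.

/* Hypothetical recovery driver -- not part of this commit.  Assumes the hash
   header has already been read, so headerHashBits / headerNextSplit (and the
   key and value sizes) are available, as in the existing code paths. */
static void recover_linear_hash(int xid, recordid hashRid,
                                int keySize, int valSize) {
  int i          = headerHashBits;   /* current number of hash bits */
  int next_split = headerNextSplit;  /* bucket whose split may have been interrupted */

  /* Step 1: restore logical consistency to the two linked lists touched by
     the interrupted split. */
  recover_split(xid, hashRid, i, next_split, keySize, valSize);

  /* Step 2: re-run the rehash so the split actually completes.  Note that
     the i / next_split argument order differs from recover_split(). */
  instant_rehash(xid, hashRid, next_split, i, keySize, valSize);
}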

@@ -40,7 +40,7 @@ void lockBucket(int bucket) {
  pblHtInsert(lockedBuckets, &bucket, sizeof(int), (void*)1);
}
-int lockBucketForKey(byte * key, int keySize, recordid * headerRidB) {
+int lockBucketForKey(const byte * key, int keySize, recordid * headerRidB) {
  int bucket = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
  while(pblHtLookup(lockedBuckets, &bucket, sizeof(int))) {