Lots of optimizations; re-did on-disk layout for the linear hash table.

This commit is contained in:
Sears Russell 2004-10-19 21:16:37 +00:00
parent 6b265f28b0
commit ec99c0e258
22 changed files with 816 additions and 171 deletions
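The diffs below change the hash table's on-disk representation: each ArrayList slot now holds a complete hashEntry (a header followed by the key and value bytes inline) rather than a recordid pointing at a separately allocated entry, with next.size == -1 marking the end of a bucket's overflow chain and an all-zero entry marking an empty bucket. A rough sketch of that layout, reconstructed from how the code below uses it (the field names appear in the diff; the real definitions live in the LLADD headers and may differ in widths and order):

/* Sketch only -- inferred from e->next, the memcpy(e+1, ...) idiom, and the
 * size sentinels used below; not the authoritative definitions. */
typedef struct {
    int  page;
    int  slot;
    long size;              /* -1: end of chain; 0: empty / none; else record length */
} recordid_sketch;

typedef struct {
    recordid_sketch next;   /* overflow chain link, stored inline in the bucket's ArrayList slot */
    /* keySize bytes of key, then valSize bytes of value, follow immediately */
} hashEntry_sketch;

/* Every bucket slot and overflow record is sizeof(hashEntry) + keySize + valSize bytes. */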

View file

@ -24,23 +24,23 @@ int main(int argc, char** argv) {
recordid hash = ThashAlloc(xid, sizeof(int), sizeof(int));
Tcommit(xid);
int i = 0;
for(k = 0; k < xact_count; k++) {
xid = Tbegin();
for(;i < count *(k+1) ; i++) {
TlogicalHashInsert(xid, hash, &i, sizeof(int), &i, sizeof(int));
}
Tcommit(xid);
}
int i;
for(k = 0; k < xact_count; k++) {
xid = Tbegin();
for(i = 0; i < count ; i++) {
TlogicalHashInsert(xid, hash, &i, sizeof(int), &i, sizeof(int));
}
Tcommit(xid);
}
/* Tdeinit(); */
/* Tdeinit(); */
}

View file

@ -32,6 +32,6 @@ int main(int argc, char** argv) {
Tcommit(xid);
Tdeinit();
/* Tdeinit(); */
}

View file

@ -35,7 +35,7 @@ int main(int argc, char** argv) {
xid = Tbegin();
ThashOpen(xid, hash);
ThashOpen(xid, hash, sizeof(int), sizeof(int));
int k;
for(k = 0; k < 10; k++) {

View file

@ -103,7 +103,7 @@ extern int errno;
/*#define DEBUGGING */
/*#define PROFILE_LATCHES*/
#define NO_LATCHES
/*#define NO_LATCHES */
#ifdef DEBUGGING
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */

View file

@ -25,6 +25,7 @@ Operation getUnInitPage();
#define TunInitPage(xid, rid) Tupdate(xid, rid, NULL, OPERATION_UNINITIALIZE_PAGE)
recordid dereferenceArrayListRid(Page * p, int offset);
#define dereferenceArrayListRidUnlocked(x, y) dereferenceArrayListRid((x),(y))
int TarrayListExtend(int xid, recordid rid, int slots);
int TarrayListInstantExtend(int xid, recordid rid, int slots);
#endif

View file

@ -19,7 +19,7 @@ void ThashInstantInsert(int xid, recordid hashRid,
const void * key, int keySize,
const void * val, int valSize);
void ThashInstantDelete(int xid, recordid hashRid,
const void * key, int keySize);
const void * key, int keySize, int valSize);
void ThashInstantUpdate(int xid, recordid hashRid, const void * key, int keySize, const void * val, int valSize);
void TlogicalHashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize);
void TlogicalHashInsert(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize);

View file

@ -18,12 +18,14 @@ recordid ThashAlloc(int xid, int keySize, int valSize) ;
void ThashInsert(int xid, recordid hashRid,
void * key, int keySize,
void * val, int valSize);
void ThashDelete(int xid, recordid hashRid,
void * key, int keySize);
/*void ThashDelete(int xid, recordid hashRid,
void * key, int keySize);*/
int ThashDelete(int xid, recordid hashRid,
void * key, int keySize, int valSize);
void ThashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize);
int ThashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize);
void ThashInit();
void ThashDeinit();
int ThashOpen(int xid, recordid hashRid);
int ThashOpen(int xid, recordid hashRid, int keySize, int valSize);
int ThashClose(int xid, recordid hashRid) ;
#endif
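Because buckets and the header record now occupy full hashEntry-sized slots, the key and value sizes have to accompany ThashOpen and ThashDelete, apparently so the in-memory read buffers can be sized to match (see the malloc(sizeof(recordid) + keySize + valSize) in ThashOpen further down). A hypothetical round trip against the signatures above; the include paths are guesses, the calls themselves are taken from this header:

#include <lladd/transactional.h>   /* assumed to declare Tbegin/Tcommit and recordid */
#include <lladd/linearHash.h>      /* assumed path for the declarations above */

static void hash_roundtrip(int xid, recordid h) {
    /* h was returned by ThashAlloc(xid, sizeof(int), sizeof(int)) in an
     * earlier run; after a restart the sizes must be supplied again: */
    ThashOpen(xid, h, sizeof(int), sizeof(int));
    int key = 42, val = 99, out = 0;
    ThashInsert(xid, h, &key, sizeof(int), &val, sizeof(int));
    if(ThashLookup(xid, h, &key, sizeof(int), &out, sizeof(int))) {
        /* out now holds 99 */
    }
    ThashDelete(xid, h, &key, sizeof(int), sizeof(int));   /* valSize is now required here too */
    ThashClose(xid, h);
}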

View file

@ -163,6 +163,7 @@ void Tupdate(int xid, recordid rid, const void *dat, int op);
* @param dat buffer into which data goes
*/
void Tread(int xid, recordid rid, void *dat);
void TreadUnlocked(int xid, recordid rid, void *dat);
/**
* @param xid transaction ID

View file

@ -39,6 +39,9 @@ authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#define _XOPEN_SOURCE 600
#include <config.h>
#include <lladd/common.h>
@ -49,6 +52,7 @@ terms specified in this license.
#include "io.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <lladd/bufferManager.h>
@ -127,7 +131,9 @@ pthread_mutex_t truncateLog_mutex;
static int sought = 1;
int openLogWriter() {
#define BUFSIZE 1024*16
char * buffer = malloc(BUFSIZE);
char * buffer ;/*= malloc(BUFSIZE);*/
assert(!posix_memalign((void*)&(buffer), PAGE_SIZE, BUFSIZE));
log = fopen(LOG_FILE, "a+");
if (log==NULL) {

View file

@ -228,7 +228,7 @@ int TarrayListInstantExtend(int xid, recordid rid, int slots) {
tmp.slot = MAX_OFFSET_POSITION;
int newMaxOffset = tlp.maxOffset+slots;
/** @todo CORRECTNESS BUG: From recovery's point of view, this is totally wrong! The
/** @todo CORRECTNESS BUG: From recovery's point of view, arrayList is totally wrong! The
only reason we mess with p is because TinstantSet doesn't handle
ARRAY_LIST_PAGES the way we need it to, so this won't be hard to
fix... */
@ -277,7 +277,7 @@ Operation getUnInitPage() {
/*----------------------------------------------------------------------------*/
/** @todo locking for arrayLists */
recordid dereferenceArrayListRid(Page * p, int offset) {
TarrayListParameters tlp = pageToTLP(p);

View file

@ -48,13 +48,31 @@ terms specified in this license.
#include <lladd/operations/instantSet.h>
#include <lladd/operations/set.h>
#include "../page.h"
#include "../page/fixed.h"
static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
// writeRecord(xid, p, lsn, rid, dat);
writeRecordUnlocked(xid, p, lsn, rid, dat);
return 0;
}
Operation getInstantSet() {
Operation o = {
OPERATION_INSTANT_SET, /* id */
SIZEOF_RECORD, /* use the size of the record as size of arg */
OPERATION_NOOP,
&operate /* Function */
};
return o;
}
/** @todo The spirit of instantSet suggests that it should hold a
shorter write lock than set, but instant set was written before
the lock manager... */
Operation getInstantSet() {
/*Operation getInstantSet() {
Operation o = getSet();
o.id = OPERATION_INSTANT_SET;
o.undo = OPERATION_NOOP;
return o;
}
}*/

View file

@ -13,8 +13,8 @@
#define BUCKETS_OFFSET (2)
#define headerKeySize (headerRidA.page)
#define headerValSize (headerRidA.slot)
#define headerKeySize (headerRidA->page)
#define headerValSize (headerRidA->slot)
#define headerHashBits (headerRidB->page)
#define headerNextSplit (headerRidB->slot)
@ -36,15 +36,18 @@ extern pblHashTable_t * openHashes ;
/*pblHashTable_t * openHashes = NULL; */
static int operateUndoInsert(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
int keySize = rid.size;
int valSize = rid.slot;
rid.slot = 0;
rid.size = sizeof(recordid);
if(!pblHtLookup(openHashes, &rid.page, sizeof(int))) {
ThashOpen(xid, rid);
abort();
/* ThashOpen(xid, rid); */
}
ThashInstantDelete(xid, rid, dat, keySize);
ThashInstantDelete(xid, rid, dat, keySize, valSize);
return 0;
}
@ -62,8 +65,8 @@ static int operateUndoDelete(int xid, Page * p, lsn_t lsn, recordid rid, const v
rid.slot = 0;
/* Tread(xid, dereferenceArrayListRid(p, rid.slot), &headerRidA); */
/* Tread(xid, rid, &headerRidA); */
/* TreadUnlocked(xid, dereferenceArrayListRid(p, rid.slot), &headerRidA); */
/* TreadUnlocked(xid, rid, &headerRidA); */
assert(keySize == sizeof(int));
assert(valSize == sizeof(recordid));
@ -122,11 +125,12 @@ void TlogicalHashInsert(int xid, recordid hashRid, void * key, int keySize, void
assert(headerRidB);
hashRid.slot = valSize;
hashRid.size = keySize;
Tupdate(xid, hashRid, key, OPERATION_LINEAR_INSERT);
/* Perform redo-only insert. */
hashRid.size = sizeof(recordid);
hashRid.size = sizeof(hashEntry) + keySize + valSize;
ThashInstantInsert(xid, hashRid, key, keySize, val, valSize);
instant_expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize);
@ -144,7 +148,7 @@ int TlogicalHashDelete(int xid, recordid hashRid, void * key, int keySize, void
Tupdate(xid, hashRid, arg, OPERATION_LINEAR_DELETE);
free(arg);
hashRid.size = sizeof(recordid);
ThashInstantDelete(xid, hashRid, key, keySize);
ThashInstantDelete(xid, hashRid, key, keySize, valSize);
return 1;
} else {
return 0;
@ -155,8 +159,10 @@ int TlogicalHashDelete(int xid, recordid hashRid, void * key, int keySize, void
void instant_rehash(int xid, recordid hash, int next_split, int i, int keySize, int valSize);
void instant_update_hash_header(int xid, recordid hash, int i, int next_split);
int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_rid, const void * key, int keySize, recordid * deletedEntry);
void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_rid, hashEntry * e, int keySize, int valSize, recordid deletedEntry, int skipDelete);
int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
const void * key, int keySize, int valSize, recordid * deletedEntry);
void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete);
int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize);
@ -167,7 +173,7 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key,
recordid nextEntry;
bucket.slot = bucket_number;
Tread(xid, bucket, &nextEntry);
TreadUnlocked(xid, bucket, &nextEntry);
if(nextEntry.size) {
assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize);
@ -176,7 +182,7 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key,
int found = 0;
while(nextEntry.size > 0) {
Tread(xid, nextEntry, e);
TreadUnlocked(xid, nextEntry, e);
if(!memcmp(key, e+1, keySize)) {
memcpy(val, ((byte*)(e+1))+keySize, valSize);
found = 1;
@ -188,7 +194,7 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key,
return found;
}*/
int extendCount = 0;
/*int extendCount = 0;
void instant_expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
extendCount ++;
if(extendCount >= 70) {
@ -206,18 +212,43 @@ void instant_expand (int xid, recordid hash, int next_split, int i, int keySize,
extendCount = 0;
}
}*/
void instant_expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
/* Total hack; need to do this better, by storing stuff in the hash table headers.*/
static int count = 4096 * .25;
count --;
#define AMORTIZE 1000
#define FF_AM 750
if(count <= 0 && !(count * -1) % FF_AM) {
recordid * headerRidB = pblHtLookup(openHashes, &(hash.page), sizeof(int));
int j;
TarrayListInstantExtend(xid, hash, AMORTIZE);
for(j = 0; j < AMORTIZE; j++) {
if(next_split >= twoToThe(i-1)+2) {
i++;
next_split = 2;
}
instant_rehash(xid, hash, next_split, i, keySize, valSize);
next_split++;
headerNextSplit = next_split;
headerHashBits = i;
}
instant_update_hash_header(xid, hash, i, next_split);
}
}
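A minimal restatement of the growth policy above, assuming the intent is: spend the 1024 buckets pre-allocated by ThashAlloc first, then add AMORTIZE buckets (and rehash that many) every FF_AM inserts. Note that the committed test !(count * -1) % FF_AM parses as (!(count * -1)) % FF_AM, since unary ! binds tighter than %, so as written the right-hand side of the && is non-zero only when count is exactly zero; the sketch spells out the presumed intent rather than the literal expression.

#define AMORTIZE 1000   /* buckets added and rehashed per growth step */
#define FF_AM     750   /* inserts between growth steps once the initial budget is spent */

/* Hypothetical helper, not part of the commit. */
static int should_grow(int inserts_left_in_budget) {
    return inserts_left_in_budget <= 0 &&
           ((-inserts_left_in_budget) % FF_AM) == 0;
}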
void instant_update_hash_header(int xid, recordid hash, int i, int next_split) {
recordid * headerRidB = pblHtLookup(openHashes, &hash.page, sizeof(int));
headerHashBits = i;
headerNextSplit = next_split;
hash.slot = 1;
TinstantSet(xid, hash, headerRidB);
}
void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) {
//void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) {
/* Picture of what's going on:
@ -272,8 +303,8 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz
/** @todo Actually implement recovery for linearHash. */
int firstA = 1; /* Is 'A' the recordid of a bucket? */
int firstD = 1; /* What about 'D'? */
/* int firstA = 1; // Is 'A' the recordid of a bucket?
int firstD = 1; // What about 'D'?
assert(hashRid.size == sizeof(recordid));
@ -281,12 +312,12 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz
recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1);
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = 0;
recordid ba_contents; Tread(xid, ba, &ba_contents);
recordid ba_contents; TreadUnlocked(xid, ba, &ba_contents);
recordid bb_contents = NULLRID;
TinstantSet(xid, bb, &bb_contents);/*Tread(xid, bb, &bb_contents);*/
TinstantSet(xid, bb, &bb_contents);//TreadUnlocked(xid, bb, &bb_contents);
recordid A = ba; /*ba_contents; */
recordid D = bb; /*bb_contents; */
recordid A = ba; //ba_contents;
recordid D = bb; //bb_contents;
recordid B = ba_contents;
recordid C;
@ -297,7 +328,7 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz
while(B.size) {
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
TreadUnlocked(xid, B, B_contents);
C = B_contents->next;
int old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2;
@ -313,15 +344,15 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz
firstA = 0;
} else {
if(firstD) {
/* D is a bucket entry */
// D is a bucket entry
assert(B.size == sizeof(hashEntry) + keySize + valSize);
assert(D.size == sizeof(recordid));
TinstantSet(xid, D, &B);
} else {
/* D is the tail of our list. */
// D is the tail of our list.
assert(D.size == sizeof(hashEntry) + keySize + valSize);
assert(B.size == 0 || B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, D, D_contents);
TreadUnlocked(xid, D, D_contents);
D_contents->next = B;
TinstantSet(xid, D, D_contents);
}
@ -331,23 +362,191 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz
assert(A.size == sizeof(recordid));
TinstantSet(xid, A, &C);
} else {
/* A is somewhere in the first list. */
// A is somewhere in the first list.
assert(A.size == sizeof(hashEntry) + keySize + valSize);
assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, A, A_contents);
TreadUnlocked(xid, A, A_contents);
A_contents->next = C;
TinstantSet(xid, A, A_contents);
}
/* B _can't_ be a bucket.*/
// B _can't_ be a bucket.
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
TreadUnlocked(xid, B, B_contents);
B_contents->next = NULLRID;
TinstantSet(xid, B, B_contents);
/* Update Loop State */
// Update Loop State
D = B;
B = C;
C.size = -1;
firstD = 0;
}
}
free(D_contents);
free(A_contents);
free(B_contents);
} */
void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) {
int firstA = 1; // Is 'A' the recordid of a bucket?
int firstD = 1; // What about 'D'?
/* assert(hashRid.size == sizeof(recordid)); */
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
recordid ba = hashRid; ba.slot = next_split;
recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1);
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = -1;
// recordid ba_contents; TreadUnlocked(xid, ba, &ba_contents);
// recordid bb_contents = NULLRID;
/* Tset(xid, bb, &bb_contents); */ //TreadUnlocked(xid, bb, &bb_contents);
hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
TreadUnlocked(xid, ba, A_contents);
TreadUnlocked(xid, bb, D_contents);
recordid A = ba; //ba_contents;
recordid D = bb; //bb_contents;
recordid B = A_contents->next;
recordid C;
if(!A_contents->next.size) {
/* Bucket A is empty, so we're done. */
free(D_contents);
free(A_contents);
free(B_contents);
/* printf("Expand was a noop.\n");
fflush(NULL); */
return;
}
int old_hash;
int new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2;
while(new_hash != next_split) {
// Need a record in A that belongs in the first bucket...
recordid oldANext = A_contents->next;
A_contents->next = NULLRID;
if(firstD) {
// assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid)));
TinstantSet(xid, D, A_contents);
firstD = 0;
} else {
/* D at end of list => can overwrite next. */
D_contents->next = Talloc(xid, sizeof(hashEntry) + keySize + valSize); /* @todo unfortunate to dealloc A's successor, then alloc.. */
// assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid)));
TinstantSet(xid, D_contents->next, A_contents);
// assert(memcmp(&D, &D_contents->next, sizeof(recordid)));
TinstantSet(xid, D, D_contents);
D = A;
}
hashEntry * swap = D_contents;
D_contents = A_contents;
A_contents = swap;
/* A_contents is now garbage. */
assert(A.size == sizeof(hashEntry) + keySize + valSize);
if(oldANext.size == -1) {
memset(A_contents, 0, sizeof(hashEntry) + keySize + valSize);
// assert(memcmp(&A_contents->next, &A, sizeof(recordid)));
TinstantSet(xid, A, A_contents);
free(D_contents);
free(A_contents);
free(B_contents);
/* printf("Loop 1 returning.\n");
fflush(NULL); */
return;
}
assert(oldANext.size == sizeof(hashEntry) + keySize + valSize);
TreadUnlocked(xid, oldANext, A_contents);
// assert(memcmp(&A_contents->next, &A, sizeof(recordid)));
TinstantSet(xid, A, A_contents);
Tdealloc(xid, oldANext);
new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2;
}
/* printf("Got past loop 1\n");
fflush(NULL); */
B = A_contents->next;
while(B.size != -1) {
assert(B.size == sizeof(hashEntry) + keySize + valSize);
TreadUnlocked(xid, B, B_contents);
C = B_contents->next;
old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2;
new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2;
assert(next_split == old_hash);
assert(new_hash == old_hash || new_hash == old_hash + twoToThe(i-1));
if(new_hash == old_hash) {
A = B;
B = C;
C.size = -1;
firstA = 0;
} else {
/* if(firstD) {
// D is a bucket entry
assert(B.size == sizeof(hashEntry) + keySize + valSize);
assert(D.size == sizeof(recordid));
Tset(xid, D, &B);
} else { */
// D is the tail of our list.
assert(D.size == sizeof(hashEntry) + keySize + valSize);
assert(B.size == -1 || B.size == sizeof(hashEntry) + keySize + valSize);
TreadUnlocked(xid, D, D_contents);
D_contents->next = B;
assert(B.size != 0);
// assert(memcmp(&D, &D_contents->next, sizeof(recordid)));
TinstantSet(xid, D, D_contents);
/* } */
/* if(firstA) {
assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize);
assert(A.size == sizeof(recordid));
Tset(xid, A, &C);
} else { */
// A is somewhere in the first list.
assert(A.size == sizeof(hashEntry) + keySize + valSize);
assert(C.size == -1 || C.size == sizeof(hashEntry) + keySize + valSize);
TreadUnlocked(xid, A, A_contents);
A_contents->next = C;
assert(C.size != 0);
// assert(memcmp(&A, &A_contents->next, sizeof(recordid)));
TinstantSet(xid, A, A_contents);
/* } */
// B _can't_ be a bucket.
assert(B.size == sizeof(hashEntry) + keySize + valSize);
TreadUnlocked(xid, B, B_contents);
B_contents->next = NULLRID;
TinstantSet(xid, B, B_contents);
// Update Loop State
D = B;
B = C;
C.size = -1;
@ -359,21 +558,123 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz
free(B_contents);
}
/** @todo fine grained locking for insertIntoBucket and the other operations in this file. */
void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_contents, hashEntry * e, int keySize, int valSize, recordid newEntry, int skipDelete) {
assert(newEntry.size == (sizeof(hashEntry) + keySize + valSize));
void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete) {
recordid deleteMe;
if(!skipDelete) {
if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, &deleteMe)) {
/* Tdealloc(xid, deleteMe); */
hashRid.slot = bucket_number;
Tread(xid, hashRid, &bucket_contents);
hashRid.slot = 0;
if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, valSize, &deleteMe)) {
if(deleteMe.size) {
Tdealloc(xid, deleteMe);
hashRid.slot = bucket_number;
assert(hashRid.size == sizeof(hashEntry) + valSize + keySize);
TreadUnlocked(xid, hashRid, bucket_contents);
hashRid.slot = 0;
}
}
}
/*@todo consider recovery for insertIntoBucket. */
hashRid.slot = bucket_number;
assert(hashRid.size == sizeof(hashEntry) + valSize + keySize);
TreadUnlocked(xid, hashRid, bucket_contents);
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
if(!bucket_contents->next.size) { // Size = 0 -> nothing in bucket. Size != 0 -> bucket occupied.
e->next.page = 0;
e->next.slot = 0;
e->next.size = -1;
TinstantSet(xid, hashRid, e);
} else {
recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize);
e->next = bucket_contents->next;
bucket_contents->next = newEntry;
assert(newEntry.size == sizeof(hashEntry) + keySize + valSize);
TinstantSet(xid, newEntry, e);
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
TinstantSet(xid, hashRid, bucket_contents);
}
}
int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
const void * key, int keySize, int valSize, recordid * deletedEntry) {
if(bucket_contents->next.size == 0) { return 0; }
recordid this = hash;
this.slot = bucket_number;
int found = 0;
if(!memcmp(bucket_contents+1, key, keySize)) {
if(deletedEntry)
deletedEntry->size = 0; /* size = 0 -> don't delete (this is a bucket!) */
if(bucket_contents->next.size == -1) {
memset(bucket_contents, 0, sizeof(hashEntry) + keySize + valSize);
TinstantSet(xid, this, bucket_contents);
} else {
assert(bucket_contents->next.size == sizeof(hashEntry) + keySize + valSize);
recordid oldNext = bucket_contents->next;
TreadUnlocked(xid, bucket_contents->next, bucket_contents);
TinstantSet(xid, this, bucket_contents);
*deletedEntry = oldNext; /* @todo delete from bucket really should do its own deallocation.. */
}
return 1;
}
if(bucket_contents->next.size == -1) { return 0; }
hashEntry * A = malloc(sizeof(hashEntry) + keySize + valSize);
hashEntry * B = malloc(sizeof(hashEntry) + keySize + valSize);
recordid Aaddr, Baddr;
memcpy(B, bucket_contents, sizeof(hashEntry) + keySize + valSize);
Baddr = this;
while(B->next.size != -1) {
hashEntry * tmp = A;
A = B;
Aaddr = Baddr;
B = tmp;
assert(A->next.size == sizeof(hashEntry) + keySize + valSize);
Baddr = A->next;
TreadUnlocked(xid, Baddr, B);
if(!memcmp(B+1, key, keySize)) {
A->next = B->next;
assert(Aaddr.size == sizeof(hashEntry) + keySize + valSize);
TinstantSet(xid, Aaddr, A);
if(deletedEntry) {
*deletedEntry = Baddr;
}
found = 1;
break;
}
}
free(A);
free(B);
return found;
}
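The delete path above has to treat the bucket head specially, because the head entry lives in the bucket's own ArrayList slot: deleting it means copying its successor down into the slot (and deallocating the successor's record), or zeroing the slot if the chain is empty. A toy, purely in-memory analogue of that move, not part of the commit:

#include <stdlib.h>
#include <string.h>

typedef struct toy_entry { struct toy_entry *next; int key, val; } toy_entry;

/* Delete the entry stored in the bucket slot itself. */
static void toy_delete_head(toy_entry *bucket) {
    toy_entry *victim = bucket->next;
    if(victim) {
        *bucket = *victim;                  /* successor's payload and next link move into the bucket slot */
        free(victim);                       /* mirrors the Tdealloc() of the copied-from record */
    } else {
        memset(bucket, 0, sizeof(*bucket)); /* chain empty: leave the zeroed "no entry" sentinel */
    }
}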
/** @todo fine grained locking for insertIntoBucket and the other operations in this file. */
/*void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_contents, hashEntry * e, int keySize, int valSize, recordid newEntry, int skipDelete) {
assert(newEntry.size == (sizeof(hashEntry) + keySize + valSize));
recordid deleteMe;
if(!skipDelete) {
if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, &deleteMe)) {
//Tdealloc(xid, deleteMe);
hashRid.slot = bucket_number;
TreadUnlocked(xid, hashRid, &bucket_contents);
hashRid.slot = 0;
}
}
//@todo consider recovery for insertIntoBucket.
recordid bucket = hashRid;
bucket.slot = bucket_number;
assert(bucket_contents.size == 0 || bucket_contents.size == sizeof(hashEntry) + keySize + valSize);
@ -400,14 +701,14 @@ int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, recordid
int found = 0;
recordid lastEntry;
while(nextEntry.size > 0) {
Tread(xid, nextEntry, e);
TreadUnlocked(xid, nextEntry, e);
if(!memcmp(key, e+1, keySize)) {
if(first) {
assert(e->next.size < 40);
TinstantSet(xid, bucket, &(e->next));
} else {
recordid next = e->next;
Tread(xid, lastEntry, e);
TreadUnlocked(xid, lastEntry, e);
assert(next.size < 40);
e->next = next;
TinstantSet(xid, lastEntry, e);
@ -423,7 +724,7 @@ int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, recordid
free(e);
return found;
}
*/
/* Same as normal implementation, so commented it out.
/ *
recordid ThashAlloc(int xid, int keySize, int valSize) {
@ -475,13 +776,13 @@ void ThashInstantInsert(int xid, recordid hashRid,
memcpy(((byte*)(e+1)) + keySize, val, valSize);
/** @todo instantAlloc / instantDealloc */
recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize);
recordid bucket_contents;
/* recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize); */
hashEntry * bucket_contents = malloc(sizeof(hashEntry) + keySize + valSize);
hashRid.slot = bucket;
Tread(xid, hashRid, &bucket_contents);
TreadUnlocked(xid, hashRid, bucket_contents);
hashRid.slot = 0;
instant_insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, newEntry, 0);
instant_insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, 0);
free(e);
@ -489,7 +790,7 @@ void ThashInstantInsert(int xid, recordid hashRid,
/** @todo hash table probably should track the number of items in it,
so that expand can be selectively called. */
void ThashInstantDelete(int xid, recordid hashRid,
const void * key, int keySize) {
const void * key, int keySize, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
recordid tmp = hashRid;
tmp.slot = 1;
@ -497,10 +798,10 @@ void ThashInstantDelete(int xid, recordid hashRid,
int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
recordid deleteMe;
hashRid.slot = bucket_number;
recordid bucket_contents;
Tread(xid, hashRid, &bucket_contents);
hashEntry * bucket_contents = malloc(sizeof(hashEntry) + keySize + valSize);
TreadUnlocked(xid, hashRid, bucket_contents);
hashRid.slot = 0;
if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, &deleteMe)) {
if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, valSize, &deleteMe)) {
/* Tdealloc(xid, deleteMe); */
}
}
@ -508,7 +809,7 @@ void ThashInstantDelete(int xid, recordid hashRid,
/*int ThashOpen(int xid, recordid hashRid) {
recordid * headerRidB = malloc(sizeof(recordid));
hashRid.slot = 1;
Tread(xid, hashRid, headerRidB);
TreadUnlocked(xid, hashRid, headerRidB);
pblHtInsert(openHashes, &hashRid.page, sizeof(int), headerRidB);

View file

@ -11,8 +11,8 @@
#define BUCKETS_OFFSET (2)
#define headerKeySize (headerRidA.page)
#define headerValSize (headerRidA.slot)
#define headerKeySize (headerRidA->page)
#define headerValSize (headerRidA->slot)
#define headerHashBits (headerRidB->page)
#define headerNextSplit (headerRidB->slot)
@ -33,8 +33,10 @@ pblHashTable_t * openHashes = NULL;
void rehash(int xid, recordid hash, int next_split, int i, int keySize, int valSize);
void update_hash_header(int xid, recordid hash, int i, int next_split);
int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_rid, void * key, int keySize, recordid * deletedEntry);
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_rid, hashEntry * e, int keySize, int valSize, recordid deletedEntry, int skipDelete);
int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
void * key, int keySize, int valSize, recordid * deletedEntry);
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete);
int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize);
@ -45,14 +47,15 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key,
recordid nextEntry;
hashRid.slot = bucket_number;
Tread(xid, hashRid, &nextEntry);
/* Tread(xid, hashRid, &nextEntry); */
nextEntry = hashRid;
int found = 0;
while(nextEntry.size > 0) {
while(nextEntry.size != -1 && nextEntry.size != 0) {
assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, nextEntry, e);
if(!memcmp(key, e+1, keySize)) {
if(!memcmp(key, e+1, keySize) && e->next.size != 0) {
memcpy(val, ((byte*)(e+1))+keySize, valSize);
found = 1;
break;
@ -63,23 +66,42 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key,
return found;
}
void expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
TarrayListExtend(xid, hash, 1);
if(next_split >= twoToThe(i-1)+2) {
i++;
next_split = 2;
/* Total hack; need to do this better, by storing stuff in the hash table headers.*/
static int count = 4096 * .25;
count --;
#define AMORTIZE 1000
#define FF_AM 750
if(count <= 0 && !(count * -1) % FF_AM) {
recordid * headerRidB = pblHtLookup(openHashes, &(hash.page), sizeof(int));
int j;
TarrayListExtend(xid, hash, AMORTIZE);
for(j = 0; j < AMORTIZE; j++) {
if(next_split >= twoToThe(i-1)+2) {
i++;
next_split = 2;
}
rehash(xid, hash, next_split, i, keySize, valSize);
next_split++;
headerNextSplit = next_split;
headerHashBits = i;
}
update_hash_header(xid, hash, i, next_split);
}
rehash(xid, hash, next_split, i, keySize, valSize);
next_split++;
update_hash_header(xid, hash, i, next_split);
}
void update_hash_header(int xid, recordid hash, int i, int next_split) {
recordid * headerRidB = pblHtLookup(openHashes, &hash.page, sizeof(int));
hashEntry * he = pblHtLookup(openHashes, &(hash.page), sizeof(int));
assert(he);
recordid * headerRidB = &he->next;
assert(headerRidB);
headerHashBits = i;
headerNextSplit = next_split;
hash.slot = 1;
Tset(xid, hash, headerRidB);
}
@ -134,33 +156,105 @@ void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int v
int firstA = 1; // Is 'A' the recordid of a bucket?
int firstD = 1; // What about 'D'?
assert(hashRid.size == sizeof(recordid));
/* assert(hashRid.size == sizeof(recordid)); */
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
recordid ba = hashRid; ba.slot = next_split;
recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1);
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = 0;
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = -1;
recordid ba_contents; Tread(xid, ba, &ba_contents);
recordid bb_contents = NULLRID;
Tset(xid, bb, &bb_contents);//Tread(xid, bb, &bb_contents);
// recordid ba_contents; Tread(xid, ba, &ba_contents);
// recordid bb_contents = NULLRID;
/* Tset(xid, bb, &bb_contents); */ //Tread(xid, bb, &bb_contents);
recordid A = ba; //ba_contents;
recordid D = bb; //bb_contents;
recordid B = ba_contents;
recordid C;
hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
while(B.size) {
Tread(xid, ba, A_contents);
Tread(xid, bb, D_contents);
recordid A = ba; //ba_contents;
recordid D = bb; //bb_contents;
recordid B = A_contents->next;
recordid C;
if(!A_contents->next.size) {
/* Bucket A is empty, so we're done. */
free(D_contents);
free(A_contents);
free(B_contents);
/* printf("Expand was a noop.\n");
fflush(NULL); */
return;
}
int old_hash;
int new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2;
while(new_hash != next_split) {
// Need a record in A that belongs in the first bucket...
recordid oldANext = A_contents->next;
A_contents->next = NULLRID;
if(firstD) {
// assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid)));
Tset(xid, D, A_contents);
firstD = 0;
} else {
/* D at end of list => can overwrite next. */
D_contents->next = Talloc(xid, sizeof(hashEntry) + keySize + valSize); /* @todo unfortunate to dealloc A's successor, then alloc.. */
// assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid)));
Tset(xid, D_contents->next, A_contents);
// assert(memcmp(&D, &D_contents->next, sizeof(recordid)));
Tset(xid, D, D_contents);
D = A;
}
hashEntry * swap = D_contents;
D_contents = A_contents;
A_contents = swap;
/* A_contents is now garbage. */
assert(A.size == sizeof(hashEntry) + keySize + valSize);
if(oldANext.size == -1) {
memset(A_contents, 0, sizeof(hashEntry) + keySize + valSize);
// assert(memcmp(&A_contents->next, &A, sizeof(recordid)));
Tset(xid, A, A_contents);
free(D_contents);
free(A_contents);
free(B_contents);
/* printf("Loop 1 returning.\n");
fflush(NULL); */
return;
}
assert(oldANext.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, oldANext, A_contents);
// assert(memcmp(&A_contents->next, &A, sizeof(recordid)));
Tset(xid, A, A_contents);
Tdealloc(xid, oldANext);
new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2;
}
/* printf("Got past loop 1\n");
fflush(NULL); */
B = A_contents->next;
while(B.size != -1) {
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
C = B_contents->next;
int old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2;
int new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2;
old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2;
new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2;
assert(next_split == old_hash);
assert(new_hash == old_hash || new_hash == old_hash + twoToThe(i-1));
@ -171,32 +265,39 @@ void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int v
C.size = -1;
firstA = 0;
} else {
if(firstD) {
/* if(firstD) {
// D is a bucket entry
assert(B.size == sizeof(hashEntry) + keySize + valSize);
assert(D.size == sizeof(recordid));
Tset(xid, D, &B);
} else {
} else { */
// D is the tail of our list.
assert(D.size == sizeof(hashEntry) + keySize + valSize);
assert(B.size == 0 || B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, D, D_contents);
assert(B.size == -1 || B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, D, D_contents);
D_contents->next = B;
assert(B.size != 0);
// assert(memcmp(&D, &D_contents->next, sizeof(recordid)));
Tset(xid, D, D_contents);
}
/* } */
if(firstA) {
/* if(firstA) {
assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize);
assert(A.size == sizeof(recordid));
Tset(xid, A, &C);
} else {
} else { */
// A is somewhere in the first list.
assert(A.size == sizeof(hashEntry) + keySize + valSize);
assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize);
assert(C.size == -1 || C.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, A, A_contents);
A_contents->next = C;
assert(C.size != 0);
// assert(memcmp(&A, &A_contents->next, sizeof(recordid)));
Tset(xid, A, A_contents);
}
/* } */
// B _can't_ be a bucket.
@ -218,29 +319,111 @@ void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int v
free(B_contents);
}
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_contents, hashEntry * e, int keySize, int valSize, recordid newEntry, int skipDelete) {
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete) {
recordid deleteMe;
if(!skipDelete) {
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, &deleteMe)) {
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, valSize, &deleteMe)) {
Tdealloc(xid, deleteMe);
hashRid.slot = bucket_number;
Tread(xid, hashRid, &bucket_contents);
assert(hashRid.size == sizeof(hashEntry) + valSize + keySize);
Tread(xid, hashRid, bucket_contents);
hashRid.slot = 0;
}
}
/*@todo consider recovery for insertIntoBucket. */
hashRid.slot = bucket_number;
assert(hashRid.size == sizeof(hashEntry) + valSize + keySize);
Tread(xid, hashRid, bucket_contents);
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
if(!bucket_contents->next.size) { // Size = 0 -> nothing in bucket. Size != 0 -> bucket occupied.
e->next.page = 0;
e->next.slot = 0;
e->next.size = -1;
Tset(xid, hashRid, e);
} else {
recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize);
e->next = bucket_contents->next;
bucket_contents->next = newEntry;
assert(newEntry.size == sizeof(hashEntry) + keySize + valSize);
Tset(xid, newEntry, e);
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
Tset(xid, hashRid, bucket_contents);
}
/*
recordid bucket = hashRid;
bucket.slot = bucket_number;
e->next = bucket_contents;
Tset(xid, newEntry, e);
Tset(xid, bucket, &newEntry);
Tset(xid, bucket, &newEntry); */
}
int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_contents, void * key, int keySize, recordid * deletedEntry) {
hashEntry * e;
int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
void * key, int keySize, int valSize, recordid * deletedEntry) {
if(bucket_contents->next.size == 0) { return 0; }
recordid this = hash;
this.slot = bucket_number;
int found = 0;
if(!memcmp(bucket_contents+1, key, keySize)) {
if(deletedEntry)
deletedEntry->size = 0; /* size = 0 -> don't delete (this is a bucket!) */
if(bucket_contents->next.size == -1) {
memset(bucket_contents, 0, sizeof(hashEntry) + keySize + valSize);
Tset(xid, this, bucket_contents);
} else {
assert(bucket_contents->next.size == sizeof(hashEntry) + keySize + valSize);
recordid oldNext = bucket_contents->next;
Tread(xid, bucket_contents->next, bucket_contents);
Tset(xid, this, bucket_contents);
*deletedEntry = oldNext; /* @todo delete from bucket really should do its own deallocation.. */
}
return 1;
}
if(bucket_contents->next.size == -1) { return 0; }
hashEntry * A = malloc(sizeof(hashEntry) + keySize + valSize);
hashEntry * B = malloc(sizeof(hashEntry) + keySize + valSize);
recordid Aaddr, Baddr;
memcpy(B, bucket_contents, sizeof(hashEntry) + keySize + valSize);
Baddr = this;
while(B->next.size != -1) {
hashEntry * tmp = A;
A = B;
Aaddr = Baddr;
B = tmp;
assert(A->next.size == sizeof(hashEntry) + keySize + valSize);
Baddr = A->next;
Tread(xid, Baddr, B);
if(!memcmp(B+1, key, keySize)) {
A->next = B->next;
assert(Aaddr.size == sizeof(hashEntry) + keySize + valSize);
Tset(xid, Aaddr, A);
if(deletedEntry) {
*deletedEntry = Baddr;
}
found = 1;
break;
}
}
free(A);
free(B);
return found;
}
/* hashEntry * e;
recordid bucket = hash;
bucket.slot = bucket_number;
recordid nextEntry;
@ -275,16 +458,15 @@ int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_
first = 0;
nextEntry = e->next;
}
return found;
}
return found; */
recordid ThashAlloc(int xid, int keySize, int valSize) {
/* Want 16 buckets, doubling on overflow. */
recordid rid = TarrayListAlloc(xid, 4096, 2, sizeof(recordid));
recordid rid = TarrayListAlloc(xid, 4096, 2, sizeof(hashEntry) + keySize + valSize);
TarrayListExtend(xid, rid, 4096+2);
recordid headerRidA;
recordid * headerRidB = malloc (sizeof(recordid));
recordid * headerRidA = calloc (1, sizeof(recordid) + keySize + valSize);
recordid * headerRidB = calloc (1, sizeof(recordid) + keySize + valSize);
headerKeySize = keySize;
headerValSize = valSize;
@ -295,9 +477,11 @@ recordid ThashAlloc(int xid, int keySize, int valSize) {
rid.slot =0;
Tset(xid, rid, &headerRidA);
rid.slot =1;
Tset(xid, rid, headerRidB);
Tset(xid, rid, &headerRidB);
pblHtInsert(openHashes, &rid.page, sizeof(int), headerRidB);
pblHtInsert(openHashes, &(rid.page), sizeof(int), headerRidB);
free(headerRidA);
rid.slot =0;
return rid;
@ -315,7 +499,9 @@ void ThashInsert(int xid, recordid hashRid,
void * key, int keySize,
void * val, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(int));
/* printf("header: %d %d\n", headerHashBits, headerNextSplit); */
int bucket = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
@ -323,67 +509,92 @@ void ThashInsert(int xid, recordid hashRid,
memcpy(e+1, key, keySize);
memcpy(((byte*)(e+1)) + keySize, val, valSize);
recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize);
recordid bucket_contents;
hashEntry * bucket_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashRid.slot = bucket;
Tread(xid, hashRid, &bucket_contents);
hashRid.slot = 0;
insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, newEntry, 0);
expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize);
Tread(xid, hashRid, bucket_contents);
/* if(!bucket_contents->next.size) { // Size = 0 -> nothing in bucket. Size != 0 -> bucket occupied.
e->next.page = 0;
e->next.slot = 0;
e->next.size = -1;
Tset(xid, hashRid, e);
} else {
recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize);
e->next = bucket_contents->next;
bucket_contents->next = newEntry;
Tset(xid, newEntry, e);
Tset(xid, hashRid, bucket_contents);
}*/
/* hashRid.slot = 0; */
insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, 0);
expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize);
free(bucket_contents);
free(e);
}
/** @todo hash table probably should track the number of items in it,
so that expand can be selectively called. */
void ThashDelete(int xid, recordid hashRid,
void * key, int keySize) {
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
recordid tmp = hashRid;
tmp.slot = 1;
int ThashDelete(int xid, recordid hashRid,
void * key, int keySize, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(int));
/* recordid tmp = hashRid;
tmp.slot = 1; */
int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
recordid deleteMe;
recordid deleteMe;
/* hashEntry * deleteMe = malloc(sizeof(hashEntry) + keySize + valSize); */
hashRid.slot = bucket_number;
recordid bucket_contents;
Tread(xid, hashRid, &bucket_contents);
hashEntry * bucket_contents = malloc(sizeof(hashEntry) + keySize + valSize);
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, hashRid, bucket_contents);
hashRid.slot = 0;
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, &deleteMe)) {
Tdealloc(xid, deleteMe);
int ret = 0;
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, valSize, &deleteMe)) {
if(deleteMe.size) {
Tdealloc(xid, deleteMe);
}
ret = 1;
}
free(bucket_contents);
return ret;
}
int ThashOpen(int xid, recordid hashRid) {
recordid * headerRidB = malloc(sizeof(recordid));
int ThashOpen(int xid, recordid hashRid, int keySize, int valSize) {
recordid * headerRidB = malloc(sizeof(recordid) + keySize + valSize);
hashRid.slot = 1;
Tread(xid, hashRid, headerRidB);
pblHtInsert(openHashes, &hashRid.page, sizeof(int), headerRidB);
pblHtInsert(openHashes, &(hashRid.page), sizeof(int), headerRidB);
return 0;
}
void ThashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) {
ThashDelete(xid, hashRid, key, keySize);
ThashDelete(xid, hashRid, key, keySize, valSize);
ThashInsert(xid, hashRid, key, keySize, val, valSize);
}
int ThashClose(int xid, recordid hashRid) {
recordid * freeMe = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
pblHtRemove(openHashes, &hashRid.page, sizeof(int));
recordid * freeMe = pblHtLookup(openHashes, &(hashRid.page), sizeof(int));
pblHtRemove(openHashes, &(hashRid.page), sizeof(int));
free(freeMe);
return 0;
}
int ThashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(int));
/* printf("lookup header: %d %d\n", headerHashBits, headerNextSplit); */
recordid tmp = hashRid;
tmp.slot = 1;
int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
int ret = findInBucket(xid, hashRid, bucket_number, key, keySize, buf, valSize);
return ret;
}

View file

@ -243,6 +243,7 @@ Page *pageMalloc() {
}
}*/
void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
assert( (p->id == rid.page) && (p->memAddr != NULL) );
@ -283,3 +284,45 @@ void readRecord(int xid, Page * p, recordid rid, void *buf) {
assert(rid.page == p->id);
}
void readRecordUnlocked(int xid, Page * p, recordid rid, void *buf) {
assert(rid.page == p->id);
int page_type = *page_type_ptr(p);
if(rid.size > BLOB_THRESHOLD_SIZE) {
abort(); /* Unsupported for now. */
readBlob(xid, p, rid, buf);
} else if(page_type == SLOTTED_PAGE) {
slottedReadUnlocked(xid, p, rid, buf);
/* FIXED_PAGES can function correctly even if they have not been
initialized. */
} else if(page_type == FIXED_PAGE || !page_type) {
fixedReadUnlocked(p, rid, buf);
} else {
abort();
}
assert(rid.page == p->id);
}
void writeRecordUnlocked(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
assert( (p->id == rid.page) && (p->memAddr != NULL) );
if(rid.size > BLOB_THRESHOLD_SIZE) {
abort();
writeBlob(xid, p, lsn, rid, dat);
} else if(*page_type_ptr(p) == SLOTTED_PAGE) {
slottedWriteUnlocked(xid, p, lsn, rid, dat);
} else if(*page_type_ptr(p) == FIXED_PAGE || !*page_type_ptr(p) ) {
fixedWriteUnlocked(p, rid, dat);
} else {
abort();
}
assert( (p->id == rid.page) && (p->memAddr != NULL) );
writelock(p->rwlatch, 225); /* Need a writelock so that we can update the lsn. */
pageWriteLSN(p, lsn);
unlock(p->rwlatch);
}

View file

@ -251,12 +251,13 @@ lsn_t pageReadLSN(const Page * page);
* wish to write
*/
void writeRecord(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat);
void writeRecordUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat);
/**
* @param xid transaction ID
* @param dat buffer for data
*/
void readRecord(int xid, Page * page, recordid rid, void *dat);
void readRecordUnlocked(int xid, Page * p, recordid rid, void *buf);
void pageCommit(int xid);
void pageAbort(int xid);

View file

@ -58,9 +58,9 @@ static void checkRid(Page * page, recordid rid) {
assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE);
assert(page->id == rid.page);
assert(*recordsize_ptr(page) == rid.size);
/* assert(recordsPerPage(rid.size) > rid.slot); */
// assert(recordsPerPage(rid.size) > rid.slot);
int recCount = *recordcount_ptr(page);
assert(recCount > rid.slot);
assert(recCount > rid.slot);
} else {
fixedPageInitialize(page, rid.size, recordsPerPage(rid.size));
}
@ -74,8 +74,12 @@ void fixedReadUnlocked(Page * page, recordid rid, byte * buf) {
}
void fixedRead(Page * page, recordid rid, byte * buf) {
readlock(page->rwlatch, 57);
// printf("R { %d %d %d }\n", rid.page, rid.slot, rid.size);
checkRid(page, rid);
fixedReadUnlocked(page, rid, buf);
unlock(page->rwlatch);
@ -83,6 +87,7 @@ void fixedRead(Page * page, recordid rid, byte * buf) {
}
void fixedWriteUnlocked(Page * page, recordid rid, const byte *dat) {
checkRid(page,rid);
if(!memcpy(fixed_record_ptr(page, rid.slot), dat, rid.size)) {
perror("memcpy");
abort();
@ -92,7 +97,8 @@ void fixedWriteUnlocked(Page * page, recordid rid, const byte *dat) {
void fixedWrite(Page * page, recordid rid, const byte* dat) {
readlock(page->rwlatch, 73);
checkRid(page, rid);
// printf("W { %d %d %d }\n", rid.page, rid.slot, rid.size);
// checkRid(page, rid);
fixedWriteUnlocked(page, rid, dat);

View file

@ -13,7 +13,7 @@ void indirectInitialize(Page * p, int height) {
*page_type_ptr(p) = INDIRECT_PAGE;
memset(p->memAddr, INVALID_SLOT, ((int)level_ptr(p)) - ((int)p->memAddr));
}
/** @todo locking for dereferenceRID? */
recordid dereferenceRID(recordid rid) {
Page * this = loadPage(rid.page);
int offset = 0;

View file

@ -44,6 +44,7 @@ BEGIN_C_DECLS
physical location of the record.
*/
recordid dereferenceRID(recordid rid);
#define dereferenceRIDUnlocked(x) dereferenceRID((x))
void indirectInitialize(Page * p, int height);
recordid rallocMany(/*int parentPage, lsn_t lsn,*/int xid, int recordSize, int recordCount);
unsigned int indirectPageRecordCount(recordid rid);

View file

@ -138,7 +138,7 @@ void slottedPageInitialize(Page * page) {
}
static int unlocked_freespace(Page * page) {
int slottedFreespaceUnlocked(Page * page) {
return (int)slot_length_ptr(page, *numslots_ptr(page)) - (int)(page->memAddr + *freespace_ptr(page));
}
@ -146,7 +146,7 @@ static int unlocked_freespace(Page * page) {
int slottedFreespace(Page * page) {
int ret;
readlock(page->rwlatch, 292);
ret = unlocked_freespace(page);
ret = slottedFreespaceUnlocked(page);
readunlock(page->rwlatch);
return ret;
}
@ -228,11 +228,11 @@ static void __really_do_ralloc(Page * page, recordid rid) {
assert(rid.size > 0);
if(unlocked_freespace(page) < rid.size) {
if(slottedFreespaceUnlocked(page) < rid.size) {
slottedCompact(page);
/* Make sure there's enough free space... */
assert (unlocked_freespace(page) >= rid.size);
assert (slottedFreespaceUnlocked(page) >= rid.size);
}
freeSpace = *freespace_ptr(page);
@ -320,6 +320,20 @@ void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid) {
unlock(page->rwlatch);
}
void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff) {
int slot_length;
assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) {
perror("memcpy");
abort();
}
}
/*
This should trust the rid (since the caller needs to
override the size in special circumstances)
@ -368,6 +382,20 @@ void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat
unlock(page->rwlatch);
}
void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) {
int slot_length;
assert(rid.size < PAGE_SIZE);
assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
if(!memcpy(record_ptr(page, rid.slot), data, rid.size)) {
perror("memcpy");
abort();
}
}
/*void slottedSetType(Page * p, int slot, int type) {
assert(type > PAGE_SIZE);

View file

@ -61,6 +61,8 @@ Slotted page layout:
void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data);
void slottedRead(int xid, Page * page, recordid rid, byte *buff);
void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data);
void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff);
void slottedPageInitialize(Page * p);

View file

@ -162,10 +162,32 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
}
void TreadUnlocked(int xid, recordid rid, void * dat) {
Page * p = loadPage(rid.page);
int page_type = *page_type_ptr(p);
if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) {
} else if(page_type == INDIRECT_PAGE) {
releasePage(p);
rid = dereferenceRIDUnlocked(rid);
p = loadPage(rid.page);
} else if(page_type == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRidUnlocked(p, rid.slot);
releasePage(p);
p = loadPage(rid.page);
} else {
abort();
}
readRecordUnlocked(xid, p, rid, dat);
releasePage(p);
}
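TreadUnlocked mirrors Tread but dispatches through the new readRecordUnlocked / slottedReadUnlocked / fixedReadUnlocked path, skipping the per-record latching; callers presumably have to know that nothing is concurrently writing the record. A hypothetical helper showing the call shape (the header path is an assumption):

#include <lladd/transactional.h>   /* assumed header for the T* API and recordid */

/* Not part of the commit: read a fixed-size record through the latch-free path. */
static int read_int_unlocked(int xid, recordid rid) {
    int val = 0;
    TreadUnlocked(xid, rid, &val);
    return val;
}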
void Tread(int xid, recordid rid, void * dat) {
Page * p = loadPage(rid.page);
int page_type = *page_type_ptr(p);
if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE ) {
if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) {
} else if(page_type == INDIRECT_PAGE) {
releasePage(p);

View file

@ -126,7 +126,9 @@ START_TEST(simpleLinearHashTest)
assert(rid.page == (i+1));
assert(rid.slot == (i+2));
assert(rid.size == (i+3));
ThashDelete(xid, hashRoot, &i, sizeof(int));
assert(ThashDelete(xid, hashRoot, &i, sizeof(int), sizeof(recordid)));
assert(!ThashDelete(xid, hashRoot, &i, sizeof(int), sizeof(recordid)));
assert(!ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
}
@ -155,13 +157,13 @@ START_TEST(simpleLinearHashTest)
if(i % 10) {
assert(ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
ThashDelete(xid, hashRoot, &i, sizeof(int));
ThashDelete(xid, hashRoot, &i, sizeof(int), sizeof(recordid));
assert(rid.page == (i+1));
assert(rid.slot == (i+2));
assert(rid.size == (i+3));
} else {
assert(!ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
ThashDelete(xid, hashRoot, &i, sizeof(int));
ThashDelete(xid, hashRoot, &i, sizeof(int), sizeof(recordid));
}
}