From ec99c0e258cb9d42a3dd5659b931a61142e1db8f Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Tue, 19 Oct 2004 21:16:37 +0000 Subject: [PATCH] Lots of optimizations; re-did on-disk layout for the linear hash table. --- benchmarks/logicalHash.c | 34 +-- benchmarks/naiveHash.c | 2 +- benchmarks/readLogicalHash.c | 2 +- lladd/common.h | 2 +- lladd/operations/arrayList.h | 1 + lladd/operations/linearHash.h | 2 +- lladd/operations/naiveLinearHash.h | 8 +- lladd/transactional.h | 1 + src/lladd/logger/logWriter.c | 8 +- src/lladd/operations/arrayList.c | 4 +- src/lladd/operations/instantSet.c | 22 +- src/lladd/operations/linearHash.c | 403 +++++++++++++++++++++---- src/lladd/operations/naiveLinearHash.c | 367 +++++++++++++++++----- src/lladd/page.c | 43 +++ src/lladd/page.h | 3 +- src/lladd/page/fixed.c | 12 +- src/lladd/page/indirect.c | 2 +- src/lladd/page/indirect.h | 1 + src/lladd/page/slotted.c | 36 ++- src/lladd/page/slotted.h | 2 + src/lladd/transactional2.c | 24 +- test/lladd/check_linearHash.c | 8 +- 22 files changed, 816 insertions(+), 171 deletions(-) diff --git a/benchmarks/logicalHash.c b/benchmarks/logicalHash.c index 6f8e03d..2b34820 100644 --- a/benchmarks/logicalHash.c +++ b/benchmarks/logicalHash.c @@ -24,23 +24,23 @@ int main(int argc, char** argv) { recordid hash = ThashAlloc(xid, sizeof(int), sizeof(int)); Tcommit(xid); + + int i = 0; + + for(k = 0; k < xact_count; k++) { + + xid = Tbegin(); + + for(;i < count *(k+1) ; i++) { + + TlogicalHashInsert(xid, hash, &i, sizeof(int), &i, sizeof(int)); + + } + + Tcommit(xid); + + } - int i; - - for(k = 0; k < xact_count; k++) { - - xid = Tbegin(); - - for(i = 0; i < count ; i++) { - - TlogicalHashInsert(xid, hash, &i, sizeof(int), &i, sizeof(int)); - - } - - Tcommit(xid); - - } - - /* Tdeinit(); */ + /* Tdeinit(); */ } diff --git a/benchmarks/naiveHash.c b/benchmarks/naiveHash.c index b481ade..c570804 100644 --- a/benchmarks/naiveHash.c +++ b/benchmarks/naiveHash.c @@ -32,6 +32,6 @@ int main(int argc, char** argv) { Tcommit(xid); - Tdeinit(); + /* Tdeinit(); */ } diff --git a/benchmarks/readLogicalHash.c b/benchmarks/readLogicalHash.c index d24e3f4..e7ca3e7 100644 --- a/benchmarks/readLogicalHash.c +++ b/benchmarks/readLogicalHash.c @@ -35,7 +35,7 @@ int main(int argc, char** argv) { xid = Tbegin(); - ThashOpen(xid, hash); + ThashOpen(xid, hash, sizeof(int), sizeof(int)); int k; for(k = 0; k < 10; k++) { diff --git a/lladd/common.h b/lladd/common.h index ad71c53..6161943 100644 --- a/lladd/common.h +++ b/lladd/common.h @@ -103,7 +103,7 @@ extern int errno; /*#define DEBUGGING */ /*#define PROFILE_LATCHES*/ -#define NO_LATCHES +/*#define NO_LATCHES */ #ifdef DEBUGGING /** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */ diff --git a/lladd/operations/arrayList.h b/lladd/operations/arrayList.h index a04ee0c..48cdf6e 100644 --- a/lladd/operations/arrayList.h +++ b/lladd/operations/arrayList.h @@ -25,6 +25,7 @@ Operation getUnInitPage(); #define TunInitPage(xid, rid) Tupdate(xid, rid, NULL, OPERATION_UNINITIALIZE_PAGE) recordid dereferenceArrayListRid(Page * p, int offset); +#define dereferenceArrayListRidUnlocked(x, y) dereferenceArrayListRid((x),(y)) int TarrayListExtend(int xid, recordid rid, int slots); int TarrayListInstantExtend(int xid, recordid rid, int slots); #endif diff --git a/lladd/operations/linearHash.h b/lladd/operations/linearHash.h index c44374c..907f2cb 100644 --- a/lladd/operations/linearHash.h +++ b/lladd/operations/linearHash.h @@ -19,7 +19,7 @@ void ThashInstantInsert(int xid, recordid hashRid, const void * key, int keySize, const void * val, int valSize); void ThashInstantDelete(int xid, recordid hashRid, - const void * key, int keySize); + const void * key, int keySize, int valSize); void ThashInstantUpdate(int xid, recordid hashRid, const void * key, int keySize, const void * val, int valSize); void TlogicalHashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize); void TlogicalHashInsert(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize); diff --git a/lladd/operations/naiveLinearHash.h b/lladd/operations/naiveLinearHash.h index 3f40aaf..b1cf951 100644 --- a/lladd/operations/naiveLinearHash.h +++ b/lladd/operations/naiveLinearHash.h @@ -18,12 +18,14 @@ recordid ThashAlloc(int xid, int keySize, int valSize) ; void ThashInsert(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize); -void ThashDelete(int xid, recordid hashRid, - void * key, int keySize); +/*void ThashDelete(int xid, recordid hashRid, + void * key, int keySize);*/ +int ThashDelete(int xid, recordid hashRid, + void * key, int keySize, int valSize); void ThashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize); int ThashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize); void ThashInit(); void ThashDeinit(); -int ThashOpen(int xid, recordid hashRid); +int ThashOpen(int xid, recordid hashRid, int keySize, int valSize); int ThashClose(int xid, recordid hashRid) ; #endif diff --git a/lladd/transactional.h b/lladd/transactional.h index 8873d89..cf18ec6 100644 --- a/lladd/transactional.h +++ b/lladd/transactional.h @@ -163,6 +163,7 @@ void Tupdate(int xid, recordid rid, const void *dat, int op); * @param dat buffer into which data goes */ void Tread(int xid, recordid rid, void *dat); +void TreadUnlocked(int xid, recordid rid, void *dat); /** * @param xid transaction ID diff --git a/src/lladd/logger/logWriter.c b/src/lladd/logger/logWriter.c index 51a34cc..c327672 100644 --- a/src/lladd/logger/logWriter.c +++ b/src/lladd/logger/logWriter.c @@ -39,6 +39,9 @@ authors grant the U.S. Government and others acting in its behalf permission to use and distribute the software in accordance with the terms specified in this license. ---*/ +#define _XOPEN_SOURCE 600 + + #include #include @@ -49,6 +52,7 @@ terms specified in this license. #include "io.h" #include #include +#include #include @@ -127,7 +131,9 @@ pthread_mutex_t truncateLog_mutex; static int sought = 1; int openLogWriter() { #define BUFSIZE 1024*16 - char * buffer = malloc(BUFSIZE); + char * buffer ;/*= malloc(BUFSIZE);*/ + assert(!posix_memalign((void*)&(buffer), PAGE_SIZE, BUFSIZE)); + log = fopen(LOG_FILE, "a+"); if (log==NULL) { diff --git a/src/lladd/operations/arrayList.c b/src/lladd/operations/arrayList.c index 94d0738..f7428f8 100644 --- a/src/lladd/operations/arrayList.c +++ b/src/lladd/operations/arrayList.c @@ -228,7 +228,7 @@ int TarrayListInstantExtend(int xid, recordid rid, int slots) { tmp.slot = MAX_OFFSET_POSITION; int newMaxOffset = tlp.maxOffset+slots; - /** @todo CORRECTNESS BUG: From recovery's point of view, this is totally wrong! The + /** @todo CORRECTNESS BUG: From recovery's point of view, arrayList is totally wrong! The only reason we mess with p is beacuse TinstantSet doesn't handle ARRAY_LIST_PAGES the way we need it to, so this won't be hard to fix... */ @@ -277,7 +277,7 @@ Operation getUnInitPage() { /*----------------------------------------------------------------------------*/ - +/** @todo locking for arrayLists */ recordid dereferenceArrayListRid(Page * p, int offset) { TarrayListParameters tlp = pageToTLP(p); diff --git a/src/lladd/operations/instantSet.c b/src/lladd/operations/instantSet.c index 17ee983..3891825 100644 --- a/src/lladd/operations/instantSet.c +++ b/src/lladd/operations/instantSet.c @@ -48,13 +48,31 @@ terms specified in this license. #include #include #include "../page.h" +#include "../page/fixed.h" + +static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) { + // writeRecord(xid, p, lsn, rid, dat); + writeRecordUnlocked(xid, p, lsn, rid, dat); + return 0; +} + +Operation getInstantSet() { + Operation o = { + OPERATION_INSTANT_SET, /* id */ + SIZEOF_RECORD, /* use the size of the record as size of arg */ + OPERATION_NOOP, + &operate /* Function */ + }; + return o; +} + /** @todo The spirit of instantSet suggests that it should hold a shorter write lock than set, but instant set was written before the lock manager... */ -Operation getInstantSet() { +/*Operation getInstantSet() { Operation o = getSet(); o.id = OPERATION_INSTANT_SET; o.undo = OPERATION_NOOP; return o; -} + }*/ diff --git a/src/lladd/operations/linearHash.c b/src/lladd/operations/linearHash.c index 0c949aa..a4e7c4f 100644 --- a/src/lladd/operations/linearHash.c +++ b/src/lladd/operations/linearHash.c @@ -13,8 +13,8 @@ #define BUCKETS_OFFSET (2) -#define headerKeySize (headerRidA.page) -#define headerValSize (headerRidA.slot) +#define headerKeySize (headerRidA->page) +#define headerValSize (headerRidA->slot) #define headerHashBits (headerRidB->page) #define headerNextSplit (headerRidB->slot) @@ -36,15 +36,18 @@ extern pblHashTable_t * openHashes ; /*pblHashTable_t * openHashes = NULL; */ static int operateUndoInsert(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { - + int keySize = rid.size; + int valSize = rid.slot; + rid.slot = 0; rid.size = sizeof(recordid); if(!pblHtLookup(openHashes, &rid.page, sizeof(int))) { - ThashOpen(xid, rid); + abort(); + /* ThashOpen(xid, rid); */ } - ThashInstantDelete(xid, rid, dat, keySize); + ThashInstantDelete(xid, rid, dat, keySize, valSize); return 0; } @@ -62,8 +65,8 @@ static int operateUndoDelete(int xid, Page * p, lsn_t lsn, recordid rid, const v rid.slot = 0; - /* Tread(xid, dereferenceArrayListRid(p, rid.slot), &headerRidA); */ - /* Tread(xid, rid, &headerRidA); */ + /* TreadUnlocked(xid, dereferenceArrayListRid(p, rid.slot), &headerRidA); */ + /* TreadUnlocked(xid, rid, &headerRidA); */ assert(keySize == sizeof(int)); assert(valSize == sizeof(recordid)); @@ -122,11 +125,12 @@ void TlogicalHashInsert(int xid, recordid hashRid, void * key, int keySize, void assert(headerRidB); + hashRid.slot = valSize; hashRid.size = keySize; Tupdate(xid, hashRid, key, OPERATION_LINEAR_INSERT); /* Perform redo-only insert. */ - hashRid.size = sizeof(recordid); + hashRid.size = sizeof(hashEntry) + keySize + valSize; ThashInstantInsert(xid, hashRid, key, keySize, val, valSize); instant_expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize); @@ -144,7 +148,7 @@ int TlogicalHashDelete(int xid, recordid hashRid, void * key, int keySize, void Tupdate(xid, hashRid, arg, OPERATION_LINEAR_DELETE); free(arg); hashRid.size = sizeof(recordid); - ThashInstantDelete(xid, hashRid, key, keySize); + ThashInstantDelete(xid, hashRid, key, keySize, valSize); return 1; } else { return 0; @@ -155,8 +159,10 @@ int TlogicalHashDelete(int xid, recordid hashRid, void * key, int keySize, void void instant_rehash(int xid, recordid hash, int next_split, int i, int keySize, int valSize); void instant_update_hash_header(int xid, recordid hash, int i, int next_split); -int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_rid, const void * key, int keySize, recordid * deletedEntry); -void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_rid, hashEntry * e, int keySize, int valSize, recordid deletedEntry, int skipDelete); +int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents, + const void * key, int keySize, int valSize, recordid * deletedEntry); +void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents, + hashEntry * e, int keySize, int valSize, int skipDelete); int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize); @@ -167,7 +173,7 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, recordid nextEntry; bucket.slot = bucket_number; - Tread(xid, bucket, &nextEntry); + TreadUnlocked(xid, bucket, &nextEntry); if(nextEntry.size) { assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize); @@ -176,7 +182,7 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int found = 0; while(nextEntry.size > 0) { - Tread(xid, nextEntry, e); + TreadUnlocked(xid, nextEntry, e); if(!memcmp(key, e+1, keySize)) { memcpy(val, ((byte*)(e+1))+keySize, valSize); found = 1; @@ -188,7 +194,7 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, return found; }*/ -int extendCount = 0; +/*int extendCount = 0; void instant_expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) { extendCount ++; if(extendCount >= 70) { @@ -206,18 +212,43 @@ void instant_expand (int xid, recordid hash, int next_split, int i, int keySize, extendCount = 0; } + }*/ +void instant_expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) { + /* Total hack; need to do this better, by storing stuff in the hash table headers.*/ + static int count = 4096 * .25; + count --; +#define AMORTIZE 1000 +#define FF_AM 750 + if(count <= 0 && !(count * -1) % FF_AM) { + recordid * headerRidB = pblHtLookup(openHashes, &(hash.page), sizeof(int)); + int j; + TarrayListInstantExtend(xid, hash, AMORTIZE); + for(j = 0; j < AMORTIZE; j++) { + + if(next_split >= twoToThe(i-1)+2) { + i++; + next_split = 2; + } + instant_rehash(xid, hash, next_split, i, keySize, valSize); + next_split++; + headerNextSplit = next_split; + headerHashBits = i; + } + instant_update_hash_header(xid, hash, i, next_split); + } } + void instant_update_hash_header(int xid, recordid hash, int i, int next_split) { recordid * headerRidB = pblHtLookup(openHashes, &hash.page, sizeof(int)); headerHashBits = i; headerNextSplit = next_split; - + hash.slot = 1; TinstantSet(xid, hash, headerRidB); } -void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) { +//void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) { /* Picture of what's going on: @@ -272,8 +303,8 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz /** @todo Actually implement recovery for linearHash. */ - int firstA = 1; /* Is 'A' the recordid of a bucket? */ - int firstD = 1; /* What about 'D'? */ +/* int firstA = 1; // Is 'A' the recordid of a bucket? + int firstD = 1; // What about 'D'? assert(hashRid.size == sizeof(recordid)); @@ -281,12 +312,12 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1); recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = 0; - recordid ba_contents; Tread(xid, ba, &ba_contents); + recordid ba_contents; TreadUnlocked(xid, ba, &ba_contents); recordid bb_contents = NULLRID; - TinstantSet(xid, bb, &bb_contents);/*Tread(xid, bb, &bb_contents);*/ + TinstantSet(xid, bb, &bb_contents);//TreadUnlocked(xid, bb, &bb_contents); - recordid A = ba; /*ba_contents; */ - recordid D = bb; /*bb_contents; */ + recordid A = ba; //ba_contents; + recordid D = bb; //bb_contents; recordid B = ba_contents; recordid C; @@ -297,7 +328,7 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz while(B.size) { assert(B.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, B, B_contents); + TreadUnlocked(xid, B, B_contents); C = B_contents->next; int old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2; @@ -313,15 +344,15 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz firstA = 0; } else { if(firstD) { - /* D is a bucket entry */ + // D is a bucket entry assert(B.size == sizeof(hashEntry) + keySize + valSize); assert(D.size == sizeof(recordid)); TinstantSet(xid, D, &B); } else { - /* D is the tail of our list. */ + // D is the tail of our list. assert(D.size == sizeof(hashEntry) + keySize + valSize); assert(B.size == 0 || B.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, D, D_contents); + TreadUnlocked(xid, D, D_contents); D_contents->next = B; TinstantSet(xid, D, D_contents); } @@ -331,23 +362,191 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz assert(A.size == sizeof(recordid)); TinstantSet(xid, A, &C); } else { - /* A is somewhere in the first list. */ + // A is somewhere in the first list. assert(A.size == sizeof(hashEntry) + keySize + valSize); assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, A, A_contents); + TreadUnlocked(xid, A, A_contents); A_contents->next = C; TinstantSet(xid, A, A_contents); } - /* B _can't_ be a bucket.*/ + // B _can't_ be a bucket. assert(B.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, B, B_contents); + TreadUnlocked(xid, B, B_contents); B_contents->next = NULLRID; TinstantSet(xid, B, B_contents); - /* Update Loop State */ + // Update Loop State + D = B; + B = C; + C.size = -1; + firstD = 0; + } + } + free(D_contents); + free(A_contents); + free(B_contents); + +} */ + +void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) { + int firstA = 1; // Is 'A' the recordid of a bucket? + int firstD = 1; // What about 'D'? + + /* assert(hashRid.size == sizeof(recordid)); */ + assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); + recordid ba = hashRid; ba.slot = next_split; + recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1); + recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = -1; + + // recordid ba_contents; TreadUnlocked(xid, ba, &ba_contents); + // recordid bb_contents = NULLRID; + /* Tset(xid, bb, &bb_contents); */ //TreadUnlocked(xid, bb, &bb_contents); + + hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); + hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); + hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); + + TreadUnlocked(xid, ba, A_contents); + TreadUnlocked(xid, bb, D_contents); + recordid A = ba; //ba_contents; + recordid D = bb; //bb_contents; + recordid B = A_contents->next; + recordid C; + + if(!A_contents->next.size) { + /* Bucket A is empty, so we're done. */ + free(D_contents); + free(A_contents); + free(B_contents); + /* printf("Expand was a noop.\n"); + fflush(NULL); */ + return; + } + + int old_hash; + int new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2; + + while(new_hash != next_split) { + // Need a record in A that belongs in the first bucket... + + recordid oldANext = A_contents->next; + + A_contents->next = NULLRID; + + if(firstD) { + // assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid))); + TinstantSet(xid, D, A_contents); + firstD = 0; + } else { + /* D at end of list => can overwrite next. */ + D_contents->next = Talloc(xid, sizeof(hashEntry) + keySize + valSize); /* @todo + unfortunate + to + dealloc + A's + successor, + then + alloc.. */ + // assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid))); + TinstantSet(xid, D_contents->next, A_contents); + // assert(memcmp(&D, &D_contents->next, sizeof(recordid))); + TinstantSet(xid, D, D_contents); + D = A; + } + hashEntry * swap = D_contents; + D_contents = A_contents; + A_contents = swap; + + /* A_contents is now garbage. */ + + assert(A.size == sizeof(hashEntry) + keySize + valSize); + if(oldANext.size == -1) { + memset(A_contents, 0, sizeof(hashEntry) + keySize + valSize); + // assert(memcmp(&A_contents->next, &A, sizeof(recordid))); + TinstantSet(xid, A, A_contents); + free(D_contents); + free(A_contents); + free(B_contents); + /* printf("Loop 1 returning.\n"); + fflush(NULL); */ + return; + } + assert(oldANext.size == sizeof(hashEntry) + keySize + valSize); + TreadUnlocked(xid, oldANext, A_contents); + // assert(memcmp(&A_contents->next, &A, sizeof(recordid))); + TinstantSet(xid, A, A_contents); + Tdealloc(xid, oldANext); + + new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2; + } + /* printf("Got past loop 1\n"); + fflush(NULL); */ + + B = A_contents->next; + + while(B.size != -1) { + assert(B.size == sizeof(hashEntry) + keySize + valSize); + TreadUnlocked(xid, B, B_contents); + C = B_contents->next; + + old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2; + new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2; + + assert(next_split == old_hash); + assert(new_hash == old_hash || new_hash == old_hash + twoToThe(i-1)); + + if(new_hash == old_hash) { + A = B; + B = C; + C.size = -1; + firstA = 0; + } else { + /* if(firstD) { + // D is a bucket entry + assert(B.size == sizeof(hashEntry) + keySize + valSize); + assert(D.size == sizeof(recordid)); + Tset(xid, D, &B); + } else { */ + // D is the tail of our list. + assert(D.size == sizeof(hashEntry) + keySize + valSize); + assert(B.size == -1 || B.size == sizeof(hashEntry) + keySize + valSize); + TreadUnlocked(xid, D, D_contents); + D_contents->next = B; + assert(B.size != 0); + // assert(memcmp(&D, &D_contents->next, sizeof(recordid))); + TinstantSet(xid, D, D_contents); + /* } */ + + /* if(firstA) { + assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize); + assert(A.size == sizeof(recordid)); + Tset(xid, A, &C); + } else { */ + // A is somewhere in the first list. + assert(A.size == sizeof(hashEntry) + keySize + valSize); + assert(C.size == -1 || C.size == sizeof(hashEntry) + keySize + valSize); + TreadUnlocked(xid, A, A_contents); + A_contents->next = C; + assert(C.size != 0); + + + // assert(memcmp(&A, &A_contents->next, sizeof(recordid))); + + TinstantSet(xid, A, A_contents); + /* } */ + + // B _can't_ be a bucket. + + + assert(B.size == sizeof(hashEntry) + keySize + valSize); + TreadUnlocked(xid, B, B_contents); + B_contents->next = NULLRID; + TinstantSet(xid, B, B_contents); + + // Update Loop State D = B; B = C; C.size = -1; @@ -359,21 +558,123 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz free(B_contents); } -/** @todo fine grained locking for insertIntoBucket and the other operations in this file. */ -void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_contents, hashEntry * e, int keySize, int valSize, recordid newEntry, int skipDelete) { - assert(newEntry.size == (sizeof(hashEntry) + keySize + valSize)); + +void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents, + hashEntry * e, int keySize, int valSize, int skipDelete) { recordid deleteMe; if(!skipDelete) { - if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, &deleteMe)) { - /* Tdealloc(xid, deleteMe); */ - hashRid.slot = bucket_number; - Tread(xid, hashRid, &bucket_contents); - hashRid.slot = 0; + if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, valSize, &deleteMe)) { + if(deleteMe.size) { + Tdealloc(xid, deleteMe); + hashRid.slot = bucket_number; + assert(hashRid.size == sizeof(hashEntry) + valSize + keySize); + TreadUnlocked(xid, hashRid, bucket_contents); + hashRid.slot = 0; + } } } /*@todo consider recovery for insertIntoBucket. */ + hashRid.slot = bucket_number; + assert(hashRid.size == sizeof(hashEntry) + valSize + keySize); + TreadUnlocked(xid, hashRid, bucket_contents); + + assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); + if(!bucket_contents->next.size) { // Size = 0 -> nothing in bucket. Size != 0 -> bucket occupied. + e->next.page = 0; + e->next.slot = 0; + e->next.size = -1; + TinstantSet(xid, hashRid, e); + } else { + recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize); + e->next = bucket_contents->next; + bucket_contents->next = newEntry; + assert(newEntry.size == sizeof(hashEntry) + keySize + valSize); + TinstantSet(xid, newEntry, e); + assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); + TinstantSet(xid, hashRid, bucket_contents); + } + +} + +int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents, + const void * key, int keySize, int valSize, recordid * deletedEntry) { + if(bucket_contents->next.size == 0) { return 0; } + + recordid this = hash; + this.slot = bucket_number; + + int found = 0; + if(!memcmp(bucket_contents+1, key, keySize)) { + if(deletedEntry) + deletedEntry->size = 0; /* size = 0 -> don't delete (this is a bucket!) */ + if(bucket_contents->next.size == -1) { + memset(bucket_contents, 0, sizeof(hashEntry) + keySize + valSize); + TinstantSet(xid, this, bucket_contents); + } else { + assert(bucket_contents->next.size == sizeof(hashEntry) + keySize + valSize); + recordid oldNext = bucket_contents->next; + TreadUnlocked(xid, bucket_contents->next, bucket_contents); + TinstantSet(xid, this, bucket_contents); + *deletedEntry = oldNext; /* @todo delete from bucket really should do its own deallocation.. */ + } + return 1; + } + + if(bucket_contents->next.size == -1) { return 0; } + + hashEntry * A = malloc(sizeof(hashEntry) + keySize + valSize); + hashEntry * B = malloc(sizeof(hashEntry) + keySize + valSize); + + recordid Aaddr, Baddr; + + memcpy(B, bucket_contents, sizeof(hashEntry) + keySize + valSize); + Baddr = this; + while(B->next.size != -1) { + hashEntry * tmp = A; + A = B; + Aaddr = Baddr; + B = tmp; + assert(A->next.size == sizeof(hashEntry) + keySize + valSize); + Baddr = A->next; + TreadUnlocked(xid, Baddr, B); + + if(!memcmp(B+1, key, keySize)) { + A->next = B->next; + assert(Aaddr.size == sizeof(hashEntry) + keySize + valSize); + TinstantSet(xid, Aaddr, A); + if(deletedEntry) { + *deletedEntry = Baddr; + } + found = 1; + break; + } + + } + + free(A); + free(B); + + return found; +} + + +/** @todo fine grained locking for insertIntoBucket and the other operations in this file. */ +/*void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_contents, hashEntry * e, int keySize, int valSize, recordid newEntry, int skipDelete) { + assert(newEntry.size == (sizeof(hashEntry) + keySize + valSize)); + recordid deleteMe; + if(!skipDelete) { + if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, &deleteMe)) { + //Tdealloc(xid, deleteMe); + hashRid.slot = bucket_number; + TreadUnlocked(xid, hashRid, &bucket_contents); + hashRid.slot = 0; + } + } + + //@todo consider recovery for insertIntoBucket. + recordid bucket = hashRid; bucket.slot = bucket_number; assert(bucket_contents.size == 0 || bucket_contents.size == sizeof(hashEntry) + keySize + valSize); @@ -400,14 +701,14 @@ int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, recordid int found = 0; recordid lastEntry; while(nextEntry.size > 0) { - Tread(xid, nextEntry, e); + TreadUnlocked(xid, nextEntry, e); if(!memcmp(key, e+1, keySize)) { if(first) { assert(e->next.size < 40); TinstantSet(xid, bucket, &(e->next)); } else { recordid next = e->next; - Tread(xid, lastEntry, e); + TreadUnlocked(xid, lastEntry, e); assert(next.size < 40); e->next = next; TinstantSet(xid, lastEntry, e); @@ -423,7 +724,7 @@ int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, recordid free(e); return found; } - +*/ /* Same as normal implementation, so commented it out. / * recordid ThashAlloc(int xid, int keySize, int valSize) { @@ -475,13 +776,13 @@ void ThashInstantInsert(int xid, recordid hashRid, memcpy(((byte*)(e+1)) + keySize, val, valSize); /** @todo instantAlloc / instantDealloc */ - recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize); - recordid bucket_contents; + /* recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize); */ + hashEntry * bucket_contents = malloc(sizeof(hashEntry) + keySize + valSize); hashRid.slot = bucket; - Tread(xid, hashRid, &bucket_contents); + TreadUnlocked(xid, hashRid, bucket_contents); hashRid.slot = 0; - instant_insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, newEntry, 0); + instant_insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, 0); free(e); @@ -489,7 +790,7 @@ void ThashInstantInsert(int xid, recordid hashRid, /** @todo hash hable probably should track the number of items in it, so that expand can be selectively called. */ void ThashInstantDelete(int xid, recordid hashRid, - const void * key, int keySize) { + const void * key, int keySize, int valSize) { recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int)); recordid tmp = hashRid; tmp.slot = 1; @@ -497,10 +798,10 @@ void ThashInstantDelete(int xid, recordid hashRid, int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2; recordid deleteMe; hashRid.slot = bucket_number; - recordid bucket_contents; - Tread(xid, hashRid, &bucket_contents); + hashEntry * bucket_contents = malloc(sizeof(hashEntry) + keySize + valSize); + TreadUnlocked(xid, hashRid, bucket_contents); hashRid.slot = 0; - if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, &deleteMe)) { + if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, valSize, &deleteMe)) { /* Tdealloc(xid, deleteMe); */ } } @@ -508,7 +809,7 @@ void ThashInstantDelete(int xid, recordid hashRid, /*int ThashOpen(int xid, recordid hashRid) { recordid * headerRidB = malloc(sizeof(recordid)); hashRid.slot = 1; - Tread(xid, hashRid, headerRidB); + TreadUnlocked(xid, hashRid, headerRidB); pblHtInsert(openHashes, &hashRid.page, sizeof(int), headerRidB); diff --git a/src/lladd/operations/naiveLinearHash.c b/src/lladd/operations/naiveLinearHash.c index ee852b4..af31117 100644 --- a/src/lladd/operations/naiveLinearHash.c +++ b/src/lladd/operations/naiveLinearHash.c @@ -11,8 +11,8 @@ #define BUCKETS_OFFSET (2) -#define headerKeySize (headerRidA.page) -#define headerValSize (headerRidA.slot) +#define headerKeySize (headerRidA->page) +#define headerValSize (headerRidA->slot) #define headerHashBits (headerRidB->page) #define headerNextSplit (headerRidB->slot) @@ -33,8 +33,10 @@ pblHashTable_t * openHashes = NULL; void rehash(int xid, recordid hash, int next_split, int i, int keySize, int valSize); void update_hash_header(int xid, recordid hash, int i, int next_split); -int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_rid, void * key, int keySize, recordid * deletedEntry); -void insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_rid, hashEntry * e, int keySize, int valSize, recordid deletedEntry, int skipDelete); +int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents, + void * key, int keySize, int valSize, recordid * deletedEntry); +void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents, + hashEntry * e, int keySize, int valSize, int skipDelete); int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize); @@ -45,14 +47,15 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, recordid nextEntry; hashRid.slot = bucket_number; - Tread(xid, hashRid, &nextEntry); + /* Tread(xid, hashRid, &nextEntry); */ + nextEntry = hashRid; int found = 0; - while(nextEntry.size > 0) { + while(nextEntry.size != -1 && nextEntry.size != 0) { assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize); Tread(xid, nextEntry, e); - if(!memcmp(key, e+1, keySize)) { + if(!memcmp(key, e+1, keySize) && e->next.size != 0) { memcpy(val, ((byte*)(e+1))+keySize, valSize); found = 1; break; @@ -63,23 +66,42 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, return found; } + void expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) { - TarrayListExtend(xid, hash, 1); - if(next_split >= twoToThe(i-1)+2) { - i++; - next_split = 2; + /* Total hack; need to do this better, by storing stuff in the hash table headers.*/ + static int count = 4096 * .25; + count --; +#define AMORTIZE 1000 +#define FF_AM 750 + if(count <= 0 && !(count * -1) % FF_AM) { + recordid * headerRidB = pblHtLookup(openHashes, &(hash.page), sizeof(int)); + int j; + TarrayListExtend(xid, hash, AMORTIZE); + for(j = 0; j < AMORTIZE; j++) { + + if(next_split >= twoToThe(i-1)+2) { + i++; + next_split = 2; + } + rehash(xid, hash, next_split, i, keySize, valSize); + next_split++; + headerNextSplit = next_split; + headerHashBits = i; + } + update_hash_header(xid, hash, i, next_split); } - rehash(xid, hash, next_split, i, keySize, valSize); - next_split++; - update_hash_header(xid, hash, i, next_split); } void update_hash_header(int xid, recordid hash, int i, int next_split) { - recordid * headerRidB = pblHtLookup(openHashes, &hash.page, sizeof(int)); + hashEntry * he = pblHtLookup(openHashes, &(hash.page), sizeof(int)); + assert(he); + recordid * headerRidB = &he->next; + + assert(headerRidB); headerHashBits = i; headerNextSplit = next_split; - + hash.slot = 1; Tset(xid, hash, headerRidB); } @@ -134,33 +156,105 @@ void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int v int firstA = 1; // Is 'A' the recordid of a bucket? int firstD = 1; // What about 'D'? - assert(hashRid.size == sizeof(recordid)); - + /* assert(hashRid.size == sizeof(recordid)); */ + assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); recordid ba = hashRid; ba.slot = next_split; recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1); - recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = 0; + recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = -1; - recordid ba_contents; Tread(xid, ba, &ba_contents); - recordid bb_contents = NULLRID; - Tset(xid, bb, &bb_contents);//Tread(xid, bb, &bb_contents); + // recordid ba_contents; Tread(xid, ba, &ba_contents); + // recordid bb_contents = NULLRID; + /* Tset(xid, bb, &bb_contents); */ //Tread(xid, bb, &bb_contents); - recordid A = ba; //ba_contents; - recordid D = bb; //bb_contents; - recordid B = ba_contents; - recordid C; - - hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); - while(B.size) { + Tread(xid, ba, A_contents); + Tread(xid, bb, D_contents); + recordid A = ba; //ba_contents; + recordid D = bb; //bb_contents; + recordid B = A_contents->next; + recordid C; + + if(!A_contents->next.size) { + /* Bucket A is empty, so we're done. */ + free(D_contents); + free(A_contents); + free(B_contents); + /* printf("Expand was a noop.\n"); + fflush(NULL); */ + return; + } + + int old_hash; + int new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2; + + while(new_hash != next_split) { + // Need a record in A that belongs in the first bucket... + + recordid oldANext = A_contents->next; + + A_contents->next = NULLRID; + + if(firstD) { + // assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid))); + Tset(xid, D, A_contents); + firstD = 0; + } else { + /* D at end of list => can overwrite next. */ + D_contents->next = Talloc(xid, sizeof(hashEntry) + keySize + valSize); /* @todo + unfortunate + to + dealloc + A's + successor, + then + alloc.. */ + // assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid))); + Tset(xid, D_contents->next, A_contents); + // assert(memcmp(&D, &D_contents->next, sizeof(recordid))); + Tset(xid, D, D_contents); + D = A; + } + hashEntry * swap = D_contents; + D_contents = A_contents; + A_contents = swap; + + /* A_contents is now garbage. */ + + assert(A.size == sizeof(hashEntry) + keySize + valSize); + if(oldANext.size == -1) { + memset(A_contents, 0, sizeof(hashEntry) + keySize + valSize); + // assert(memcmp(&A_contents->next, &A, sizeof(recordid))); + Tset(xid, A, A_contents); + free(D_contents); + free(A_contents); + free(B_contents); + /* printf("Loop 1 returning.\n"); + fflush(NULL); */ + return; + } + assert(oldANext.size == sizeof(hashEntry) + keySize + valSize); + Tread(xid, oldANext, A_contents); + // assert(memcmp(&A_contents->next, &A, sizeof(recordid))); + Tset(xid, A, A_contents); + Tdealloc(xid, oldANext); + + new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2; + } + /* printf("Got past loop 1\n"); + fflush(NULL); */ + + B = A_contents->next; + + while(B.size != -1) { assert(B.size == sizeof(hashEntry) + keySize + valSize); Tread(xid, B, B_contents); C = B_contents->next; - int old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2; - int new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2; + old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2; + new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2; assert(next_split == old_hash); assert(new_hash == old_hash || new_hash == old_hash + twoToThe(i-1)); @@ -171,32 +265,39 @@ void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int v C.size = -1; firstA = 0; } else { - if(firstD) { + /* if(firstD) { // D is a bucket entry assert(B.size == sizeof(hashEntry) + keySize + valSize); assert(D.size == sizeof(recordid)); Tset(xid, D, &B); - } else { + } else { */ // D is the tail of our list. assert(D.size == sizeof(hashEntry) + keySize + valSize); - assert(B.size == 0 || B.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, D, D_contents); + assert(B.size == -1 || B.size == sizeof(hashEntry) + keySize + valSize); + Tread(xid, D, D_contents); D_contents->next = B; + assert(B.size != 0); + // assert(memcmp(&D, &D_contents->next, sizeof(recordid))); Tset(xid, D, D_contents); - } + /* } */ - if(firstA) { + /* if(firstA) { assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize); assert(A.size == sizeof(recordid)); Tset(xid, A, &C); - } else { + } else { */ // A is somewhere in the first list. assert(A.size == sizeof(hashEntry) + keySize + valSize); - assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize); + assert(C.size == -1 || C.size == sizeof(hashEntry) + keySize + valSize); Tread(xid, A, A_contents); A_contents->next = C; + assert(C.size != 0); + + + // assert(memcmp(&A, &A_contents->next, sizeof(recordid))); + Tset(xid, A, A_contents); - } + /* } */ // B _can't_ be a bucket. @@ -218,29 +319,111 @@ void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int v free(B_contents); } -void insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_contents, hashEntry * e, int keySize, int valSize, recordid newEntry, int skipDelete) { +void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents, + hashEntry * e, int keySize, int valSize, int skipDelete) { recordid deleteMe; if(!skipDelete) { - if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, &deleteMe)) { + if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, valSize, &deleteMe)) { Tdealloc(xid, deleteMe); hashRid.slot = bucket_number; - Tread(xid, hashRid, &bucket_contents); + assert(hashRid.size == sizeof(hashEntry) + valSize + keySize); + Tread(xid, hashRid, bucket_contents); hashRid.slot = 0; } } /*@todo consider recovery for insertIntoBucket. */ + hashRid.slot = bucket_number; + assert(hashRid.size == sizeof(hashEntry) + valSize + keySize); + Tread(xid, hashRid, bucket_contents); + + assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); + if(!bucket_contents->next.size) { // Size = 0 -> nothing in bucket. Size != 0 -> bucket occupied. + e->next.page = 0; + e->next.slot = 0; + e->next.size = -1; + Tset(xid, hashRid, e); + } else { + recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize); + e->next = bucket_contents->next; + bucket_contents->next = newEntry; + assert(newEntry.size == sizeof(hashEntry) + keySize + valSize); + Tset(xid, newEntry, e); + assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); + Tset(xid, hashRid, bucket_contents); + } + + /* recordid bucket = hashRid; bucket.slot = bucket_number; e->next = bucket_contents; Tset(xid, newEntry, e); - Tset(xid, bucket, &newEntry); + Tset(xid, bucket, &newEntry); */ } -int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_contents, void * key, int keySize, recordid * deletedEntry) { - hashEntry * e; +int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents, + void * key, int keySize, int valSize, recordid * deletedEntry) { + if(bucket_contents->next.size == 0) { return 0; } + + recordid this = hash; + this.slot = bucket_number; + + int found = 0; + if(!memcmp(bucket_contents+1, key, keySize)) { + if(deletedEntry) + deletedEntry->size = 0; /* size = 0 -> don't delete (this is a bucket!) */ + if(bucket_contents->next.size == -1) { + memset(bucket_contents, 0, sizeof(hashEntry) + keySize + valSize); + Tset(xid, this, bucket_contents); + } else { + assert(bucket_contents->next.size == sizeof(hashEntry) + keySize + valSize); + recordid oldNext = bucket_contents->next; + Tread(xid, bucket_contents->next, bucket_contents); + Tset(xid, this, bucket_contents); + *deletedEntry = oldNext; /* @todo delete from bucket really should do its own deallocation.. */ + } + return 1; + } + + if(bucket_contents->next.size == -1) { return 0; } + + hashEntry * A = malloc(sizeof(hashEntry) + keySize + valSize); + hashEntry * B = malloc(sizeof(hashEntry) + keySize + valSize); + + recordid Aaddr, Baddr; + + memcpy(B, bucket_contents, sizeof(hashEntry) + keySize + valSize); + Baddr = this; + while(B->next.size != -1) { + hashEntry * tmp = A; + A = B; + Aaddr = Baddr; + B = tmp; + assert(A->next.size == sizeof(hashEntry) + keySize + valSize); + Baddr = A->next; + Tread(xid, Baddr, B); + + if(!memcmp(B+1, key, keySize)) { + A->next = B->next; + assert(Aaddr.size == sizeof(hashEntry) + keySize + valSize); + Tset(xid, Aaddr, A); + if(deletedEntry) { + *deletedEntry = Baddr; + } + found = 1; + break; + } + + } + + free(A); + free(B); + + return found; +} + /* hashEntry * e; recordid bucket = hash; bucket.slot = bucket_number; recordid nextEntry; @@ -275,16 +458,15 @@ int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_ first = 0; nextEntry = e->next; } - return found; -} + return found; */ recordid ThashAlloc(int xid, int keySize, int valSize) { /* Want 16 buckets, doubling on overflow. */ - recordid rid = TarrayListAlloc(xid, 4096, 2, sizeof(recordid)); + recordid rid = TarrayListAlloc(xid, 4096, 2, sizeof(hashEntry) + keySize + valSize); TarrayListExtend(xid, rid, 4096+2); - recordid headerRidA; - recordid * headerRidB = malloc (sizeof(recordid)); + recordid * headerRidA = calloc (1, sizeof(recordid) + keySize + valSize); + recordid * headerRidB = calloc (1, sizeof(recordid) + keySize + valSize); headerKeySize = keySize; headerValSize = valSize; @@ -295,9 +477,11 @@ recordid ThashAlloc(int xid, int keySize, int valSize) { rid.slot =0; Tset(xid, rid, &headerRidA); rid.slot =1; - Tset(xid, rid, headerRidB); + Tset(xid, rid, &headerRidB); - pblHtInsert(openHashes, &rid.page, sizeof(int), headerRidB); + pblHtInsert(openHashes, &(rid.page), sizeof(int), headerRidB); + + free(headerRidA); rid.slot =0; return rid; @@ -315,7 +499,9 @@ void ThashInsert(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) { - recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int)); + recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(int)); + + /* printf("header: %d %d\n", headerHashBits, headerNextSplit); */ int bucket = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2; @@ -323,67 +509,92 @@ void ThashInsert(int xid, recordid hashRid, memcpy(e+1, key, keySize); memcpy(((byte*)(e+1)) + keySize, val, valSize); - recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize); - - recordid bucket_contents; + hashEntry * bucket_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); hashRid.slot = bucket; - Tread(xid, hashRid, &bucket_contents); - hashRid.slot = 0; - insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, newEntry, 0); - expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize); + Tread(xid, hashRid, bucket_contents); + /* if(!bucket_contents->next.size) { // Size = 0 -> nothing in bucket. Size != 0 -> bucket occupied. + e->next.page = 0; + e->next.slot = 0; + e->next.size = -1; + Tset(xid, hashRid, e); + } else { + recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize); + e->next = bucket_contents->next; + bucket_contents->next = newEntry; + Tset(xid, newEntry, e); + Tset(xid, hashRid, bucket_contents); + + }*/ + + /* hashRid.slot = 0; */ + insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, 0); + expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize); + + free(bucket_contents); free(e); } /** @todo hash hable probably should track the number of items in it, so that expand can be selectively called. */ -void ThashDelete(int xid, recordid hashRid, - void * key, int keySize) { - recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int)); - recordid tmp = hashRid; - tmp.slot = 1; +int ThashDelete(int xid, recordid hashRid, + void * key, int keySize, int valSize) { + recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(int)); + /* recordid tmp = hashRid; + tmp.slot = 1; */ int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2; - recordid deleteMe; + recordid deleteMe; + /* hashEntry * deleteMe = malloc(sizeof(hashEntry) + keySize + valSize); */ hashRid.slot = bucket_number; - recordid bucket_contents; - Tread(xid, hashRid, &bucket_contents); + + hashEntry * bucket_contents = malloc(sizeof(hashEntry) + keySize + valSize); + assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); + Tread(xid, hashRid, bucket_contents); hashRid.slot = 0; - if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, &deleteMe)) { - Tdealloc(xid, deleteMe); + int ret = 0; + if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, valSize, &deleteMe)) { + if(deleteMe.size) { + Tdealloc(xid, deleteMe); + } + ret = 1; } + free(bucket_contents); + return ret; } -int ThashOpen(int xid, recordid hashRid) { - recordid * headerRidB = malloc(sizeof(recordid)); +int ThashOpen(int xid, recordid hashRid, int keySize, int valSize) { + recordid * headerRidB = malloc(sizeof(recordid) + keySize + valSize); hashRid.slot = 1; Tread(xid, hashRid, headerRidB); - pblHtInsert(openHashes, &hashRid.page, sizeof(int), headerRidB); + pblHtInsert(openHashes, &(hashRid.page), sizeof(int), headerRidB); return 0; } void ThashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) { - ThashDelete(xid, hashRid, key, keySize); + ThashDelete(xid, hashRid, key, keySize, valSize); ThashInsert(xid, hashRid, key, keySize, val, valSize); } int ThashClose(int xid, recordid hashRid) { - recordid * freeMe = pblHtLookup(openHashes, &hashRid.page, sizeof(int)); - pblHtRemove(openHashes, &hashRid.page, sizeof(int)); + recordid * freeMe = pblHtLookup(openHashes, &(hashRid.page), sizeof(int)); + pblHtRemove(openHashes, &(hashRid.page), sizeof(int)); free(freeMe); return 0; } int ThashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize) { - recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int)); + recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(int)); + /* printf("lookup header: %d %d\n", headerHashBits, headerNextSplit); */ recordid tmp = hashRid; tmp.slot = 1; int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2; int ret = findInBucket(xid, hashRid, bucket_number, key, keySize, buf, valSize); return ret; } + diff --git a/src/lladd/page.c b/src/lladd/page.c index 315a29f..ae582aa 100644 --- a/src/lladd/page.c +++ b/src/lladd/page.c @@ -243,6 +243,7 @@ Page *pageMalloc() { } }*/ + void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { assert( (p->id == rid.page) && (p->memAddr != NULL) ); @@ -283,3 +284,45 @@ void readRecord(int xid, Page * p, recordid rid, void *buf) { assert(rid.page == p->id); } + +void readRecordUnlocked(int xid, Page * p, recordid rid, void *buf) { + assert(rid.page == p->id); + + int page_type = *page_type_ptr(p); + + if(rid.size > BLOB_THRESHOLD_SIZE) { + abort(); /* Unsupported for now. */ + readBlob(xid, p, rid, buf); + } else if(page_type == SLOTTED_PAGE) { + slottedReadUnlocked(xid, p, rid, buf); + /* FIXED_PAGES can function correctly even if they have not been + initialized. */ + } else if(page_type == FIXED_PAGE || !page_type) { + fixedReadUnlocked(p, rid, buf); + } else { + abort(); + } + assert(rid.page == p->id); +} + +void writeRecordUnlocked(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { + + assert( (p->id == rid.page) && (p->memAddr != NULL) ); + + if(rid.size > BLOB_THRESHOLD_SIZE) { + abort(); + writeBlob(xid, p, lsn, rid, dat); + } else if(*page_type_ptr(p) == SLOTTED_PAGE) { + slottedWriteUnlocked(xid, p, lsn, rid, dat); + } else if(*page_type_ptr(p) == FIXED_PAGE || !*page_type_ptr(p) ) { + fixedWriteUnlocked(p, rid, dat); + } else { + abort(); + } + assert( (p->id == rid.page) && (p->memAddr != NULL) ); + + writelock(p->rwlatch, 225); /* Need a writelock so that we can update the lsn. */ + pageWriteLSN(p, lsn); + unlock(p->rwlatch); + +} diff --git a/src/lladd/page.h b/src/lladd/page.h index 1f2168d..e21ae38 100644 --- a/src/lladd/page.h +++ b/src/lladd/page.h @@ -251,12 +251,13 @@ lsn_t pageReadLSN(const Page * page); * wish to write */ void writeRecord(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat); - +void writeRecordUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat); /** * @param xid transaction ID * @param dat buffer for data */ void readRecord(int xid, Page * page, recordid rid, void *dat); +void readRecordUnlocked(int xid, Page * p, recordid rid, void *buf); void pageCommit(int xid); void pageAbort(int xid); diff --git a/src/lladd/page/fixed.c b/src/lladd/page/fixed.c index 16706a9..df252a4 100644 --- a/src/lladd/page/fixed.c +++ b/src/lladd/page/fixed.c @@ -58,9 +58,9 @@ static void checkRid(Page * page, recordid rid) { assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); assert(page->id == rid.page); assert(*recordsize_ptr(page) == rid.size); - /* assert(recordsPerPage(rid.size) > rid.slot); */ + // assert(recordsPerPage(rid.size) > rid.slot); int recCount = *recordcount_ptr(page); - assert(recCount > rid.slot); + assert(recCount > rid.slot); } else { fixedPageInitialize(page, rid.size, recordsPerPage(rid.size)); } @@ -74,8 +74,12 @@ void fixedReadUnlocked(Page * page, recordid rid, byte * buf) { } void fixedRead(Page * page, recordid rid, byte * buf) { readlock(page->rwlatch, 57); + + // printf("R { %d %d %d }\n", rid.page, rid.slot, rid.size); checkRid(page, rid); + + fixedReadUnlocked(page, rid, buf); unlock(page->rwlatch); @@ -83,6 +87,7 @@ void fixedRead(Page * page, recordid rid, byte * buf) { } void fixedWriteUnlocked(Page * page, recordid rid, const byte *dat) { + checkRid(page,rid); if(!memcpy(fixed_record_ptr(page, rid.slot), dat, rid.size)) { perror("memcpy"); abort(); @@ -92,7 +97,8 @@ void fixedWriteUnlocked(Page * page, recordid rid, const byte *dat) { void fixedWrite(Page * page, recordid rid, const byte* dat) { readlock(page->rwlatch, 73); - checkRid(page, rid); + // printf("W { %d %d %d }\n", rid.page, rid.slot, rid.size); + // checkRid(page, rid); fixedWriteUnlocked(page, rid, dat); diff --git a/src/lladd/page/indirect.c b/src/lladd/page/indirect.c index e498c2b..610e775 100644 --- a/src/lladd/page/indirect.c +++ b/src/lladd/page/indirect.c @@ -13,7 +13,7 @@ void indirectInitialize(Page * p, int height) { *page_type_ptr(p) = INDIRECT_PAGE; memset(p->memAddr, INVALID_SLOT, ((int)level_ptr(p)) - ((int)p->memAddr)); } - +/** @todo locking for dereferenceRID? */ recordid dereferenceRID(recordid rid) { Page * this = loadPage(rid.page); int offset = 0; diff --git a/src/lladd/page/indirect.h b/src/lladd/page/indirect.h index eed83a8..54e2f68 100644 --- a/src/lladd/page/indirect.h +++ b/src/lladd/page/indirect.h @@ -44,6 +44,7 @@ BEGIN_C_DECLS physical location of the record. */ recordid dereferenceRID(recordid rid); +#define dereferenceRIDUnlocked(x) dereferenceRID((x)) void indirectInitialize(Page * p, int height); recordid rallocMany(/*int parentPage, lsn_t lsn,*/int xid, int recordSize, int recordCount); unsigned int indirectPageRecordCount(recordid rid); diff --git a/src/lladd/page/slotted.c b/src/lladd/page/slotted.c index 6bc7302..f4a085f 100644 --- a/src/lladd/page/slotted.c +++ b/src/lladd/page/slotted.c @@ -138,7 +138,7 @@ void slottedPageInitialize(Page * page) { } -static int unlocked_freespace(Page * page) { +int slottedFreespaceUnlocked(Page * page) { return (int)slot_length_ptr(page, *numslots_ptr(page)) - (int)(page->memAddr + *freespace_ptr(page)); } @@ -146,7 +146,7 @@ static int unlocked_freespace(Page * page) { int slottedFreespace(Page * page) { int ret; readlock(page->rwlatch, 292); - ret = unlocked_freespace(page); + ret = slottedFreespaceUnlocked(page); readunlock(page->rwlatch); return ret; } @@ -228,11 +228,11 @@ static void __really_do_ralloc(Page * page, recordid rid) { assert(rid.size > 0); - if(unlocked_freespace(page) < rid.size) { + if(slottedFreespaceUnlocked(page) < rid.size) { slottedCompact(page); /* Make sure there's enough free space... */ - assert (unlocked_freespace(page) >= rid.size); + assert (slottedFreespaceUnlocked(page) >= rid.size); } freeSpace = *freespace_ptr(page); @@ -320,6 +320,20 @@ void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid) { unlock(page->rwlatch); } +void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff) { + int slot_length; + + assert(page->id == rid.page); + slot_length = *slot_length_ptr(page, rid.slot); + assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE)); + + if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) { + perror("memcpy"); + abort(); + } + +} + /* This should trust the rid (since the caller needs to override the size in special circumstances) @@ -368,6 +382,20 @@ void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat unlock(page->rwlatch); } +void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) { + int slot_length; + + assert(rid.size < PAGE_SIZE); + assert(page->id == rid.page); + + slot_length = *slot_length_ptr(page, rid.slot); + assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE)); + + if(!memcpy(record_ptr(page, rid.slot), data, rid.size)) { + perror("memcpy"); + abort(); + } +} /*void slottedSetType(Page * p, int slot, int type) { assert(type > PAGE_SIZE); diff --git a/src/lladd/page/slotted.h b/src/lladd/page/slotted.h index 9aef2f7..9f37f96 100644 --- a/src/lladd/page/slotted.h +++ b/src/lladd/page/slotted.h @@ -61,6 +61,8 @@ Slotted page layout: void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data); void slottedRead(int xid, Page * page, recordid rid, byte *buff); +void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data); +void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff); void slottedPageInitialize(Page * p); diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index b71c2b5..af59d5f 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -162,10 +162,32 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) { } +void TreadUnlocked(int xid, recordid rid, void * dat) { + Page * p = loadPage(rid.page); + int page_type = *page_type_ptr(p); + if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) { + + } else if(page_type == INDIRECT_PAGE) { + releasePage(p); + rid = dereferenceRIDUnlocked(rid); + p = loadPage(rid.page); + + } else if(page_type == ARRAY_LIST_PAGE) { + rid = dereferenceArrayListRidUnlocked(p, rid.slot); + releasePage(p); + p = loadPage(rid.page); + + } else { + abort(); + } + readRecordUnlocked(xid, p, rid, dat); + releasePage(p); +} + void Tread(int xid, recordid rid, void * dat) { Page * p = loadPage(rid.page); int page_type = *page_type_ptr(p); - if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE ) { + if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) { } else if(page_type == INDIRECT_PAGE) { releasePage(p); diff --git a/test/lladd/check_linearHash.c b/test/lladd/check_linearHash.c index fe5d7b8..ba7229c 100644 --- a/test/lladd/check_linearHash.c +++ b/test/lladd/check_linearHash.c @@ -126,7 +126,9 @@ START_TEST(simpleLinearHashTest) assert(rid.page == (i+1)); assert(rid.slot == (i+2)); assert(rid.size == (i+3)); - ThashDelete(xid, hashRoot, &i, sizeof(int)); + assert(ThashDelete(xid, hashRoot, &i, sizeof(int), sizeof(recordid))); + assert(!ThashDelete(xid, hashRoot, &i, sizeof(int), sizeof(recordid))); + assert(!ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid))); } @@ -155,13 +157,13 @@ START_TEST(simpleLinearHashTest) if(i % 10) { assert(ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid))); - ThashDelete(xid, hashRoot, &i, sizeof(int)); + ThashDelete(xid, hashRoot, &i, sizeof(int), sizeof(recordid)); assert(rid.page == (i+1)); assert(rid.slot == (i+2)); assert(rid.size == (i+3)); } else { assert(!ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid))); - ThashDelete(xid, hashRoot, &i, sizeof(int)); + ThashDelete(xid, hashRoot, &i, sizeof(int), sizeof(recordid)); } }