Preliminary version of logical logging linear hash. (No latching yet, and there are some bugs re-opening a closed hash.)

This commit is contained in:
Sears Russell 2004-10-18 18:24:54 +00:00
parent 04cfc7da8e
commit ec1276f26f
30 changed files with 1256 additions and 186 deletions
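The pattern at the heart of the diff, condensed from the linearHash.c hunks below with explanatory comments (the hunks themselves are authoritative): each logical operation writes an undo-only log record, then applies its physical effects through redo-only "instant" operations, so abort and recovery run a logical inverse instead of restoring byte-level pre-images.

void TlogicalHashInsert(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) {
  /* The per-hash header (hash bits, next bucket to split) is cached in the
     process-wide openHashes table; headerHashBits / headerNextSplit are
     macros over *headerRidB. */
  recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
  assert(headerRidB);

  /* Step 1: undo-only logical log record carrying just the key.  Its redo
     function only stamps the page LSN; on abort, OPERATION_UNDO_LINEAR_INSERT
     runs ThashInstantDelete() on the same key. */
  hashRid.size = keySize;
  Tupdate(xid, hashRid, key, OPERATION_LINEAR_INSERT);

  /* Step 2: redo-only physical insert, plus the incremental bucket split
     that keeps the table growing, all via instant operations. */
  hashRid.size = sizeof(recordid);
  ThashInstantInsert(xid, hashRid, key, keySize, val, valSize);
  instant_expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize);
}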

View file

@ -1,5 +1,5 @@
EXTRA_DIST = reconf
SUBDIRS = src test utilities
SUBDIRS = src test utilities benchmarks
AM_CFLAGS = -g -Wall -pedantic
docs:

5
benchmarks/Makefile.am Normal file
View file

@ -0,0 +1,5 @@
LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \
$(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a \
$(top_builddir)/src/libdfa/librw.a
bin_PROGRAMS=naiveHash logicalHash readLogicalHash
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

37
benchmarks/logicalHash.c Normal file
View file

@ -0,0 +1,37 @@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <lladd/transactional.h>
#include <unistd.h>
int main(int argc, char** argv) {
assert(argc == 2);
int count = atoi(argv[1]);
unlink("storefile.txt");
unlink("logfile.txt");
unlink("blob0_file.txt");
unlink("blob1_file.txt");
Tinit();
int xid = Tbegin();
recordid hash = ThashAlloc(xid, sizeof(int), sizeof(int));
int i;
for(i = 0; i < count ; i++) {
TlogicalHashInsert(xid, hash, &i, sizeof(int), &i, sizeof(int));
}
Tcommit(xid);
Tdeinit();
}

37
benchmarks/naiveHash.c Normal file
View file

@ -0,0 +1,37 @@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <lladd/transactional.h>
#include <unistd.h>
int main(int argc, char** argv) {
assert(argc == 2);
int count = atoi(argv[1]);
unlink("storefile.txt");
unlink("logfile.txt");
unlink("blob0_file.txt");
unlink("blob1_file.txt");
Tinit();
int xid = Tbegin();
recordid hash = ThashAlloc(xid, sizeof(int), sizeof(int));
int i;
for(i = 0; i < count ; i++) {
ThashInsert(xid, hash, &i, sizeof(int), &i, sizeof(int));
}
Tcommit(xid);
Tdeinit();
}

54
benchmarks/readLogicalHash.c Normal file
View file

@ -0,0 +1,54 @@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <lladd/transactional.h>
#include <unistd.h>
int main(int argc, char** argv) {
assert(argc == 2);
int count = atoi(argv[1]);
unlink("storefile.txt");
unlink("logfile.txt");
unlink("blob0_file.txt");
unlink("blob1_file.txt");
Tinit();
int xid = Tbegin();
recordid hash = ThashAlloc(xid, sizeof(int), sizeof(int));
int i;
for(i = 0; i < count ; i++) {
TlogicalHashInsert(xid, hash, &i, sizeof(int), &i, sizeof(int));
}
Tcommit(xid);
xid = Tbegin();
ThashOpen(xid, hash);
int k;
for(k = 0; k < 10; k++) {
for(i = 0; i < count ; i++) {
int j;
assert(ThashLookup(xid, hash, &i, sizeof(int), &j, sizeof(int)));
assert(i == j);
}
}
Tcommit(xid);
Tdeinit();
}

View file

@ -87,5 +87,7 @@ AC_CONFIG_FILES([Makefile
test/lladd/Makefile
test/messages/Makefile
test/monotree/Makefile
utilities/Makefile])
utilities/Makefile
benchmarks/Makefile
])
AC_OUTPUT

View file

@ -100,7 +100,7 @@ extern int errno;
/*#define DEBUGGING */
/*#define PROFILE_LATCHES */
/* #define PROFILE_LATCHES */
#ifdef DEBUGGING
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */

View file

@ -113,6 +113,10 @@ terms specified in this license.
#define OPERATION_ARRAY_LIST_ALLOC 20
#define OPERATION_INITIALIZE_FIXED_PAGE 21
#define OPERATION_UNINITIALIZE_PAGE 22
#define OPERATION_LINEAR_INSERT 23
#define OPERATION_UNDO_LINEAR_INSERT 24
#define OPERATION_LINEAR_DELETE 25
#define OPERATION_UNDO_LINEAR_DELETE 26
/* number above should be less than number below */
#define MAX_OPERATIONS 40

View file

@ -14,4 +14,4 @@
crc = crc32(buffer, length, crc)
*/
unsigned int crc32(void *buffer, unsigned int count, unsigned int crc);
unsigned int crc32(const void *buffer, unsigned int count, unsigned int crc);

View file

@ -1,4 +1,10 @@
#include <lladd/crc32.h>
#ifndef __HASH_H
#define __HASH_H
/** @todo replace powl in hash with something more efficient, if hash() becomes a bottleneck. */
unsigned int hash(void * val, long val_length, unsigned char tableBits, unsigned long nextExtension);
unsigned int hash(const void * val, long val_length, unsigned char tableBits, unsigned long nextExtension);
#define twoToThe(x) (1 << (x))
#endif /*__HASH_H */
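As the hash.c hunk below and the checkHashFcn test suggest, hash() implements the split-aware addressing of linear hashing: it masks the CRC32 of the key down to tableBits - 1 bits and, when that bucket number falls below nextExtension (a bucket already split in the current round), recomputes it with the full tableBits bits. Callers in linearHash.c invoke it as hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2, where the + 2 skips the two header slots at the front of the bucket ArrayList. A small self-contained sketch of the property the test relies on (it assumes liblladd is linked in for hash() and crc32()):

#include <assert.h>
#include <limits.h>
#include <lladd/hash.h>

int main(void) {
  int key = 12345;
  unsigned char k = 6;   /* pretend the table currently uses 6 hash bits */
  unsigned int before = hash(&key, sizeof(int), k,     ULONG_MAX);
  unsigned int after  = hash(&key, sizeof(int), k + 1, ULONG_MAX);
  /* Adding one table bit either leaves the key in its bucket or moves it
     up by exactly 2^k buckets; checkHashFcn asserts the same invariant. */
  assert(after == before || after == before + twoToThe(k));
  return 0;
}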

View file

@ -149,6 +149,7 @@ typedef struct {
#include "operations/instantSet.h"
#include "operations/arrayList.h"
#include "operations/linearHash.h"
#include "operations/naiveLinearHash.h"
extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */
/** Performs an operation during normal execution.
@ -179,8 +180,7 @@ void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn);
before redoing it. (if(e->lsn > e->rid.lsn) { doUpdate(e); } return)
Also, this is the only function in operations.h that can take
either CLR or UPDATE log entries. The other functions can only
handle update entries.
either CLR or UPDATE log entries. The other functions can handle update entries.
Does not write to the log.
*/

View file

@ -26,4 +26,5 @@ Operation getUnInitPage();
recordid dereferenceArrayListRid(Page * p, int offset);
int TarrayListExtend(int xid, recordid rid, int slots);
int TarrayListInstantExtend(int xid, recordid rid, int slots);
#endif

View file

@ -13,17 +13,26 @@
recordid ThashAlloc(int xid, int keySize, int valSize) ;
recordid ThashInstantAlloc(int xid, int keySize, int valSize) ;
void ThashInsert(int xid, recordid hashRid,
void * key, int keySize,
void * val, int valSize);
void ThashDelete(int xid, recordid hashRid,
void * key, int keySize);
void ThashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize);
int ThashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize);
void ThashInstantInsert(int xid, recordid hashRid,
const void * key, int keySize,
const void * val, int valSize);
void ThashInstantDelete(int xid, recordid hashRid,
const void * key, int keySize);
void ThashInstantUpdate(int xid, recordid hashRid, const void * key, int keySize, const void * val, int valSize);
void TlogicalHashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize);
void TlogicalHashInsert(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize);
int TlogicalHashDelete(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize);
Operation getLinearInsert();
Operation getLinearDelete();
Operation getUndoLinearInsert();
Operation getUndoLinearDelete();
/*int ThashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize);
void ThashInit();
void ThashDeinit();
int ThashOpen(int xid, recordid hashRid);
int ThashClose(int xid, recordid hashRid) ;
int ThashClose(int xid, recordid hashRid) ; */
#endif
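A minimal usage sketch of the logical interface (a sketch, not code from the commit), condensed from the new check_logicalLinearHash test and the benchmarks. It assumes the default store and log file names the benchmarks use, and an int key with a recordid value, since the current undo path for deletes asserts keySize == sizeof(int) and valSize == sizeof(recordid). The point of the example: an aborted TlogicalHashDelete() is rolled back by replaying its logical inverse, so the key and value reappear.

#include <assert.h>
#include <unistd.h>
#include <lladd/transactional.h>

int main(void) {
  /* Start from a clean page file and log, as the benchmarks do. */
  unlink("storefile.txt");  unlink("logfile.txt");
  unlink("blob0_file.txt"); unlink("blob1_file.txt");
  Tinit();

  int xid = Tbegin();
  recordid hash = ThashAlloc(xid, sizeof(int), sizeof(recordid));
  int key = 42;
  recordid val = { .page = 1, .slot = 2, .size = 3 };
  recordid out;
  TlogicalHashInsert(xid, hash, &key, sizeof(int), &val, sizeof(recordid));
  Tcommit(xid);

  xid = Tbegin();
  assert(ThashLookup(xid, hash, &key, sizeof(int), &out, sizeof(recordid)));
  TlogicalHashDelete(xid, hash, &key, sizeof(int), &out, sizeof(recordid));
  assert(!ThashLookup(xid, hash, &key, sizeof(int), &out, sizeof(recordid)));
  Tabort(xid);   /* undo re-runs ThashInstantInsert() for the deleted key */

  xid = Tbegin();
  assert(ThashLookup(xid, hash, &key, sizeof(int), &out, sizeof(recordid)));
  assert(out.page == 1 && out.slot == 2 && out.size == 3);
  Tcommit(xid);

  Tdeinit();
  return 0;
}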

View file

@ -0,0 +1,29 @@
#include <lladd/operations.h>
#ifndef __NAIVE_LINEAR_HASH_H
#define __NAIVE_LINEAR_HASH_H
/**
@file
@ingroup OPERATIONS
$Id$
*/
recordid ThashAlloc(int xid, int keySize, int valSize) ;
void ThashInsert(int xid, recordid hashRid,
void * key, int keySize,
void * val, int valSize);
void ThashDelete(int xid, recordid hashRid,
void * key, int keySize);
void ThashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize);
int ThashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize);
void ThashInit();
void ThashDeinit();
int ThashOpen(int xid, recordid hashRid);
int ThashClose(int xid, recordid hashRid) ;
#endif

View file

@ -10,6 +10,6 @@ liblladd_a_SOURCES=crc32.c common.c stats.c io.c bufferManager.c linkedlist.c op
operations/increment.c operations/prepare.c operations/set.c \
operations/alloc.c operations/noop.c operations/instantSet.c \
page/slotted.c operations/lladdhash.c page/header.c page/fixed.c \
operations/arrayList.c hash.c operations/linearHash.c
operations/arrayList.c hash.c operations/linearHash.c operations/naiveLinearHash.c
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -249,7 +249,7 @@ Page * getPage(int pageid, int locktype) {
/* pblHtRemove(activePages, &(ret->id), sizeof(int)); */
pblHtRemove(activePages, &(oldid), sizeof(int));
/* Put off putting this back into cache until we're done with
/* @todo Put off putting this back into cache until we're done with
it. -- This could cause the cache to empty out if the ratio of
threads to buffer slots is above ~ 1/3, but it decreases the
likelihood of thrashing. */

View file

@ -9,7 +9,7 @@
// unsigned long crc = -1L
// crc = crc32(buffer, length, crc)
unsigned int crc32(void *buffer, unsigned int count, unsigned int crc);
unsigned int crc32(const void *buffer, unsigned int count, unsigned int crc);
static int BuildCRCTable(void);
static unsigned long *CRCTable; // Table constructed for fast lookup.
@ -42,8 +42,8 @@ static int BuildCRCTable(void)
}
return 0;
}
/* changed long to int. - rusty. */
unsigned int crc32(void *buffer, unsigned int count, unsigned int crc)
/* changed long to int, void to const void - rusty. */
unsigned int crc32(const void *buffer, unsigned int count, unsigned int crc)
{
unsigned int temp1, temp2;
static int firsttime = 1;

View file

@ -5,9 +5,7 @@
/** @todo replace powl in hash with something more efficient, if hash() becomes a bottleneck. */
#define twoToThe(x) (1 << (x))
unsigned int hash(void * val, long val_length, unsigned char tableBits, unsigned long nextExtension) {
unsigned int hash(const void * val, long val_length, unsigned char tableBits, unsigned long nextExtension) {
unsigned int oldTableLength = /*powl(2, tableBits - 1); */ twoToThe(tableBits - 1);
unsigned int unmixed = crc32(val, val_length, (unsigned long)-1L);
unsigned int ret = unmixed & (oldTableLength - 1);

View file

@ -118,21 +118,21 @@ void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
if(undo == NO_INVERSE) {
/* Physical undo */
DEBUG("OPERATION Physical undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size);
DEBUG("OPERATION %d Physical undo, %ld {%d %d %ld}\n", undo, e->LSN, e->contents.update.rid.page, e->contents.update.rid.slot, e->contents.update.rid.size);
writeRecord(e->xid, p, clr_lsn, e->contents.update.rid, getUpdatePreImage(e));
} else if(undo == NO_INVERSE_WHOLE_PAGE) {
DEBUG("OPERATION Whole page physical undo, %ld {%d}\n", e->LSN, rid.page);
DEBUG("OPERATION %d Whole page physical undo, %ld {%d}\n", undo, e->LSN, e->contents.update.rid.page);
memcpy(p->memAddr, getUpdatePreImage(e), PAGE_SIZE);
pageWriteLSN(p, clr_lsn);
} else {
/* @see doUpdate() */
/* printf("Logical undo"); fflush(NULL); */
DEBUG("OPERATION Logical undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size);
DEBUG("OPERATION %d Logical undo, %ld {%d %d %ld}\n", undo, e->LSN, e->contents.update.rid.page, e->contents.update.rid.slot, e->contents.update.rid.size);
operationsTable[undo].run(e->xid, p, clr_lsn, e->contents.update.rid, getUpdateArgs(e));
}
} else {
DEBUG("OPERATION Skipping undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size);
DEBUG("OPERATION %d Skipping undo, %ld {%d %d %ld}\n", undo, e->LSN, e->contents.update.rid.page, e->contents.update.rid.slot, e->contents.update.rid.size);
}
/* printf("Undo done."); fflush(NULL); */

View file

@ -77,7 +77,7 @@ Operation getAlloc() {
Operation o = {
OPERATION_ALLOC, /* ID */
0,
OPERATION_DEALLOC,
OPERATION_DEALLOC, /* OPERATION_NOOP, */
&operate
};
return o;
@ -88,7 +88,7 @@ Operation getDealloc() {
Operation o = {
OPERATION_DEALLOC,
SIZEOF_RECORD,
OPERATION_REALLOC,
OPERATION_REALLOC, /* OPERATION_NOOP, */
&deoperate
};
return o;

View file

@ -68,17 +68,10 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
int multiplier = tlp->multiplier;
int size = tlp->size;
/* Page * p = loadPage(firstPage); */
fixedPageInitialize(p, sizeof(int), recordsPerPage(sizeof(int)));
/* recordid countRid = fixedRawRalloc(p);
recordid multiplierRid = fixedRawRalloc(p);
recordid slotSizeRid = fixedRawRalloc(p); */
#define MAX_OFFSET_POSITION 3
/* recordid maxOffset = fixedRawRalloc(p); */
#define FIRST_DATA_PAGE_OFFSET 4
/* recordid firstDataPageRid = fixedRawRalloc(p); */
recordid countRid, multiplierRid, slotSizeRid, maxOffset, firstDataPageRid;
countRid.page = multiplierRid.page = slotSizeRid.page = maxOffset.page = firstDataPageRid.page = p->id;
@ -99,7 +92,6 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
int minusOne = -1;
fixedWriteUnlocked(p, maxOffset, (byte*)&minusOne);
/* Write lsn... */
*page_type_ptr(p) = ARRAY_LIST_PAGE;
@ -128,7 +120,6 @@ Operation getArrayListAlloc() {
int TarrayListExtend(int xid, recordid rid, int slots) {
Page * p = loadPage(rid.page);
TarrayListParameters tlp = pageToTLP(p);
releasePage(p);
int lastCurrentBlock;
if(tlp.maxOffset == -1) {
@ -147,14 +138,20 @@ int TarrayListExtend(int xid, recordid rid, int slots) {
recordid tmp2; /* recordid of newly created pages. */
tmp2.slot = 0;
tmp2.size = tlp.size;
/* Iterate over the (small number) of indirection blocks that need to be updated */
for(int i = lastCurrentBlock+1; i <= lastNewBlock; i++) {
/* Alloc block i */
int blockSize = tlp.initialSize * powl(tlp.multiplier, i);
int newFirstPage = TpageAllocMany(xid, blockSize);
DEBUG("block %d\n", i);
/* Iterate over the storage blocks that are pointed to by our current indirection block. */
for(int j = 0; j < blockSize; j++) {
DEBUG("page %d (%d)\n", j, j + newFirstPage);
tmp2.page = j + newFirstPage;
/** @todo If we were a little smarter about this, and fixed.c
could handle uninitialized blocks correctly, then we
wouldn't have to iterate over the datapages in
TarrayListExtend() */
Tupdate(xid, tmp2, NULL, OPERATION_INITIALIZE_FIXED_PAGE);
}
@ -174,6 +171,71 @@ int TarrayListExtend(int xid, recordid rid, int slots) {
*page_type_ptr(p) = FIXED_PAGE;
Tset(xid, tmp, &newMaxOffset);
*page_type_ptr(p) = ARRAY_LIST_PAGE;
releasePage(p);
return 0;
}
/** @todo TarrayListInstantExtend is a hacked-up cut-and-paste version of TarrayListExtend */
int TarrayListInstantExtend(int xid, recordid rid, int slots) {
Page * p = loadPage(rid.page);
TarrayListParameters tlp = pageToTLP(p);
int lastCurrentBlock;
if(tlp.maxOffset == -1) {
lastCurrentBlock = -1;
} else{
lastCurrentBlock = getBlockContainingOffset(tlp, tlp.maxOffset, NULL);
}
int lastNewBlock = getBlockContainingOffset(tlp, tlp.maxOffset+slots, NULL);
DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock);
recordid tmp; /* recordid of slot in base page that holds new block. */
tmp.page = rid.page;
tmp.size = sizeof(int);
recordid tmp2; /* recordid of newly created pages. */
tmp2.slot = 0;
tmp2.size = tlp.size;
/* Iterate over the (small number) of indirection blocks that need to be updated */
for(int i = lastCurrentBlock+1; i <= lastNewBlock; i++) {
/* Alloc block i */
int blockSize = tlp.initialSize * powl(tlp.multiplier, i);
int newFirstPage = TpageAllocMany(xid, blockSize);
DEBUG("block %d\n", i);
/* Iterate over the storage blocks that are pointed to by our current indirection block. */
for(int j = 0; j < blockSize; j++) {
DEBUG("page %d (%d)\n", j, j + newFirstPage);
tmp2.page = j + newFirstPage;
/** @todo If we were a little smarter about this, and fixed.c
could handle uninitialized blocks correctly, then we
wouldn't have to iterate over the datapages in
TarrayListExtend() */
Tupdate(xid, tmp2, NULL, OPERATION_INITIALIZE_FIXED_PAGE);
}
tmp.slot = i + FIRST_DATA_PAGE_OFFSET;
/** @todo what does this do to recovery?? */
/** @todo locking for arrayList... */
*page_type_ptr(p) = FIXED_PAGE;
TinstantSet(xid, tmp, &newFirstPage);
*page_type_ptr(p) = ARRAY_LIST_PAGE;
DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage);
}
tmp.slot = MAX_OFFSET_POSITION;
int newMaxOffset = tlp.maxOffset+slots;
/** @todo CORRECTNESS BUG: From recovery's point of view, this is totally wrong! The
only reason we mess with p is because TinstantSet doesn't handle
ARRAY_LIST_PAGES the way we need it to, so this won't be hard to
fix... */
*page_type_ptr(p) = FIXED_PAGE;
TinstantSet(xid, tmp, &newMaxOffset);
*page_type_ptr(p) = ARRAY_LIST_PAGE;
releasePage(p);
return 0;
@ -198,7 +260,7 @@ Operation getInitFixed() {
Operation o = {
OPERATION_INITIALIZE_FIXED_PAGE,
0, /* The necessary parameters are hidden in the rid */
OPERATION_UNINITIALIZE_PAGE,
/*OPERATION_UNINITIALIZE_PAGE,*/ OPERATION_NOOP,
&operateInitFixed
};
return o;
@ -207,7 +269,7 @@ Operation getUnInitPage() {
Operation o = {
OPERATION_UNINITIALIZE_PAGE,
PAGE_SIZE,
NO_INVERSE_WHOLE_PAGE,
NO_INVERSE_WHOLE_PAGE, /* OPERATION_NOOP,*/
&operateUnInitPage
};
return o;
@ -232,16 +294,10 @@ recordid dereferenceArrayListRid(Page * p, int offset) {
int blockPage = dataSlot / rec_per_page; /* The page in the block of interest that contains the slot we want */
int blockSlot = dataSlot - blockPage * rec_per_page;
/* recordid tmp;
tmp.page = tlp.firstPage;
tmp.size = sizeof(int);
tmp.slot = pageRidSlot + FIRST_DATA_PAGE_OFFSET; */
int thePage;
assert(pageRidSlot + FIRST_DATA_PAGE_OFFSET < fixedPageCount(p));
/* fixedReadUnlocked(p, tmp, (byte*)&thePage); *//* reading immutable record.. */
thePage = *(int*)fixed_record_ptr(p, pageRidSlot + FIRST_DATA_PAGE_OFFSET);
thePage = *(int*)fixed_record_ptr(p, pageRidSlot + FIRST_DATA_PAGE_OFFSET); /*reading immutable record; don't need latch.*/
recordid rid;
rid.page = thePage + blockPage;

View file

@ -2,6 +2,8 @@
#include <lladd/hash.h>
#include <limits.h>
#include <assert.h>
#include <stdlib.h>
#include "../page.h"
/**
A from-scratch implementation of linear hashing. Uses the
@ -27,36 +29,152 @@ typedef struct {
recordid next;
} hashEntry;
pblHashTable_t * openHashes = NULL;
void instant_expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize);
extern pblHashTable_t * openHashes ;
/*pblHashTable_t * openHashes = NULL; */
void rehash(int xid, recordid hash, int next_split, int i, int keySize, int valSize);
void update_hash_header(int xid, recordid hash, int i, int next_split);
int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_rid, void * key, int keySize, recordid * deletedEntry);
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_rid, hashEntry * e, int keySize, int valSize, recordid deletedEntry, int skipDelete);
static int operateUndoInsert(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
int keySize = rid.size;
rid.size = sizeof(recordid);
if(!pblHtLookup(openHashes, &rid.page, sizeof(int))) {
ThashOpen(xid, rid);
}
ThashInstantDelete(xid, rid, dat, keySize);
return 0;
}
typedef struct {
int keySize;
int valSize;
} undoDeleteArg;
static int operateUndoDelete(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
const undoDeleteArg * arg = dat;
int keySize = arg->keySize;
int valSize = arg->valSize;
const byte * argBytes = (byte*)(arg+1);
rid.slot = 0;
/* Tread(xid, dereferenceArrayListRid(p, rid.slot), &headerRidA); */
/* Tread(xid, rid, &headerRidA); */
assert(keySize == sizeof(int));
assert(valSize == sizeof(recordid));
ThashInstantInsert(xid, rid, argBytes, keySize,
argBytes + keySize, valSize);
return 0;
}
static int noop (int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { pageWriteLSN(p, lsn); return 0; }
Operation getLinearInsert() {
Operation o = {
OPERATION_LINEAR_INSERT,
SIZEOF_RECORD,
OPERATION_UNDO_LINEAR_INSERT,
&noop
};
return o;
}
Operation getLinearDelete() {
Operation o = {
OPERATION_LINEAR_DELETE,
SIZEOF_RECORD,
OPERATION_UNDO_LINEAR_DELETE,
&noop
};
return o;
}
Operation getUndoLinearInsert() {
Operation o = {
OPERATION_UNDO_LINEAR_INSERT,
SIZEOF_RECORD,
OPERATION_NOOP,
&operateUndoInsert
};
return o;
}
Operation getUndoLinearDelete() {
Operation o = {
OPERATION_UNDO_LINEAR_DELETE,
SIZEOF_RECORD,
OPERATION_NOOP,
&operateUndoDelete
};
return o;
}
void TlogicalHashInsert(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) {
/* Write undo-only log entry. */
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
assert(headerRidB);
hashRid.size = keySize;
Tupdate(xid, hashRid, key, OPERATION_LINEAR_INSERT);
/* Perform redo-only insert. */
hashRid.size = sizeof(recordid);
ThashInstantInsert(xid, hashRid, key, keySize, val, valSize);
instant_expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize);
}
int TlogicalHashDelete(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) {
if(ThashLookup(xid, hashRid, key, keySize, val, valSize)) {
undoDeleteArg * arg = malloc(sizeof(undoDeleteArg) + keySize+valSize);
arg->keySize = keySize;
arg->valSize = valSize;
memcpy(arg+1, key, keySize);
memcpy(((byte*)(arg+1)) + keySize, val, valSize);
hashRid.size = sizeof(undoDeleteArg) + keySize + valSize;
Tupdate(xid, hashRid, arg, OPERATION_LINEAR_DELETE);
free(arg);
hashRid.size = sizeof(recordid);
ThashInstantDelete(xid, hashRid, key, keySize);
return 1;
} else {
return 0;
}
}
void instant_rehash(int xid, recordid hash, int next_split, int i, int keySize, int valSize);
void instant_update_hash_header(int xid, recordid hash, int i, int next_split);
int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_rid, const void * key, int keySize, recordid * deletedEntry);
void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_rid, hashEntry * e, int keySize, int valSize, recordid deletedEntry, int skipDelete);
int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize);
int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize) {
/*int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize) {
hashEntry * e = malloc(sizeof(hashEntry) + keySize + valSize);
recordid bucket = hashRid;
bucket.slot = bucket_number;
recordid nextEntry;
bucket.slot = bucket_number;
Tread(xid, bucket, &nextEntry);
if(nextEntry.size) {
assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize);
}
/* if(nextEntry.size) {
e = malloc(nextEntry.size);
} else {
e = malloc(1);
} */
int found = 0;
while(nextEntry.size > 0) {
Tread(xid, nextEntry, e);
if(!memcmp(key, e+1, keySize)) {
@ -68,95 +186,177 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key,
}
free(e);
return found;
}
}*/
void expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
TarrayListExtend(xid, hash, 1);
if(next_split >= powl(2,i-1)+2) {
/* printf("\n\n%d %d (i++)\n\n", next_split, i); */
void instant_expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
TarrayListInstantExtend(xid, hash, 1);
if(next_split >= twoToThe(i-1)+2) {
i++;
next_split = 2;
}
/* printf("-%d-", next_split); */
/* printf("rehash(%d, %d + 2)\n", i, next_split - 2); */
rehash(xid, hash, next_split, i, keySize, valSize);
instant_rehash(xid, hash, next_split, i, keySize, valSize);
next_split++;
update_hash_header(xid, hash, i, next_split);
instant_update_hash_header(xid, hash, i, next_split);
}
void update_hash_header(int xid, recordid hash, int i, int next_split) {
void instant_update_hash_header(int xid, recordid hash, int i, int next_split) {
recordid * headerRidB = pblHtLookup(openHashes, &hash.page, sizeof(int));
/* hash.slot = 1; */
/* Tread(xid, hash, headerRidB); */
/* headerHashBits and headerHashSplit are #defined to refer to headerRidB. */
headerHashBits = i;
headerNextSplit = next_split;
Tset(xid, hash, headerRidB);
TinstantSet(xid, hash, headerRidB);
}
void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) {
recordid bucket = hashRid;
bucket.slot = next_split;
/*recordid headerRidA;
Tread(xid, hashRid, &headerRidA); */
/* recordid oldRid;
oldRid.page = 0;
oldRid.slot = 0;
oldRid.size = 0; */
hashEntry * e = calloc(1,sizeof(hashEntry) + keySize + valSize /* headerValSize + headerKeySize */);
void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) {
if(bucket.size) {
Tread(xid, bucket, &bucket);
}
/* Picture of what's going on:
while(bucket.size > 0) {
Tread(xid, bucket, e);
[BucketA]->ba_contents-> ... -> A -> B -> C -> ...
/* printf("#%d", *(int*)(e+1)); */
[BucketB]->bb_contents-> ... -> D -> NULL
We want to move to another bucket, but can't allow the page
file to contain a set of pages where B, C, or D are
inaccessible.
Here is the set of pointers that we want:
[BucketA]->ba_contents-> ... -> A -> C -> ...
[BucketB]->bb_contents-> D -> B
Here is the order in which we must write pages:
D -> B
A -> C
B -> NULL
We do this in a while loop until the buckets are split.
Once this is queued up for the log, we can write the new
metadata for the hash table, and release our locks on A and
B.
On recovery, if the metadata is stale, then we look for the
first entry in BucketB that is also in BucketA and do this:
(Duplicate, B, should be in bucket B)
[BucketA]->ba_contents-> ... -> A -> B -> C
[BucketB]->ba_contents-> ... -> D -> B -> C
A -> C
B -> NULL
Next case (Duplicate, C, should be in bucket A):
[BucketA]->ba_contents-> ... -> A -> C -> ...
[BucketB]->ba_contents-> ... -> D -> B -> C -> ...
B -> NULL
Now that there are no duplicates, we simply re-run split
(whether we found a duplicate, or not).
*/
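/* For illustration, a concrete trace of the ordering above for the first
   entry B that has to move (so A is still BucketA's header slot and D is
   still BucketB's header slot):

       TinstantSet(xid, D, &B);          (D -> B: B is now reachable from BucketB)
       TinstantSet(xid, A, &C);          (A -> C: BucketA skips over B)
       B_contents->next = NULLRID;
       TinstantSet(xid, B, B_contents);  (B -> NULL)

   After each individual write, every entry remains reachable from at
   least one bucket, which is what the recovery argument above relies on. */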
/** @todo Actually implement recovery for linearHash. */
int firstA = 1; /* Is 'A' the recordid of a bucket? */
int firstD = 1; /* What about 'D'? */
assert(hashRid.size == sizeof(recordid));
recordid ba = hashRid; ba.slot = next_split;
recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1);
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = 0;
recordid ba_contents; Tread(xid, ba, &ba_contents);
recordid bb_contents = NULLRID;
TinstantSet(xid, bb, &bb_contents);/*Tread(xid, bb, &bb_contents);*/
recordid A = ba; /*ba_contents; */
recordid D = bb; /*bb_contents; */
recordid B = ba_contents;
recordid C;
hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
while(B.size) {
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
C = B_contents->next;
int old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2;
int new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2;
int old_hash = hash(e+1, keySize, i-1, ULONG_MAX) + 2;
assert(next_split == old_hash);
assert(new_hash == old_hash || new_hash == old_hash + twoToThe(i-1));
int new_hash = hash(e+1, keySize, i, ULONG_MAX) + 2;
bucket = e->next;
assert((!bucket.size )|| bucket.size == sizeof(hashEntry) + keySize + valSize /*headerValSize + headerKeySize */);
if(new_hash != next_split) {
assert(new_hash == next_split + powl(2, i-1));
recordid oldEntry;
/** @todo could be optimized. Why deleteFromBucket, then
insertIntoBucket? Causes us to traverse the bucket list an
extra time... */
recordid next_split_contents, new_hash_contents;
recordid tmp = hashRid;
tmp.slot = next_split;
Tread(xid, tmp, &next_split_contents);
tmp.slot = new_hash;
Tread(xid, tmp, &new_hash_contents);
assert(deleteFromBucket(xid, hashRid, next_split, next_split_contents, e+1, keySize,/* valSize, headerKeySize,*/ &oldEntry));
insertIntoBucket(xid, hashRid, new_hash, new_hash_contents, e, keySize, valSize, /*headerKeySize, headerValSize, */oldEntry, 1);
if(new_hash == old_hash) {
A = B;
B = C;
C.size = -1;
firstA = 0;
} else {
if(firstD) {
/* D is a bucket entry */
assert(B.size == sizeof(hashEntry) + keySize + valSize);
assert(D.size == sizeof(recordid));
TinstantSet(xid, D, &B);
} else {
/* D is the tail of our list. */
assert(D.size == sizeof(hashEntry) + keySize + valSize);
assert(B.size == 0 || B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, D, D_contents);
D_contents->next = B;
TinstantSet(xid, D, D_contents);
}
if(firstA) {
assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize);
assert(A.size == sizeof(recordid));
TinstantSet(xid, A, &C);
} else {
/* A is somewhere in the first list. */
assert(A.size == sizeof(hashEntry) + keySize + valSize);
assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, A, A_contents);
A_contents->next = C;
TinstantSet(xid, A, A_contents);
}
free(e);
/* B _can't_ be a bucket.*/
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
B_contents->next = NULLRID;
TinstantSet(xid, B, B_contents);
/* Update Loop State */
D = B;
B = C;
C.size = -1;
firstD = 0;
}
}
free(D_contents);
free(A_contents);
free(B_contents);
}
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_contents, hashEntry * e, int keySize, int valSize, recordid newEntry, int skipDelete) {
/** @todo fine-grained locking for insertIntoBucket and the other operations in this file. */
void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_contents, hashEntry * e, int keySize, int valSize, recordid newEntry, int skipDelete) {
assert(newEntry.size == (sizeof(hashEntry) + keySize + valSize));
recordid deleteMe;
if(!skipDelete) {
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, &deleteMe)) {
Tdealloc(xid, deleteMe);
if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, &deleteMe)) {
/* Tdealloc(xid, deleteMe); */
hashRid.slot = bucket_number;
Tread(xid, hashRid, &bucket_contents);
hashRid.slot = 0;
@ -164,22 +364,24 @@ void insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid buc
}
/*@todo consider recovery for insertIntoBucket. */
/* recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize); */
recordid bucket = hashRid;
bucket.slot = bucket_number;
/* Tread(xid, bucket, &(e->next)); */
assert(bucket_contents.size == 0 || bucket_contents.size == sizeof(hashEntry) + keySize + valSize);
assert(newEntry.size == sizeof(hashEntry) + keySize + valSize);
e->next = bucket_contents;
Tset(xid, newEntry, e);
Tset(xid, bucket, &newEntry);
TinstantSet(xid, newEntry, e);
TinstantSet(xid, bucket, &newEntry);
}
int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_contents, void * key, int keySize, recordid * deletedEntry) {
int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_contents,
const void * key, int keySize, recordid * deletedEntry) {
hashEntry * e;
recordid bucket = hash;
bucket.slot = bucket_number;
recordid nextEntry;
nextEntry = bucket_contents;
/* Tread(xid, bucket, &nextEntry); */
if(nextEntry.size) {
e = calloc(1,nextEntry.size);
} else {
@ -192,17 +394,16 @@ int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_
Tread(xid, nextEntry, e);
if(!memcmp(key, e+1, keySize)) {
if(first) {
assert(e->next.size < 1000);
Tset(xid, bucket, &(e->next));
assert(e->next.size < 40);
TinstantSet(xid, bucket, &(e->next));
} else {
recordid next = e->next;
Tread(xid, lastEntry, e);
assert(next.size < 1000);
assert(next.size < 40);
e->next = next;
Tset(xid, lastEntry, e);
TinstantSet(xid, lastEntry, e);
}
*deletedEntry = nextEntry;
/* Tdealloc(xid, nextEntry); */
found = 1;
break;
}
@ -210,12 +411,15 @@ int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_
first = 0;
nextEntry = e->next;
}
free(e);
return found;
}
/* Same as normal implementation, so commented it out.
/ *
recordid ThashAlloc(int xid, int keySize, int valSize) {
/* Want 16 buckets + 2 header rids, doubling on overflow. */
recordid rid = TarrayListAlloc(xid, 16 + 2, 2, sizeof(recordid));
/ * Want 16 buckets, doubling on overflow. * /
recordid rid = TarrayListAlloc(xid, 16, 2, sizeof(recordid));
TarrayListExtend(xid, rid, 32+2);
recordid headerRidA;
@ -237,7 +441,8 @@ recordid ThashAlloc(int xid, int keySize, int valSize) {
rid.slot =0;
return rid;
}
*/
/*
void ThashInit() {
openHashes = pblHtCreate();
}
@ -245,21 +450,14 @@ void ThashInit() {
void ThashDeinit() {
pblHtDelete(openHashes);
}
*/
void ThashInstantInsert(int xid, recordid hashRid,
const void * key, int keySize,
const void * val, int valSize) {
void ThashInsert(int xid, recordid hashRid,
void * key, int keySize,
void * val, int valSize) {
/* recordid headerRidA; */
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
/* recordid tmp = hashRid; */
/* tmp.slot = 0;
Tread(xid, tmp, &headerRidA);
assert(headerKeySize == keySize);
tmp.slot = 1; */
/* Tread(xid, tmp, &headerRidB); */
/* assert(headerValSize == valSize); */
assert(headerRidB);
int bucket = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
@ -267,38 +465,38 @@ void ThashInsert(int xid, recordid hashRid,
memcpy(e+1, key, keySize);
memcpy(((byte*)(e+1)) + keySize, val, valSize);
/** @todo instantAlloc / instantDealloc */
recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize);
/* printf("%d -> %d\n", *(int*)(e+1), bucket); */
recordid bucket_contents;
hashRid.slot = bucket;
Tread(xid, hashRid, &bucket_contents);
hashRid.slot = 0;
insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, newEntry, 0);
expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize);
instant_insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, newEntry, 0);
free(e);
}
/** @todo hash table probably should track the number of items in it,
so that expand can be selectively called. */
void ThashDelete(int xid, recordid hashRid,
void * key, int keySize) {
void ThashInstantDelete(int xid, recordid hashRid,
const void * key, int keySize) {
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
recordid tmp = hashRid;
tmp.slot = 1;
/* Tread(xid, tmp, headerRidB); */
int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
recordid deleteMe;
hashRid.slot = bucket_number;
recordid bucket_contents;
Tread(xid, hashRid, &bucket_contents);
hashRid.slot = 0;
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, &deleteMe)) {
Tdealloc(xid, deleteMe);
if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, &deleteMe)) {
/* Tdealloc(xid, deleteMe); */
}
}
int ThashOpen(int xid, recordid hashRid) {
/*int ThashOpen(int xid, recordid hashRid) {
recordid * headerRidB = malloc(sizeof(recordid));
hashRid.slot = 1;
Tread(xid, hashRid, headerRidB);
@ -306,14 +504,16 @@ int ThashOpen(int xid, recordid hashRid) {
pblHtInsert(openHashes, &hashRid.page, sizeof(int), headerRidB);
return 0;
}
}*/
void ThashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) {
ThashDelete(xid, hashRid, key, keySize);
ThashInsert(xid, hashRid, key, keySize, val, valSize);
void TlogicalHashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) {
void * dummy = malloc(valSize);
TlogicalHashDelete(xid, hashRid, key, keySize, dummy, valSize);
free(dummy);
TlogicalHashInsert(xid, hashRid, key, keySize, val, valSize);
}
/*
int ThashClose(int xid, recordid hashRid) {
recordid * freeMe = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
@ -323,12 +523,13 @@ int ThashClose(int xid, recordid hashRid) {
}
int ThashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize) {
/* recordid headerRidB; */
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
recordid tmp = hashRid;
tmp.slot = 1;
int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
/* printf("look in %d\n", bucket_number); */
int ret = findInBucket(xid, hashRid, bucket_number, key, keySize, buf, valSize);
return ret;
}
*/

View file

@ -0,0 +1,389 @@
#include <lladd/operations/linearHash.h>
#include <lladd/hash.h>
#include <limits.h>
#include <assert.h>
/**
A from-scratch implementation of linear hashing. Uses the
arrayList operations to implement its hashbuckets.
*/
#define BUCKETS_OFFSET (2)
#define headerKeySize (headerRidA.page)
#define headerValSize (headerRidA.slot)
#define headerHashBits (headerRidB->page)
#define headerNextSplit (headerRidB->slot)
#include <math.h>
#include <malloc.h>
#include <string.h>
#include <lladd/operations/linearHash.h>
#include <pbl/pbl.h>
typedef struct {
recordid next;
} hashEntry;
pblHashTable_t * openHashes = NULL;
void rehash(int xid, recordid hash, int next_split, int i, int keySize, int valSize);
void update_hash_header(int xid, recordid hash, int i, int next_split);
int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_rid, void * key, int keySize, recordid * deletedEntry);
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_rid, hashEntry * e, int keySize, int valSize, recordid deletedEntry, int skipDelete);
int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize);
int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize) {
hashEntry * e = malloc(sizeof(hashEntry) + keySize + valSize);
recordid nextEntry;
hashRid.slot = bucket_number;
Tread(xid, hashRid, &nextEntry);
int found = 0;
while(nextEntry.size > 0) {
assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, nextEntry, e);
if(!memcmp(key, e+1, keySize)) {
memcpy(val, ((byte*)(e+1))+keySize, valSize);
found = 1;
break;
}
nextEntry = e->next;
}
free(e);
return found;
}
void expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
TarrayListExtend(xid, hash, 1);
if(next_split >= twoToThe(i-1)+2) {
i++;
next_split = 2;
}
rehash(xid, hash, next_split, i, keySize, valSize);
next_split++;
update_hash_header(xid, hash, i, next_split);
}
void update_hash_header(int xid, recordid hash, int i, int next_split) {
recordid * headerRidB = pblHtLookup(openHashes, &hash.page, sizeof(int));
headerHashBits = i;
headerNextSplit = next_split;
Tset(xid, hash, headerRidB);
}
/*void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) {
recordid bucket = hashRid;
bucket.slot = next_split;
hashEntry * e = calloc(1,sizeof(hashEntry) + keySize + valSize);
if(bucket.size) {
Tread(xid, bucket, &bucket);
}
while(bucket.size > 0) {
Tread(xid, bucket, e);
int old_hash = hash(e+1, keySize, i-1, ULONG_MAX) + 2;
int new_hash = hash(e+1, keySize, i, ULONG_MAX) + 2;
bucket = e->next;
assert(next_split == old_hash);
assert((!bucket.size )|| bucket.size == sizeof(hashEntry) + keySize + valSize);
if(new_hash != next_split) {
assert(new_hash == next_split + twoToThe(i-1));
recordid oldEntry;
// @todo could be optimized. Why deleteFromBucket, then
// insertIntoBucket? Causes us to traverse the bucket list an
// extra time...
recordid next_split_contents, new_hash_contents;
recordid tmp = hashRid;
tmp.slot = next_split;
Tread(xid, tmp, &next_split_contents);
tmp.slot = new_hash;
Tread(xid, tmp, &new_hash_contents);
assert(deleteFromBucket(xid, hashRid, next_split, next_split_contents, e+1, keySize, &oldEntry));
insertIntoBucket(xid, hashRid, new_hash, new_hash_contents, e, keySize, valSize, oldEntry, 1);
} else {
}
}
free(e);
}
*/
void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) {
int firstA = 1; // Is 'A' the recordid of a bucket?
int firstD = 1; // What about 'D'?
assert(hashRid.size == sizeof(recordid));
recordid ba = hashRid; ba.slot = next_split;
recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1);
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = 0;
recordid ba_contents; Tread(xid, ba, &ba_contents);
recordid bb_contents = NULLRID;
Tset(xid, bb, &bb_contents);//Tread(xid, bb, &bb_contents);
recordid A = ba; //ba_contents;
recordid D = bb; //bb_contents;
recordid B = ba_contents;
recordid C;
hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
while(B.size) {
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
C = B_contents->next;
int old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2;
int new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2;
assert(next_split == old_hash);
assert(new_hash == old_hash || new_hash == old_hash + twoToThe(i-1));
if(new_hash == old_hash) {
A = B;
B = C;
C.size = -1;
firstA = 0;
} else {
if(firstD) {
// D is a bucket entry
assert(B.size == sizeof(hashEntry) + keySize + valSize);
assert(D.size == sizeof(recordid));
Tset(xid, D, &B);
} else {
// D is the tail of our list.
assert(D.size == sizeof(hashEntry) + keySize + valSize);
assert(B.size == 0 || B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, D, D_contents);
D_contents->next = B;
Tset(xid, D, D_contents);
}
if(firstA) {
assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize);
assert(A.size == sizeof(recordid));
Tset(xid, A, &C);
} else {
// A is somewhere in the first list.
assert(A.size == sizeof(hashEntry) + keySize + valSize);
assert(C.size == 0 || C.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, A, A_contents);
A_contents->next = C;
Tset(xid, A, A_contents);
}
// B _can't_ be a bucket.
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
B_contents->next = NULLRID;
Tset(xid, B, B_contents);
// Update Loop State
D = B;
B = C;
C.size = -1;
firstD = 0;
}
}
free(D_contents);
free(A_contents);
free(B_contents);
}
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, recordid bucket_contents, hashEntry * e, int keySize, int valSize, recordid newEntry, int skipDelete) {
recordid deleteMe;
if(!skipDelete) {
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, &deleteMe)) {
Tdealloc(xid, deleteMe);
hashRid.slot = bucket_number;
Tread(xid, hashRid, &bucket_contents);
hashRid.slot = 0;
}
}
/*@todo consider recovery for insertIntoBucket. */
recordid bucket = hashRid;
bucket.slot = bucket_number;
e->next = bucket_contents;
Tset(xid, newEntry, e);
Tset(xid, bucket, &newEntry);
}
int deleteFromBucket(int xid, recordid hash, int bucket_number, recordid bucket_contents, void * key, int keySize, recordid * deletedEntry) {
hashEntry * e;
recordid bucket = hash;
bucket.slot = bucket_number;
recordid nextEntry;
nextEntry = bucket_contents;
if(nextEntry.size) {
e = calloc(1,nextEntry.size);
} else {
e = calloc(1,1);
}
int first = 1;
int found = 0;
recordid lastEntry;
while(nextEntry.size > 0) {
Tread(xid, nextEntry, e);
if(!memcmp(key, e+1, keySize)) {
if(first) {
assert(e->next.size < 1000);
Tset(xid, bucket, &(e->next));
} else {
recordid next = e->next;
Tread(xid, lastEntry, e);
assert(next.size < 1000);
e->next = next;
Tset(xid, lastEntry, e);
}
*deletedEntry = nextEntry;
found = 1;
break;
}
lastEntry = nextEntry;
first = 0;
nextEntry = e->next;
}
return found;
}
recordid ThashAlloc(int xid, int keySize, int valSize) {
/* Want 16 buckets, doubling on overflow. */
recordid rid = TarrayListAlloc(xid, 16, 2, sizeof(recordid));
TarrayListExtend(xid, rid, 32+2);
recordid headerRidA;
recordid * headerRidB = malloc (sizeof(recordid));
headerKeySize = keySize;
headerValSize = valSize;
headerNextSplit = INT_MAX;
headerHashBits = 4;
rid.slot =0;
Tset(xid, rid, &headerRidA);
rid.slot =1;
Tset(xid, rid, headerRidB);
pblHtInsert(openHashes, &rid.page, sizeof(int), headerRidB);
rid.slot =0;
return rid;
}
void ThashInit() {
openHashes = pblHtCreate();
}
void ThashDeinit() {
pblHtDelete(openHashes);
}
void ThashInsert(int xid, recordid hashRid,
void * key, int keySize,
void * val, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
int bucket = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
hashEntry * e = calloc(1,sizeof(hashEntry) + keySize + valSize);
memcpy(e+1, key, keySize);
memcpy(((byte*)(e+1)) + keySize, val, valSize);
recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize);
recordid bucket_contents;
hashRid.slot = bucket;
Tread(xid, hashRid, &bucket_contents);
hashRid.slot = 0;
insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, newEntry, 0);
expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize);
free(e);
}
/** @todo hash table probably should track the number of items in it,
so that expand can be selectively called. */
void ThashDelete(int xid, recordid hashRid,
void * key, int keySize) {
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
recordid tmp = hashRid;
tmp.slot = 1;
int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
recordid deleteMe;
hashRid.slot = bucket_number;
recordid bucket_contents;
Tread(xid, hashRid, &bucket_contents);
hashRid.slot = 0;
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, &deleteMe)) {
Tdealloc(xid, deleteMe);
}
}
int ThashOpen(int xid, recordid hashRid) {
recordid * headerRidB = malloc(sizeof(recordid));
hashRid.slot = 1;
Tread(xid, hashRid, headerRidB);
pblHtInsert(openHashes, &hashRid.page, sizeof(int), headerRidB);
return 0;
}
void ThashUpdate(int xid, recordid hashRid, void * key, int keySize, void * val, int valSize) {
ThashDelete(xid, hashRid, key, keySize);
ThashInsert(xid, hashRid, key, keySize, val, valSize);
}
int ThashClose(int xid, recordid hashRid) {
recordid * freeMe = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
pblHtRemove(openHashes, &hashRid.page, sizeof(int));
free(freeMe);
return 0;
}
int ThashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
recordid tmp = hashRid;
tmp.slot = 1;
int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
int ret = findInBucket(xid, hashRid, bucket_number, key, keySize, buf, valSize);
return ret;
}

View file

@ -241,8 +241,8 @@ int TpageAlloc(int xid /*, int type */) {
#ifdef REUSE_PAGES
if(freelist) {
printf("Re-using old page: %d\n", freelist);
fflush(NULL);
DEBUG("Re-using old page: %d\n", freelist);
newpage = freelist;
Page * p = loadPage(newpage); /* Could obtain write lock here,
@ -343,7 +343,7 @@ Operation getUpdateFreespace() {
Operation o = {
OPERATION_UPDATE_FREESPACE,
sizeof(update_tuple),
OPERATION_UPDATE_FREESPACE_INVERSE,
/* OPERATION_UPDATE_FREESPACE_INVERSE, */ OPERATION_NOOP,
&__update_freespace
};
return o;
@ -364,7 +364,7 @@ Operation getUpdateFreelist() {
Operation o = {
OPERATION_UPDATE_FREELIST,
sizeof(update_tuple),
OPERATION_UPDATE_FREELIST_INVERSE,
OPERATION_NOOP,
&__update_freelist
};
return o;

View file

@ -209,8 +209,8 @@ static void Undo(int recovery) {
switch(e->type) {
case UPDATELOG:
{
/* Need write lock for undo.. */
Page * p = getPage(e->contents.update.rid.page, RW);
/* Need write lock for undo.. (Why??) */
Page * p = getPage(e->contents.update.rid.page, RO);
this_lsn= pageReadLSN(p); /* e->contents.update.rid.page); */

View file

@ -60,6 +60,11 @@ void setupOperationsTable() {
operationsTable[OPERATION_INITIALIZE_FIXED_PAGE] = getInitFixed();
operationsTable[OPERATION_UNINITIALIZE_PAGE] = getUnInitPage();
operationsTable[OPERATION_LINEAR_INSERT] = getLinearInsert();
operationsTable[OPERATION_UNDO_LINEAR_INSERT] = getUndoLinearInsert();
operationsTable[OPERATION_LINEAR_DELETE] = getLinearDelete();
operationsTable[OPERATION_UNDO_LINEAR_DELETE] = getUndoLinearDelete();
}
@ -133,7 +138,12 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
releasePage(p);
rid = dereferenceRID(rid);
p = loadPage(rid.page);
} else if(*page_type_ptr(p) == ARRAY_LIST_PAGE) {
/** @todo Kludge! Shouldn't special case operations in transactional2. */
} else if(*page_type_ptr(p) == ARRAY_LIST_PAGE &&
op != OPERATION_LINEAR_INSERT &&
op != OPERATION_UNDO_LINEAR_INSERT &&
op != OPERATION_LINEAR_DELETE &&
op != OPERATION_UNDO_LINEAR_DELETE ) {
rid = dereferenceArrayListRid(p, rid.slot);
releasePage(p);
p = loadPage(rid.page);
@ -155,7 +165,7 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
void Tread(int xid, recordid rid, void * dat) {
Page * p = loadPage(rid.page);
int page_type = *page_type_ptr(p);
if(page_type == SLOTTED_PAGE) {
if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE ) {
} else if(page_type == INDIRECT_PAGE) {
releasePage(p);

View file

@ -24,6 +24,9 @@
please see: http://mission.base.com/.
$Log$
Revision 1.4 2004/10/18 18:24:51 sears
Preliminary version of logical logging linear hash. (No latching yet, and there are some bugs re-opening a closed hash.)
Revision 1.3 2004/10/17 02:17:00 sears
Optimized the 'naive' linear hash table, and moved to sourceforge. :)
@ -71,7 +74,8 @@ static int rcsid_fct() { return( rcsid ? 0 : rcsid_fct() ); }
/*****************************************************************************/
/* #defines */
/*****************************************************************************/
#define PBL_HASHTABLE_SIZE 1019
/*#define PBL_HASHTABLE_SIZE 1019*/
#define PBL_HASHTABLE_SIZE 2017
/*#define PBL_HASHTABLE_SIZE 100003 */
/*****************************************************************************/

View file

@ -1,7 +1,7 @@
INCLUDES = @CHECK_CFLAGS@
if HAVE_CHECK
## Had to disable check_lht because lht needs to be rewritten.
TESTS = check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_lladdhash check_pageOperations check_linearHash
TESTS = check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_lladdhash check_pageOperations check_linearHash check_logicalLinearHash
else
TESTS =
endif

View file

@ -56,11 +56,15 @@ terms specified in this license.
/** @test
executes each of the insert / remove / lookup operations a few times.
*/
#define NUM_ENTRIES 100000
/* #define NUM_ENTRIES 2000*/
//#define NUM_ENTRIES 100000
#define NUM_ENTRIES 2001
/* #define NUM_ENTRIES 1000 */
/*#define NUM_ENTRIES 100 */
/**
@test Runs some simple tests on the hash() function. Not comprehensive enough.
@todo the checkHashFcn test is broken.
*/
START_TEST(checkHashFcn) {
int i;
srandom(12312313);
@ -77,6 +81,10 @@ START_TEST(checkHashFcn) {
}
} END_TEST
/**
@test Insert some stuff into a linear hash, delete some stuff, and
make sure that abort() and commit() work.
*/
START_TEST(simpleLinearHashTest)
{
Tinit();
@ -93,11 +101,8 @@ START_TEST(simpleLinearHashTest)
/* assert(isNullRecord(lHtInsert(xid, hash, &i, sizeof(int), rid)));
assert(!isNullRecord(lHtInsert(xid, hash, &i, sizeof(int), rid))); */
if(i == 4090) {
printf("here");
}
ThashInsert(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid));
/* printf("%d\n", i); */
assert(ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
assert(rid.page == i+1);

View file

@ -0,0 +1,223 @@
/*---
This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
and that this notice is included verbatim in any distributions. No
written agreement, license, or royalty fee is required for any of the
authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are
acquiring the software on behalf of the Department of Defense, the
software shall be classified as "Commercial Computer Software" and the
Government shall have only "Restricted Rights" as defined in Clause
252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the
authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#include <config.h>
#include <check.h>
#include "../check_includes.h"
#include <lladd/transactional.h>
#include <assert.h>
#include <lladd/hash.h>
#include <limits.h>
#include <math.h>
#define LOG_NAME "check_linearHash.log"
/** @test
executes each of the insert / remove / lookup operations a few times.
*/
//#define NUM_ENTRIES 100000
#define NUM_ENTRIES 2001
/* #define NUM_ENTRIES 1000 */
/*#define NUM_ENTRIES 100 */
/**
@test Runs some simple tests on the hash() function. Not comprehensive enough.
@todo the checkHashFcn test is broken.
*/
START_TEST(checkHashFcn) {
int i;
srandom(12312313);
for(i = 0; i < 100000;i++) {
int j = (int) (100000.0*random()/(RAND_MAX+1.0)); /* int for CRC. */
int k = (int) 2+(30.0*random()/(RAND_MAX+1.0)); /* number of bits in result. */
unsigned long first = hash(&j, sizeof(int), k, ULONG_MAX);
int boundary = first + 10;
unsigned long second = hash(&j, sizeof(int), k, boundary);
assert(first == second);
unsigned long third = hash(&j, sizeof(int), k+1, ULONG_MAX);
assert((first == third) || (powl(2,k)+ first == third));
}
} END_TEST
/**
@test Insert some stuff into a linear hash, delete some stuff, and
make sure that abort() and commit() work.
*/
START_TEST(simpleLinearHashTest)
{
Tinit();
int xid = Tbegin();
recordid hashRoot = ThashAlloc(xid, sizeof(int), sizeof(recordid));
for(int i = 0; i < NUM_ENTRIES; i++) {
recordid rid;
rid.page=i+1;
rid.slot=i+2;
rid.size=i+3;
/* assert(isNullRecord(lHtInsert(xid, hash, &i, sizeof(int), rid)));
assert(!isNullRecord(lHtInsert(xid, hash, &i, sizeof(int), rid))); */
TlogicalHashInsert(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid));
assert(ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
assert(rid.page == i+1);
assert(rid.slot == i+2);
assert(rid.size == i+3);
if(! (i % 1000)) {
printf("%d\n", i);
/* flush(NULL); */
}
}
printf("Done inserting.\n");
/* fflush(NULL); */
for(int i = 0; i < NUM_ENTRIES; i+=10) {
/*recordid rid = lHtRemove(xid, hash, &i, sizeof(int)); */
recordid rid;
assert(ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
assert(rid.page == (i+1));
assert(rid.slot == (i+2));
assert(rid.size == (i+3));
TlogicalHashDelete(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid));
}
printf("Done deleting mod 10.\n");
/* fflush(NULL); */
for(int i = 0; i < NUM_ENTRIES; i++) {
recordid rid;
if(i % 10) {
assert(ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
assert(rid.page == (i+1));
assert(rid.slot == (i+2));
assert(rid.size == (i+3));
} else {
assert(!ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
}
}
printf("Done checking mod 10.\n");
Tcommit(xid);
xid = Tbegin();
recordid rid;
for(int i = 0; i < NUM_ENTRIES; i++) {
if(i % 10) {
assert(ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
TlogicalHashDelete(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid));
assert(rid.page == (i+1));
assert(rid.slot == (i+2));
assert(rid.size == (i+3));
} else {
assert(!ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
TlogicalHashDelete(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid));
}
}
printf("Done deleting rest.\n");
/* fflush(NULL);*/
for(int i = 0; i < NUM_ENTRIES; i++) {
assert(!ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
}
printf("Aborting..\n");
/* fflush(NULL); */
Tabort(xid);
printf("done aborting..\n");
/* fflush(NULL); */
xid = Tbegin();
for(int i = 0; i < NUM_ENTRIES; i++) {
if(i % 10) {
assert( ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
assert(rid.page == (i+1));
assert(rid.slot == (i+2));
assert(rid.size == (i+3));
} else {
assert(!ThashLookup(xid, hashRoot, &i, sizeof(int), &rid, sizeof(recordid)));
}
}
printf("done checking..\n");
/* fflush(NULL); */
Tcommit(xid);
Tdeinit();
}
END_TEST
Suite * check_suite(void) {
Suite *s = suite_create("linearHash");
/* Begin a new test */
TCase *tc = tcase_create("simple");
/* Sub tests are added, one per line, here */
/* tcase_add_test(tc, checkHashFcn); */
tcase_add_test(tc, simpleLinearHashTest);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc);
return s;
}
#include "../check_setup.h"