2005-01-29 01:17:37 +00:00
|
|
|
#define __USE_GNU
|
|
|
|
#define _GNU_SOURCE
|
|
|
|
#include <pthread.h>
|
2005-01-14 01:52:53 +00:00
|
|
|
#include <lladd/transactional.h>
|
|
|
|
#include <lladd/hash.h>
|
2005-01-29 01:17:37 +00:00
|
|
|
#include "../page.h"
|
|
|
|
#include "../page/slotted.h"
|
2005-01-14 01:52:53 +00:00
|
|
|
#include <stdlib.h>
|
|
|
|
#include <assert.h>
|
2005-01-15 01:45:27 +00:00
|
|
|
#include <string.h>
|
2005-03-01 07:32:02 +00:00
|
|
|
#include <lladd/operations/noop.h>
|
2005-03-20 05:17:25 +00:00
|
|
|
#include <lladd/fifo.h>
|
|
|
|
#include <lladd/multiplexer.h>
|
|
|
|
#include "../logger/logMemory.h"
|
2005-01-15 01:45:27 +00:00
|
|
|
/**
|
|
|
|
re-entrant implementation of a linear hash table, using nested top actions.
|
|
|
|
|
|
|
|
@file
|
|
|
|
|
|
|
|
@todo Improve concurrency of linearHashNTA and linkedListNTA.
|
|
|
|
*/
|
2005-01-14 01:52:53 +00:00
|
|
|
|
2005-03-02 05:47:38 +00:00
|
|
|
static pthread_mutex_t linear_hash_mutex;// = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
|
|
|
|
|
|
|
|
|
2005-01-14 01:52:53 +00:00
|
|
|
/**
 * On-disk header record for a linear hash table.  Stored in the record
 * named by the recordid that ThashCreate() returns; every public Thash*
 * call Tread()s this header first.
 */
typedef struct {
recordid buckets;     /**< arrayList holding the bucket slots. */
int keySize;          /**< fixed key length, or VARIABLE_LENGTH. */
int valueSize;        /**< fixed value length, or VARIABLE_LENGTH. */
long nextSplit;       /**< next bucket to split (linear hashing pointer). */
int bits;             /**< current number of hash bits. */
long numEntries;      /**< total entries; drives the split policy. */
} lladd_hash_header;
|
|
|
|
|
|
|
|
|
2005-03-02 05:47:38 +00:00
|
|
|
void LinearHashNTAInit() {
|
|
|
|
// only need this function since PTHREAD_RECURSIVE_MUTEX_INITIALIZER is really broken...
|
|
|
|
pthread_mutexattr_t attr;
|
|
|
|
pthread_mutexattr_init(&attr);
|
|
|
|
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
|
|
|
|
pthread_mutex_init(&linear_hash_mutex, &attr);
|
|
|
|
}
|
|
|
|
|
2005-01-14 01:52:53 +00:00
|
|
|
|
|
|
|
/* private methods... */
|
2005-02-22 03:10:54 +00:00
|
|
|
compensated_function static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * lhh);
|
2005-01-14 01:52:53 +00:00
|
|
|
|
|
|
|
|
|
|
|
#define HASH_INIT_ARRAY_LIST_COUNT twoToThe(HASH_INIT_BITS)
|
|
|
|
#define HASH_INIT_ARRAY_LIST_MULT 2
|
|
|
|
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Create a new transactional linear hash table.
 *
 * @param xid       transaction id.
 * @param keySize   fixed key length in bytes, or VARIABLE_LENGTH.
 * @param valueSize fixed value length in bytes, or VARIABLE_LENGTH.
 * @return recordid of the table header, or NULLRID on compensation error.
 *
 * Variable-length tables store one paged-list recordid per bucket;
 * fixed-length tables store linked-list entries inline in the arrayList.
 */
compensated_function recordid ThashCreate(int xid, int keySize, int valueSize) {
  recordid hashHeader;
  lladd_hash_header lhh;

  try_ret(NULLRID) {
    hashHeader = Talloc(xid, sizeof(lladd_hash_header));
    if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
      /* Each bucket slot holds the recordid of a paged list. */
      lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(recordid));
    } else {
      /* Each bucket slot holds an inline linked-list entry plus payload. */
      lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(lladd_linkedList_entry) + keySize + valueSize);
    }
  } end_ret(NULLRID);
  try_ret(NULLRID) {
    TarrayListExtend(xid, lhh.buckets, HASH_INIT_ARRAY_LIST_COUNT);
  } end_ret(NULLRID);
  int i;
  recordid bucket = lhh.buckets;
  if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
    /* Allocate an empty paged list for every initial bucket. */
    for(i = 0; i < HASH_INIT_ARRAY_LIST_COUNT; i++) {
      try_ret(NULLRID) {
        recordid rid = TpagedListAlloc(xid);
        bucket.slot = i;
        Tset(xid, bucket, &rid);
        //	printf("paged list alloced at rid {%d %d %d}\n", rid.page, rid.slot, rid.size);
      } end_ret(NULLRID);
    }
  } else {
    /* Zero out every initial bucket; a zeroed entry marks an empty list. */
    byte * entry = calloc(1, lhh.buckets.size);
    for(i = 0; i < HASH_INIT_ARRAY_LIST_COUNT; i++) {
      bucket.slot = i;
      /* On compensation error, free(entry) before propagating NULLRID. */
      begin_action_ret(free, entry, NULLRID) {
        Tset(xid, bucket, entry);
      } end_action_ret(NULLRID);
    }
    free (entry);
  }
  lhh.keySize = keySize;
  lhh.valueSize = valueSize;
  lhh.nextSplit = 0;
  lhh.bits = HASH_INIT_BITS;
  lhh.numEntries = 0;
  try_ret(NULLRID) {
    Tset(xid, hashHeader, &lhh);
  } end_ret(NULLRID);
  return hashHeader;
}
|
|
|
|
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Delete a hash table and reclaim its storage.
 * @todo Not implemented; currently aborts the process unconditionally.
 */
compensated_function void ThashDelete(int xid, recordid hash) {
  abort();
}
|
|
|
|
|
2005-02-22 03:10:54 +00:00
|
|
|
compensated_function static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize);
|
|
|
|
compensated_function static int __ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize);
|
2005-01-15 01:45:27 +00:00
|
|
|
|
2005-03-14 00:25:29 +00:00
|
|
|
/*typedef struct {
|
2005-01-15 01:45:27 +00:00
|
|
|
recordid hashHeader;
|
|
|
|
int keySize;
|
|
|
|
} linearHash_insert_arg;
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
recordid hashHeader;
|
|
|
|
int keySize;
|
|
|
|
int valueSize;
|
2005-03-14 00:25:29 +00:00
|
|
|
} linearHash_remove_arg;*/
|
2005-01-15 01:45:27 +00:00
|
|
|
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Log-operation handler that (re-)inserts a key/value pair.  Registered
 * as the function of OPERATION_LINEAR_HASH_INSERT (see getLinearHashInsert),
 * whose undo is OPERATION_LINEAR_HASH_REMOVE; correspondingly, the payload
 * is a linearHash_remove_arg (key AND value), so a removed pair can be
 * restored when a ThashRemove is rolled back.
 *
 * Layout of dat: linearHash_remove_arg header, then keySize key bytes,
 * then the value bytes.
 */
compensated_function static int operateInsert(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
  const linearHash_remove_arg * args = dat;
  recordid hashHeader = args->hashHeader;
  int keySize = args->keySize;
  int valueSize = args->valueSize;

  assert(valueSize >= 0);

  /* Key and value are packed immediately after the argument header. */
  byte * key = (byte*)(args+1);
  byte * value = ((byte*)(args+1))+ keySize;
  /* Hold the table mutex for the duration; unlock even on error. */
  begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
    pthread_mutex_lock(&linear_hash_mutex);
    __ThashInsert(xid, hashHeader, key, keySize, value, valueSize);
  } compensate_ret(compensation_error());
  return 0;
}
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Log-operation handler that removes a key.  Registered as the function of
 * OPERATION_LINEAR_HASH_REMOVE (see getLinearHashRemove), whose undo is
 * OPERATION_LINEAR_HASH_INSERT; the payload is a linearHash_insert_arg
 * (key only), so an inserted pair can be taken back out when a
 * ThashInsert is rolled back.
 */
compensated_function static int operateRemove(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
  const linearHash_insert_arg * args = dat;
  recordid hashHeader = args->hashHeader;
  int keySize = args->keySize;

  /* The key is packed immediately after the argument header. */
  byte * key = (byte*)(args + 1);
  /* Hold the table mutex for the duration; unlock even on error. */
  begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
    pthread_mutex_lock(&linear_hash_mutex);
    __ThashRemove(xid, hashHeader, key, keySize);
  } compensate_ret(compensation_error());

  return 0;
}
|
|
|
|
Operation getLinearHashInsert() {
|
|
|
|
Operation o = {
|
2005-03-01 07:32:02 +00:00
|
|
|
// OPERATION_LINEAR_HASH_INSERT,
|
|
|
|
OPERATION_NOOP,
|
2005-01-15 01:45:27 +00:00
|
|
|
SIZEIS_PAGEID,
|
|
|
|
OPERATION_LINEAR_HASH_REMOVE,
|
|
|
|
&operateInsert
|
2005-03-01 07:32:02 +00:00
|
|
|
// &noop
|
2005-01-15 01:45:27 +00:00
|
|
|
};
|
|
|
|
return o;
|
|
|
|
}
|
|
|
|
Operation getLinearHashRemove() {
|
|
|
|
Operation o = {
|
2005-03-01 07:32:02 +00:00
|
|
|
// OPERATION_LINEAR_HASH_REMOVE,
|
|
|
|
OPERATION_NOOP,
|
2005-01-15 01:45:27 +00:00
|
|
|
SIZEIS_PAGEID,
|
|
|
|
OPERATION_LINEAR_HASH_INSERT,
|
|
|
|
&operateRemove
|
2005-03-01 07:32:02 +00:00
|
|
|
//&noop
|
2005-01-15 01:45:27 +00:00
|
|
|
};
|
|
|
|
return o;
|
|
|
|
}
|
2005-01-14 01:52:53 +00:00
|
|
|
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Insert (or overwrite) a key/value pair, wrapped in a nested top action
 * so that the logical undo (OPERATION_LINEAR_HASH_REMOVE is logged via
 * the NTA's argument, which here is the insert arg carrying the key).
 *
 * @return the value returned by __ThashInsert (non-zero if an existing
 *         entry was replaced).
 */
compensated_function int ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize) {
  pthread_mutex_lock(&linear_hash_mutex);
  /* NTA log argument: header + key (the value is not needed to undo). */
  int argSize = sizeof(linearHash_insert_arg)+keySize;
  linearHash_insert_arg * arg = malloc(argSize);
  arg->hashHeader = hashHeader;
  arg->keySize = keySize;
  memcpy(arg+1, key, keySize);

  int ret;

  /** @todo MEMORY LEAK arg, handle on pthread_cancel.. */
  void * handle;
  /* On compensation error the mutex is released by the action cleanup;
     note that in that case TendNestedTopAction is never reached. */
  begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
    handle = TbeginNestedTopAction(xid, OPERATION_LINEAR_HASH_INSERT, (byte*)arg, argSize);
    free(arg);
    ret = __ThashInsert(xid, hashHeader, key, keySize, value, valueSize);
  } end_action_ret(compensation_error());

  TendNestedTopAction(xid, handle);

  pthread_mutex_unlock(&linear_hash_mutex);

  return ret;
}
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Internal insert: updates the header, splits a bucket if the fill
 * factor is exceeded, then performs remove+insert on the target bucket
 * (remove-first gives overwrite semantics).  Caller must hold
 * linear_hash_mutex and an open nested top action.
 *
 * @return non-zero if an existing entry with this key was replaced.
 */
compensated_function static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keySize, const byte* value, int valueSize) {
  lladd_hash_header lhh;
  try_ret(compensation_error()) {
    Tread(xid, hashHeader, &lhh);
  } end_ret(compensation_error());
  /* Optimistically count the new entry; corrected below on overwrite. */
  lhh.numEntries ++;
  try_ret(compensation_error()) {
    if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
      /* Split once the load exceeds HASH_FILL_FACTOR of the live buckets. */
      if(lhh.numEntries > (int)((double)(lhh.nextSplit + twoToThe(lhh.bits-1)) * (HASH_FILL_FACTOR))) {
        ThashSplitBucket(xid, hashHeader, &lhh);
      }
    } else {
      if(lhh.numEntries > (int)((double)(lhh.nextSplit + twoToThe(lhh.bits-1)) * HASH_FILL_FACTOR)) {
        ThashSplitBucket(xid, hashHeader, &lhh);
      }
    }
  } end_ret(compensation_error());

  recordid bucket = lhh.buckets;
  bucket.slot = hash(key, keySize, lhh.bits, lhh.nextSplit);

  int ret;
  try_ret(compensation_error()) {

    if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {

      recordid bucketList;

      Tread(xid, bucket, &bucketList);

      //    int before = TpagedListSpansPages(xid, bucketList);
      /* Remove any existing entry first so insert acts as overwrite. */
      ret = TpagedListRemove(xid, bucketList, key, keySize);
      TpagedListInsert(xid, bucketList, key, keySize, value, valueSize);

      //    int after = TpagedListSpansPages(xid, bucketList);
      //    if(before != after) {  // Page overflowed...
      //      T hashSplitBucket(xid, hashHeader, &lhh);
      //      T hashSplitBucket(xid, hashHeader, &lhh);
      //    }

    } else {
      assert(lhh.keySize == keySize); assert(lhh.valueSize == valueSize);
      ret = TlinkedListRemove(xid, bucket, key, keySize);
      TlinkedListInsert(xid, bucket, key, keySize, value, valueSize);
    }
    /* Overwrite: the earlier ++ was one too many. */
    if(ret) { lhh.numEntries--; }
    Tset(xid, hashHeader, &lhh);

  } end_ret(compensation_error());

  return ret;
}
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Remove a key, wrapped in a nested top action whose log argument
 * carries both key and value so the removal can be logically undone by
 * re-inserting the pair.
 *
 * @return 0 if the key was absent, otherwise __ThashRemove's result.
 *
 * Locking: the first action block acquires linear_hash_mutex and keeps
 * it held between the two compensation scopes; the early-return path and
 * the final compensate_ret are the only places it is released.
 */
compensated_function int ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize) {
  byte * value;
  int valueSize;
  int ret;
  begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
    pthread_mutex_lock(&linear_hash_mutex);
    /* Look the value up first; it is needed for the undo log record. */
    valueSize = ThashLookup(xid, hashHeader, key, keySize, &value);
  } end_action_ret(compensation_error());

  if(valueSize == -1) {
    /* Key not present: nothing to log, nothing to remove. */
    pthread_mutex_unlock(&linear_hash_mutex);
    return 0;
  }

  begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {

    /* NTA log argument: header + key + value (value enables undo). */
    int argSize = sizeof(linearHash_remove_arg) + keySize + valueSize;
    linearHash_remove_arg * arg = malloc(argSize);
    arg->hashHeader = hashHeader;
    arg->keySize = keySize;
    arg->valueSize = valueSize;
    memcpy(arg+1, key, keySize);
    memcpy((byte*)(arg+1)+keySize, value, valueSize);
    void * handle;

    handle = TbeginNestedTopAction(xid, OPERATION_LINEAR_HASH_REMOVE, (byte*)arg, argSize);
    free(arg);
    free(value);

    ret = __ThashRemove(xid, hashHeader, key, keySize);
    TendNestedTopAction(xid, handle);

  } compensate_ret(compensation_error());

  return ret;
}
|
|
|
|
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Internal remove: decrements the entry count and deletes the key from
 * its bucket.  Caller must hold linear_hash_mutex and an open nested
 * top action.  Note: numEntries is decremented even if the key turns
 * out to be absent; callers (ThashRemove) only invoke this after a
 * successful lookup.
 *
 * @return the bucket-level remove result (non-zero on success).
 */
compensated_function static int __ThashRemove(int xid, recordid hashHeader, const byte * key, int keySize) {
  int ret;

  try_ret(compensation_error()) {
    lladd_hash_header lhh;
    Tread(xid, hashHeader, &lhh);
    lhh.numEntries--;
    Tset(xid, hashHeader, &lhh);

    recordid bucket = lhh.buckets;
    bucket.slot = hash(key, keySize, lhh.bits, lhh.nextSplit);

    if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
      /* Bucket slot holds the recordid of a paged list. */
      recordid bucketList;
      Tread(xid, bucket, &bucketList);
      ret = TpagedListRemove(xid, bucketList, key, keySize);
    } else {
      if(lhh.keySize != keySize) { compensation_set_error(LLADD_INTERNAL_ERROR); }
      assert(lhh.keySize == keySize);
      ret = TlinkedListRemove(xid, bucket, key, keySize);
    }
  } end_ret(compensation_error());

  return ret;
}
|
|
|
|
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Look up a key.  On success *value is set to a freshly allocated copy
 * of the value (caller frees) and the value's size is returned; -1 is
 * the "not found" convention used by callers such as ThashRemove.
 */
compensated_function int ThashLookup(int xid, recordid hashHeader, const byte * key, int keySize, byte ** value) {
  lladd_hash_header lhh;
  int ret;

  // This whole thing is safe since the callees do not modify global state...

  begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
    pthread_mutex_lock(&linear_hash_mutex);
    Tread(xid, hashHeader, &lhh);

    recordid bucket = lhh.buckets;
    bucket.slot = hash(key, keySize, lhh.bits, lhh.nextSplit);

    if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
      recordid bucketList;
      Tread(xid, bucket, &bucketList);
      ret = TpagedListFind(xid, bucketList, key, keySize, value);
    } else {
      assert(lhh.keySize == keySize);
      ret = TlinkedListFind(xid, bucket, key, keySize, value);
    }
  } compensate_ret(compensation_error());

  return ret;
}
|
2005-02-22 03:10:54 +00:00
|
|
|
/**
 * Linear-hashing bucket split: extend the bucket array by one, advance
 * nextSplit (growing the bit count when a full round completes), then
 * rehash every entry of the old bucket and move the ones that now hash
 * to the new bucket.  Caller must hold linear_hash_mutex; lhh is updated
 * in place but NOT written back here — the caller persists it.
 */
compensated_function static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * lhh) {
  //  if(1) { return; }
  try {
    long old_bucket = lhh->nextSplit;
    long new_bucket = old_bucket + twoToThe(lhh->bits-1);
    recordid old_bucket_rid = lhh->buckets;
    recordid new_bucket_rid = lhh->buckets;
    old_bucket_rid.slot = old_bucket;
    new_bucket_rid.slot = new_bucket;
    //  void * handle = TbeginNestedTopAction(xid, OPERATION_NOOP, NULL, 0);
    TarrayListExtend(xid, lhh->buckets, 1);
    recordid new_bucket_list; // will be uninitialized if we have fixed length entries.
    if(lhh->keySize == VARIABLE_LENGTH || lhh->valueSize == VARIABLE_LENGTH) {
      /* New bucket gets its own (empty) paged list. */
      new_bucket_list = TpagedListAlloc(xid);
      Tset(xid, new_bucket_rid, &new_bucket_list);
    } else {
      /* Fixed-length: zero the new slot to mark it empty. */
      byte * entry = calloc(1, lhh->buckets.size);
      Tset(xid, new_bucket_rid, entry);
      free(entry);
    }
    /* Advance the split pointer; when the round completes, double the
       table by growing the bit count. */
    if(lhh->nextSplit < twoToThe(lhh->bits-1)-1) {
      lhh->nextSplit++;
    } else {
      lhh->nextSplit = 0;
      lhh->bits++;
    }

    /** @todo linearHashNTA's split bucket should use the 'move' function call. */
    if(lhh->keySize == VARIABLE_LENGTH || lhh->valueSize == VARIABLE_LENGTH) {
      recordid old_bucket_list;
      Tread(xid, old_bucket_rid, &old_bucket_list);

      lladd_pagedList_iterator * pit = TpagedListIterator(xid, old_bucket_list);

      byte *key, *value;
      int keySize, valueSize;
      /* Rehash with the NEW bits/nextSplit; entries that no longer map
         to old_bucket move to the new bucket. */
      while(TpagedListNext(xid, pit, &key, &keySize, &value, &valueSize)) {
        if(hash(key, keySize, lhh->bits, lhh->nextSplit) != old_bucket) {
          TpagedListRemove(xid, old_bucket_list, key, keySize);
          TpagedListInsert(xid, new_bucket_list, key, keySize, value, valueSize);
        }
        /* Iterator hands back allocated copies; release them. */
        free(key);
        free(value);
      }
    } else {
      lladd_linkedList_iterator * it = TlinkedListIterator(xid, old_bucket_rid, lhh->keySize, lhh->valueSize);
      byte * key, *value;
      int keySize, valueSize;
      while(TlinkedListNext(xid, it, &key, &keySize, &value, &valueSize)) {
        assert(valueSize == lhh->valueSize);
        assert(keySize == lhh->keySize);
        if(hash(key, keySize, lhh->bits, lhh->nextSplit) != old_bucket) {
          TlinkedListRemove(xid, old_bucket_rid, key, keySize);
          TlinkedListInsert(xid, new_bucket_rid, key, keySize, value, valueSize);
        }
        free(key);
        free(value);
      }
    }
  } end;

  //  TendNestedTopAction(xid, handle);
  return;
}
|
2005-01-15 01:45:27 +00:00
|
|
|
/**
 * Open an iterator over every entry in the table.  The keySize/valueSize
 * parameters are ignored in favor of the sizes stored in the header
 * (they are overwritten below); pass -1/-1 for generic use.
 *
 * @return a heap-allocated iterator (freed by ThashNext on exhaustion,
 *         or by ThashDone on early termination), or NULL on error.
 */
lladd_hash_iterator * ThashIterator(int xid, recordid hashHeader, int keySize, int valueSize) {
  lladd_hash_iterator * it = malloc(sizeof(lladd_hash_iterator));
  /* On compensation error, free the iterator and return NULL. */
  begin_action_ret(free, it, NULL) {
    it->hashHeader = hashHeader;
    lladd_hash_header lhh;
    Tread(xid, hashHeader, &lhh);
    it->bucket = lhh.buckets;
    it->numBuckets = lhh.nextSplit +twoToThe(lhh.bits-1);
    it->bucket.slot = 0;
    /*  it->keySize = keySize;
	it->valueSize = valueSize;
	assert(keySize == lhh.keySize);
	assert(valueSize == lhh.valueSize); */
    /* Trust the on-disk header, not the caller's sizes. */
    keySize = lhh.keySize;
    it->keySize = lhh.keySize;
    valueSize = lhh.valueSize;
    it->valueSize = lhh.valueSize;
    if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
      /* Exactly one of it/pit is non-NULL; this tags the bucket kind. */
      it->it = NULL;
      recordid bucketList;
      assert(it->bucket.size == sizeof(bucketList));
      Tread(xid, it->bucket, &bucketList);
      it->pit= TpagedListIterator(xid, bucketList);
    } else {
      it->pit = NULL;
      it->it = TlinkedListIterator(xid, it->bucket, it->keySize, it->valueSize);
    }
  } end_action_ret(NULL);
  return it;
}
|
|
|
|
|
|
|
|
/**
 * Advance the iterator, returning 1 and filling key/value (allocated
 * copies; caller frees) or returning 0 when exhausted.  NOTE: on
 * exhaustion the iterator itself is freed here — do not also call
 * ThashDone in that case (see linearHashNTAIterator_close).
 */
int ThashNext(int xid, lladd_hash_iterator * it, byte ** key, int * keySize, byte** value, int * valueSize) {
  try_ret(0) {
    if(it->it) {
      /* Fixed-length table: walk linked-list buckets. */
      assert(!it->pit);
      while(!TlinkedListNext(xid, it->it, key, keySize, value, valueSize)) {
        if(compensation_error()) { return 0; }
        /* Current bucket exhausted; move to the next one. */
        it->bucket.slot++;
        if(it->bucket.slot < it->numBuckets) {
          it->it = TlinkedListIterator(xid, it->bucket, it->keySize, it->valueSize);
        } else {
          free(it);
          return 0;
        }
      }
    } else {
      /* Variable-length table: walk paged-list buckets. */
      assert(it->pit);
      while(!TpagedListNext(xid, it->pit, key, keySize, value, valueSize)) {
        if(compensation_error()) { return 0; }
        it->bucket.slot++;
        if(it->bucket.slot < it->numBuckets) {
          recordid bucketList;
          Tread(xid, it->bucket, &bucketList);
          it->pit = TpagedListIterator(xid, bucketList);
        } else {
          free(it);
          return 0;
        }
      }
    }
  } end_ret(0);
  return 1;
}
|
2005-02-16 04:11:14 +00:00
|
|
|
|
|
|
|
/**
 * Release a hash iterator that has not been run to exhaustion.
 * (ThashNext frees the iterator itself when it returns 0, so only call
 * this on early termination.)  xid is unused.
 */
void ThashDone(int xid, lladd_hash_iterator * it) {
  /* free(NULL) is a no-op, so the old `if(ptr)` guards were redundant;
     exactly one of it->it / it->pit is non-NULL. */
  free(it->it);
  free(it->pit);
  free(it);
}
|
2005-03-10 03:19:04 +00:00
|
|
|
|
|
|
|
/**
 * State for the generic (lladdIterator_t-style) wrapper around a hash
 * iterator.  The last key/value returned are cached so that
 * _key/_value accessors can hand out pointers without re-reading, and
 * freed on the next advance or on close.
 */
typedef struct {
lladd_hash_iterator* hit;   /**< underlying hash iterator. */
byte * lastKey;             /**< copy of most recent key; NULL before first next(). */
int lastKeySize;
byte * lastValue;           /**< copy of most recent value; NULL before first next(). */
int lastValueSize;
} lladd_linearHashNTA_generic_it;
|
|
|
|
|
|
|
|
/**
 * Wrap a hash table in the generic lladdIterator_t interface.
 * Caller releases the result with linearHashNTAIterator_close.
 */
lladdIterator_t * ThashGenericIterator(int xid, recordid hash) {
  lladdIterator_t * ret = malloc(sizeof(lladdIterator_t));
  ret->type = LINEAR_HASH_NTA_ITERATOR;

  /* Fill the impl through a typed pointer instead of repeated casts. */
  lladd_linearHashNTA_generic_it * state = malloc(sizeof(lladd_linearHashNTA_generic_it));
  state->hit = ThashIterator(xid, hash, -1, -1);
  state->lastKey = NULL;
  state->lastValue = NULL;
  ret->impl = state;

  return ret;
}
|
|
|
|
|
|
|
|
void linearHashNTAIterator_close(int xid, void * impl) {
|
|
|
|
lladd_linearHashNTA_generic_it * it = impl;
|
|
|
|
|
|
|
|
if(it->lastKey || it->lastValue) {
|
|
|
|
ThashDone(xid, it->hit); // otherwise, ThashNext returned zero, and freed it for us...
|
|
|
|
}
|
|
|
|
if(it->lastKey) {
|
|
|
|
free(it->lastKey);
|
|
|
|
}
|
|
|
|
if(it->lastValue) {
|
|
|
|
free(it->lastValue);
|
|
|
|
}
|
|
|
|
free(it);
|
|
|
|
}
|
|
|
|
|
|
|
|
int linearHashNTAIterator_next (int xid, void * impl) {
|
|
|
|
lladd_linearHashNTA_generic_it * it = impl;
|
|
|
|
|
|
|
|
if(it->lastKey) {
|
|
|
|
free(it->lastKey);
|
|
|
|
it->lastKey = NULL;
|
|
|
|
}
|
|
|
|
if(it->lastValue) {
|
|
|
|
free(it->lastValue);
|
|
|
|
it->lastValue = NULL;
|
|
|
|
}
|
|
|
|
return ThashNext(xid, it->hit, &(it->lastKey), &it->lastKeySize, &it->lastValue, &it->lastValueSize);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
 * Expose the cached key of the current position.  *key aliases the
 * iterator's internal copy (do not free).  Returns the key size, or 0
 * when no current key exists.
 */
int linearHashNTAIterator_key(int xid, void * impl, byte ** key) {
  lladd_linearHashNTA_generic_it * it = impl;

  *key = it->lastKey;
  if(it->lastKey == NULL) {
    return 0;
  }
  return it->lastKeySize;
}
|
|
|
|
|
|
|
|
/**
 * Expose the cached value of the current position.  *value aliases the
 * iterator's internal copy (do not free).  Returns the value size, or 0
 * when no current value exists.
 */
int linearHashNTAIterator_value(int xid, void * impl, byte ** value) {
  lladd_linearHashNTA_generic_it * it = impl;

  *value = it->lastValue;
  if(it->lastValue == NULL) {
    return 0;
  }
  return it->lastValueSize;
}
|
2005-03-20 05:17:25 +00:00
|
|
|
|
|
|
|
|
|
|
|
//--------------------------------- async hash operations happen below here
|
|
|
|
|
|
|
|
/**
 * Length header for key/value payloads pushed through the async-insert
 * fifo.  NOTE(review): this struct is not referenced by the visible
 * async code below — presumably describes the wire format consumed via
 * Titerator_key/Titerator_value; confirm before relying on it.
 */
typedef struct {
  //  recordid hash;
  int value_len;
  int key_len;
} asyncHashInsert_t;
|
|
|
|
|
|
|
|
/**
 * Queue an insert for asynchronous processing by pushing the key/value
 * pair onto the consumer.  The `hash` parameter is currently unused
 * here — the target table is chosen by the worker draining the fifo.
 */
void ThashInsertAsync(int xid, lladdConsumer_t * cons, recordid hash, byte * value, int value_len, byte * key, int key_len) {

  Tconsumer_push(xid, cons, key, key_len, value, value_len);

}
|
|
|
|
|
|
|
|
/**
 * Drain an iterator of queued key/value pairs, inserting each into the
 * given hash table and acknowledging it with Titerator_tupleDone.
 */
void ThashInsertConsume(int xid, recordid hash, lladdIterator_t * it) {
  byte * key;
  byte * value;

  while(Titerator_next(xid, it)) {
    int key_len = Titerator_key(xid, it, &key);
    int value_len = Titerator_value(xid, it, &value);

    ThashInsert(xid, hash, key, key_len, value, value_len);

    /* Tell the producer this tuple has been fully processed. */
    Titerator_tupleDone(xid, it);
  }
}
|
|
|
|
|
|
|
|
/**
 * Argument bundle for ThashAsyncWorker threads: the dirty-fifo iterator
 * to drain, the target hash table, and the transaction to run under.
 */
typedef struct {
  lladdIterator_t * it;   /**< iterator over the dirty-fifo pool. */
  recordid hash;          /**< hash table receiving the inserts. */
  int xid;                /**< transaction id the worker operates in. */
} hashAsyncWorker_arg;
|
|
|
|
|
|
|
|
/**
 * Worker thread body: repeatedly pull a dirty sub-fifo from the dirty
 * pool and drain its contents into the hash table via
 * ThashInsertConsume.
 *
 * NOTE(review): argp MUST be a hashAsyncWorker_arg*; TasyncHashInit as
 * originally written passed the dirty-pool iterator directly, which
 * does not match this contract — verify the caller.
 */
void * ThashAsyncWorker(void * argp) {
  hashAsyncWorker_arg * arg = (hashAsyncWorker_arg*)argp;

  //  lladdIterator_t * it = (lladdIterator_t *) arg;
  //  recordid hash;
  while(Titerator_next(arg->xid, arg->it)) {
    /* Each tuple's value is a whole sub-fifo that went dirty. */
    lladdFifo_t * fifo;
    int fifo_size = Titerator_value(arg->xid, arg->it, (byte**)&fifo);
    assert(fifo_size == sizeof(lladdFifo_t));

    ThashInsertConsume(arg->xid, arg->hash, fifo->iterator);

    Titerator_tupleDone(arg->xid, arg->it);
  }
  return NULL;
}
|
|
|
|
|
|
|
|
/*lladdMultiplexer_t **/
|
|
|
|
lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads,
|
|
|
|
int mainFifoLen, int numFifos,
|
|
|
|
int subFifoLen, int dirtyFifoLen,
|
|
|
|
lladdIterator_t ** dirtyIterator) {
|
|
|
|
|
|
|
|
lladdFifo_t * mainFifo = logMemoryFifo(mainFifoLen, 0);
|
|
|
|
lladdFifo_t * dirtyFifos = logMemoryFifo(dirtyFifoLen, 0);
|
|
|
|
lladdFifoPool_t * fifoPool = lladdFifoPool_ringBufferInit(numFifos, subFifoLen, NULL, dirtyFifos);
|
|
|
|
lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, mainFifo->iterator, &multiplexHashLogByKey, fifoPool);
|
|
|
|
|
|
|
|
|
|
|
|
pthread_attr_t attr;
|
|
|
|
pthread_attr_init(&attr);
|
|
|
|
pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN);
|
|
|
|
|
|
|
|
lladdMultiplexer_start(mux, &attr);
|
|
|
|
|
|
|
|
int i = 0;
|
|
|
|
|
|
|
|
|
|
|
|
for(i = 0; i < numWorkerThreads; i++) {
|
|
|
|
pthread_t thread;
|
|
|
|
pthread_create(&thread, &attr, ThashAsyncWorker, mux->fifoPool->dirtyPoolFifo->iterator);
|
|
|
|
pthread_detach(thread);
|
|
|
|
}
|
|
|
|
|
|
|
|
if(dirtyIterator) {
|
|
|
|
*dirtyIterator = mux->fifoPool->dirtyPoolFifo->iterator;
|
|
|
|
}
|
|
|
|
|
|
|
|
return mainFifo->consumer;
|
|
|
|
}
|