Error handling for linked list.

This commit is contained in:
Sears Russell 2005-02-22 23:11:03 +00:00
parent 41fb85eef0
commit 29dacbe7ed
2 changed files with 217 additions and 162 deletions

View file

@ -29,23 +29,23 @@ typedef struct {
recordid listRoot;
} lladd_linkedList_iterator;
int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value);
int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize);
int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize);
compensated_function int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
compensated_function int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value);
compensated_function int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize);
compensated_function int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize);
/** The linked list iterator can tolerate the concurrent removal of values that
it has already returned. In the presence of such removals, the iterator
will return the keys and values present in the list as it existed when next()
was first called.
@return a new iterator initialized to the head of the list. */
lladd_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize);
compensated_function lladd_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize);
/** @return 1 if there was another entry to be iterated over. 0 otherwise.
If this function returns 1, the caller must free() the malloced memory
returned via the key and value arguments.*/
int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize);
recordid TlinkedListCreate(int xid, int keySize, int ValueSize);
void TlinkedListDelete(int xid, recordid list);
compensated_function int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize);
compensated_function recordid TlinkedListCreate(int xid, int keySize, int ValueSize);
compensated_function void TlinkedListDelete(int xid, recordid list);
Operation getLinkedListInsert();
Operation getLinkedListRemove();
#endif //__LINKED_LIST_NTA_H

View file

@ -43,8 +43,8 @@
static pthread_mutex_t linked_list_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static void __TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize);
compensated_function static void __TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
compensated_function static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize);
typedef struct {
recordid list;
int keySize;
@ -67,12 +67,14 @@ compensated_function static int operateInsert(int xid, Page *p, lsn_t lsn, reco
valueSize = log->valueSize;
key = (byte*)(log+1);
value = ((byte*)(log+1))+keySize;
pthread_mutex_lock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&linked_list_mutex);
// printf("Operate insert called: rid.page = %d keysize = %d valuesize = %d %d {%d %d %d}\n", rid.page, log->keySize, log->valueSize, *(int*)key, value->page, value->slot, value->size);
// Skip writing the undo! Recovery will write a CLR after we're done, effectively
// wrapping this in a nested top action, so we needn't worry about that either.
__TlinkedListInsert(xid, log->list, key, keySize, value, valueSize);
pthread_mutex_unlock(&linked_list_mutex);
__TlinkedListInsert(xid, log->list, key, keySize, value, valueSize);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
return 0;
}
@ -85,30 +87,37 @@ compensated_function static int operateRemove(int xid, Page *p, lsn_t lsn, reco
keySize = log->keySize;
key = (byte*)(log+1);
pthread_mutex_lock(&linked_list_mutex);
// printf("Operate remove called: %d\n", *(int*)key);
// Don't call the version that writes an undo entry!
__TlinkedListRemove(xid, log->list, key, keySize);
pthread_mutex_unlock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&linked_list_mutex);
// printf("Operate remove called: %d\n", *(int*)key);
// Don't call the version that writes an undo entry!
__TlinkedListRemove(xid, log->list, key, keySize);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
return 0;
}
int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
int ret = TlinkedListRemove(xid, list, key, keySize);
compensated_function int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
int ret;
try_ret(compensation_error()) {
ret = TlinkedListRemove(xid, list, key, keySize);
} end_ret(compensation_error());
lladd_linkedListInsert_log * undoLog = malloc(sizeof(lladd_linkedListInsert_log) + keySize);
undoLog->list = list;
undoLog->keySize = keySize;
memcpy(undoLog+1, key, keySize);
pthread_mutex_lock(&linked_list_mutex);
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_INSERT,
(byte*)undoLog, sizeof(lladd_linkedListInsert_log) + keySize);
free(undoLog);
__TlinkedListInsert(xid, list, key, keySize, value, valueSize);
TendNestedTopAction(xid, handle);
pthread_mutex_unlock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_INSERT,
(byte*)undoLog, sizeof(lladd_linkedListInsert_log) + keySize);
free(undoLog);
__TlinkedListInsert(xid, list, key, keySize, value, valueSize);
TendNestedTopAction(xid, handle);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
return ret;
}
@ -131,8 +140,8 @@ Operation getLinkedListRemove() {
};
return o;
}
static void __TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
//int ret = TlinkedListRemove(xid, list, key, keySize);
compensated_function static void __TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
//int ret = Tli nkedListRemove(xid, list, key, keySize);
try {
@ -161,7 +170,7 @@ static void __TlinkedListInsert(int xid, recordid list, const byte * key, int ke
} end;
}
int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
compensated_function int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
lladd_linkedList_entry * entry = malloc(list.size);
@ -205,43 +214,52 @@ int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte
int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
compensated_function int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
byte * value;
int valueSize;
pthread_mutex_lock(&linked_list_mutex);
int ret = TlinkedListFind(xid, list, key, keySize, &value);
int ret;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
ret = TlinkedListFind(xid, list, key, keySize, &value);
} end_action_ret(compensation_error());
if(ret != -1) {
valueSize = ret;
} else {
pthread_mutex_unlock(&linked_list_mutex);
return 0;
}
int entrySize = sizeof(lladd_linkedListRemove_log) + keySize + valueSize;
lladd_linkedListRemove_log * undoLog = malloc(entrySize);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
int entrySize = sizeof(lladd_linkedListRemove_log) + keySize + valueSize;
lladd_linkedListRemove_log * undoLog = malloc(entrySize);
undoLog->list = list;
undoLog->keySize = keySize;
undoLog->valueSize = valueSize;
memcpy(undoLog+1, key, keySize);
memcpy(((byte*)(undoLog+1))+keySize, value, valueSize);
// printf("entry size %d sizeof(remove_log)%d keysize %d valuesize %d sizeof(rid) %d key %d value {%d %d %ld}\n",
// entrySize, sizeof(lladd_linkedListRemove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_REMOVE,
(byte*)undoLog, entrySize);
free(value);
free(undoLog);
__TlinkedListRemove(xid, list, key, keySize);
TendNestedTopAction(xid, handle);
} compensate_ret(compensation_error());
undoLog->list = list;
undoLog->keySize = keySize;
undoLog->valueSize = valueSize;
memcpy(undoLog+1, key, keySize);
memcpy(((byte*)(undoLog+1))+keySize, value, valueSize);
// printf("entry size %d sizeof(remove_log)%d keysize %d valuesize %d sizeof(rid) %d key %d value {%d %d %ld}\n",
// entrySize, sizeof(lladd_linkedListRemove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_REMOVE,
(byte*)undoLog, entrySize);
free(value);
free(undoLog);
__TlinkedListRemove(xid, list, key, keySize);
TendNestedTopAction(xid, handle);
pthread_mutex_unlock(&linked_list_mutex);
return 1;
}
static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
compensated_function static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
lladd_linkedList_entry * entry = malloc(list.size);
pthread_mutex_lock(&linked_list_mutex);
Tread(xid, list, entry);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
Tread(xid, list, entry);
} end_action_ret(compensation_error());
if(entry->next.size == 0) {
//Empty List.
free(entry);
@ -252,93 +270,114 @@ static int __TlinkedListRemove(int xid, recordid list, const byte * key, int key
recordid lastRead = list;
recordid oldLastRead;
oldLastRead.size = -2;
while(1) {
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
if(listRoot) {
if(entry->next.size == -1) {
memset(entry, 0, list.size);
Tset(xid, lastRead, entry);
} else {
assert(entry->next.size == list.size); // Otherwise, sometihng strange is happening, or the list contains entries with variable sizes.
lladd_linkedList_entry * entry2 = malloc(list.size);
Tread(xid, entry->next, entry2);
Tdealloc(xid, entry->next); // could break iterator, since it writes one entry ahead.
Tset(xid, lastRead, entry2);
free(entry2);
int ret = 0;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
while(1) {
if(compensation_error()) { break; }
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
if(listRoot) {
if(entry->next.size == -1) {
memset(entry, 0, list.size);
Tset(xid, lastRead, entry);
} else {
assert(entry->next.size == list.size); // Otherwise, something strange is happening, or the list contains entries with variable sizes.
lladd_linkedList_entry * entry2 = malloc(list.size);
Tread(xid, entry->next, entry2);
Tdealloc(xid, entry->next); // could break iterator, since it writes one entry ahead.
Tset(xid, lastRead, entry2);
free(entry2);
}
} else {
lladd_linkedList_entry * entry2 = malloc(list.size);
assert(oldLastRead.size != -2);
Tread(xid, oldLastRead, entry2);
memcpy(&(entry2->next), &(entry->next), sizeof(recordid));
Tset(xid, oldLastRead, entry2);
Tdealloc(xid, lastRead);
free (entry2);
} else {
lladd_linkedList_entry * entry2 = malloc(list.size);
assert(oldLastRead.size != -2);
Tread(xid, oldLastRead, entry2);
memcpy(&(entry2->next), &(entry->next), sizeof(recordid));
Tset(xid, oldLastRead, entry2);
Tdealloc(xid, lastRead);
free (entry2);
}
// free(entry);
// pthread_mutex_unlock(&linked_list_mutex);
// return 1;
ret = 1;
break;
} else { // Entry doesn't match the key we're looking for.
if(entry->next.size != -1) {
assert(entry->next.size == list.size); // Don't handle lists with variable length records for now
oldLastRead = lastRead;
lastRead = entry->next;
Tread(xid, entry->next, entry);
listRoot = 0;
} else {
break;
}
}
free(entry);
pthread_mutex_unlock(&linked_list_mutex);
return 1;
} else { // Entry doesn't match the key we're looking for.
if(entry->next.size != -1) {
assert(entry->next.size == list.size); // Don't handle lists with variable length records for now
oldLastRead = lastRead;
lastRead = entry->next;
Tread(xid, entry->next, entry);
listRoot = 0;
} else {
break;
}
}
}
free(entry);
pthread_mutex_unlock(&linked_list_mutex);
return 0;
}
/*** @todo TlinkedListMove could be much faster, but this is good enough for a first pass */
int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize) {
byte * value;
pthread_mutex_lock(&linked_list_mutex);
int valueSize = TlinkedListFind(xid, start_list, key, keySize, &value);
if(valueSize == -1) {
pthread_mutex_unlock(&linked_list_mutex);
return 0;
} else {
TlinkedListRemove(xid, start_list, key, keySize);
TlinkedListInsert(xid, end_list, key, keySize, value, valueSize);
free(value);
pthread_mutex_unlock(&linked_list_mutex);
return 1;
}
}
recordid TlinkedListCreate(int xid, int keySize, int valueSize) {
recordid ret = Talloc(xid, sizeof(lladd_linkedList_entry) + keySize + valueSize);
byte * cleared = calloc(sizeof(lladd_linkedList_entry) + keySize + valueSize, sizeof(byte));
Tset(xid, ret, cleared);
free(cleared);
}
free(entry);
} compensate_ret(compensation_error());
return ret;
}
void TlinkedListDelete(int xid, recordid list) {
lladd_linkedList_entry * entry = malloc(list.size);
Tread(xid, list, entry);
Tdealloc(xid, list);
if(entry->next.size == 0) {
return;
}
while(entry->next.size != -1) {
recordid nextEntry;
Tread(xid, nextEntry, entry);
assert(!memcmp(&nextEntry, &(entry->next), sizeof(recordid)));
Tdealloc(xid, nextEntry);
}
free(entry);
/*** @todo TlinkedListMove could be much faster, but this is good enough for a first pass */
compensated_function int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize) {
byte * value = 0;
int ret;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&linked_list_mutex);
int valueSize = TlinkedListFind(xid, start_list, key, keySize, &value);
if(valueSize != -1) {
// pthread_mutex_unlock(&linked_list_mutex);
// return 0;
ret = 0;
} else {
TlinkedListRemove(xid, start_list, key, keySize);
TlinkedListInsert(xid, end_list, key, keySize, value, valueSize);
// pthread_mutex_unlock(&linked_list_mutex);
// return 1;
ret = 1;
}
if(value) { free(value); }
} compensate_ret(compensation_error());
return ret;
}
compensated_function recordid TlinkedListCreate(int xid, int keySize, int valueSize) {
recordid ret;
try_ret(NULLRID) {
ret = Talloc(xid, sizeof(lladd_linkedList_entry) + keySize + valueSize);
byte * cleared = calloc(sizeof(lladd_linkedList_entry) + keySize + valueSize, sizeof(byte));
Tset(xid, ret, cleared);
free(cleared);
} end_ret(NULLRID);
return ret;
}
compensated_function void TlinkedListDelete(int xid, recordid list) {
try {
lladd_linkedList_entry * entry = malloc(list.size);
Tread(xid, list, entry);
Tdealloc(xid, list);
if(entry->next.size == 0) {
return;
}
while(entry->next.size != -1) {
if(compensation_error()) { break; }
recordid nextEntry;
Tread(xid, nextEntry, entry);
assert(!memcmp(&nextEntry, &(entry->next), sizeof(recordid)));
Tdealloc(xid, nextEntry);
}
free(entry);
} end;
}
lladd_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize) {
compensated_function lladd_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize) {
lladd_linkedList_iterator * it = malloc(sizeof(lladd_linkedList_iterator));
it->keySize = keySize;
it->valueSize = valueSize;
@ -348,36 +387,51 @@ lladd_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keyS
return it;
}
int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int * keySize, byte **value, int * valueSize) {
compensated_function int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int * keySize, byte **value, int * valueSize) {
if(it->next.size == -1) { free(it); return 0; }
pthread_mutex_lock(&linked_list_mutex);
if(it->first == -1) {
it->first = 1;
} else if(it->first) {
lladd_linkedList_entry * entry = malloc(it->next.size);
Tread(xid, it->listRoot, entry);
int listTouched;
listTouched = memcmp(&(entry->next), &(it->next), sizeof(recordid));
free(entry);
if(listTouched) {
//The root entry was removed. Reset the iterator.
it->first = -1;
it->next = it->listRoot;
int ret = TlinkedListNext(xid, it, key, keySize, value, valueSize);
pthread_mutex_unlock(&linked_list_mutex);
return ret;
} else {
//continue as normal.
it->first = 0;
}
}
assert(it->keySize + it->valueSize + sizeof(lladd_linkedList_entry) == it->next.size);
lladd_linkedList_entry * entry = malloc(it->next.size);
Tread(xid, it->next, entry);
int done = 0;
int ret;
lladd_linkedList_entry * entry;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&linked_list_mutex);
if(it->first == -1) {
it->first = 1;
} else if(it->first) {
entry = malloc(it->next.size);
Tread(xid, it->listRoot, entry);
int listTouched;
listTouched = memcmp(&(entry->next), &(it->next), sizeof(recordid));
free(entry);
if(listTouched) {
//The root entry was removed. Reset the iterator.
it->first = -1;
it->next = it->listRoot;
ret = TlinkedListNext(xid, it, key, keySize, value, valueSize);
// pthread_mutex_unlock(&linked_list_mutex);
done = 1;
// return ret;
} else {
//continue as normal.
it->first = 0;
}
}
} end_action_ret(compensation_error());
if(done) {
pthread_mutex_unlock(&linked_list_mutex);
return ret;
}
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
assert(it->keySize + it->valueSize + sizeof(lladd_linkedList_entry) == it->next.size);
entry = malloc(it->next.size);
Tread(xid, it->next, entry);
} end_action_ret(compensation_error());
if(entry->next.size) {
*keySize = it->keySize;
*valueSize = it->valueSize;
@ -385,7 +439,7 @@ int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int *
*value = malloc(*valueSize);
it->next = entry->next;
memcpy(*key, entry+1, *keySize);
memcpy(*value, ((byte*)(entry + 1))+*keySize, *valueSize);
@ -393,10 +447,11 @@ int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int *
} else {
// This entry was empty (this case occurs with empty lists)
free(it);
ret = 0;
}
free(entry);
pthread_mutex_unlock(&linked_list_mutex);
return ret;
}