The linked list implementation now uses nested top actions. (Also, many bugfixes regarding nested top actions were introduced.)

This commit is contained in:
Sears Russell 2005-01-14 10:08:10 +00:00
parent 360f0d15e2
commit 7a990886d2
13 changed files with 205 additions and 44 deletions

View file

@ -78,8 +78,8 @@ terms specified in this license.
/* #define MAX_BUFFER_SIZE 100003 */ /* #define MAX_BUFFER_SIZE 100003 */
/*#define MAX_BUFFER_SIZE 20029 */ /*#define MAX_BUFFER_SIZE 20029 */
/*#define MAX_BUFFER_SIZE 10007 */ /*#define MAX_BUFFER_SIZE 10007 */
#define MAX_BUFFER_SIZE 5003 /*#define MAX_BUFFER_SIZE 5003*/
/*#define MAX_BUFFER_SIZE 2003 */ #define MAX_BUFFER_SIZE 2003
/* #define MAX_BUFFER_SIZE 71 */ /* #define MAX_BUFFER_SIZE 71 */
/*#define MAX_BUFFER_SIZE 7 */ /*#define MAX_BUFFER_SIZE 7 */
/*#define BUFFER_ASOOCIATIVE 2 */ /*#define BUFFER_ASOOCIATIVE 2 */
@ -120,6 +120,8 @@ terms specified in this license.
#define OPERATION_UNDO_LINEAR_DELETE 26 #define OPERATION_UNDO_LINEAR_DELETE 26
#define OPERATION_SET_RANGE 27 #define OPERATION_SET_RANGE 27
#define OPERATION_SET_RANGE_INVERSE 28 #define OPERATION_SET_RANGE_INVERSE 28
#define OPERATION_LINKED_LIST_INSERT 29
#define OPERATION_LINKED_LIST_REMOVE 30
/* number above should be less than number below */ /* number above should be less than number below */
#define MAX_OPERATIONS 40 #define MAX_OPERATIONS 40

View file

@ -76,6 +76,17 @@ typedef int (*Function)(int xid, Page * p, lsn_t lsn, recordid r, const void *d)
argument passed into the operation. argument passed into the operation.
*/ */
#define SIZEOF_RECORD -1 #define SIZEOF_RECORD -1
/**
Logical log entries (such as those used by nested top actions
have a null recordid, as they are not assoicated with a specific page
If a log entry is not associated with a specific page, the page id can
be overloaded to hold the size of the associated log entry. Contrast
this with the description of SIZEOF_RECORD, which is used when the
operation uses a variable length argument, but is associated with
a specfic page.
*/
#define SIZEIS_PAGEID -2
/** If the Operation struct's undo field is set to this value, then /** If the Operation struct's undo field is set to this value, then
physical logging is used in lieu of logical logging. physical logging is used in lieu of logical logging.
*/ */

View file

@ -32,5 +32,6 @@ lladd_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keyS
int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize); int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize);
recordid TlinkedListCreate(int xid, int keySize, int ValueSize); recordid TlinkedListCreate(int xid, int keySize, int ValueSize);
void TlinkedListDelete(int xid, recordid list); void TlinkedListDelete(int xid, recordid list);
Operation getLinkedListInsert();
Operation getLinkedListRemove();
#endif //__LINKED_LIST_NTA_H #endif //__LINKED_LIST_NTA_H

View file

@ -61,6 +61,6 @@ terms specified in this license.
#include <lladd/operations.h> #include <lladd/operations.h>
void initNestedTopActions(); void initNestedTopActions();
void TbeginNestedTopAction(int xid); void TbeginNestedTopAction(int xid, int op, const byte* log_arguments, int log_arguments_length);
lsn_t TendNestedTopAction(int xid); lsn_t TendNestedTopAction(int xid);
#endif #endif

View file

@ -48,6 +48,7 @@ terms specified in this license.
#include "page.h" #include "page.h"
/*#include <lladd/bufferManager.h>*/ /*#include <lladd/bufferManager.h>*/
#include <stdio.h> #include <stdio.h>
#include <assert.h>
TransactionLog LogTransBegin(int xid) { TransactionLog LogTransBegin(int xid) {
TransactionLog tl; TransactionLog tl;
tl.xid = xid; tl.xid = xid;
@ -91,6 +92,9 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
if(operationsTable[operation].sizeofData == SIZEOF_RECORD) { if(operationsTable[operation].sizeofData == SIZEOF_RECORD) {
argSize = rid.size; argSize = rid.size;
} else if(operationsTable[operation].sizeofData == SIZEIS_PAGEID) {
argSize = rid.page;
// printf("argsize (page) %d, %d\n", argSize, sizeof(recordid) * 2 + sizeof(int) * 3);
} else { } else {
argSize = operationsTable[operation].sizeofData; argSize = operationsTable[operation].sizeofData;
} }
@ -109,11 +113,6 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
DEBUG("got preimage"); DEBUG("got preimage");
} }
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage); e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage);
writeLogEntry(e); writeLogEntry(e);

View file

@ -111,16 +111,20 @@ void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
recordid rid = e->contents.update.rid; recordid rid = e->contents.update.rid;
#endif #endif
/* lsn_t page_lsn = readLSN(e->contents.update.rid.page); */ /* lsn_t page_lsn = readLSN(e->contents.update.rid.page); */
lsn_t page_lsn = pageReadLSN(p); lsn_t page_lsn = -1;
if(e->LSN <= page_lsn) { if(p) {
page_lsn = pageReadLSN(p);
}
if(e->LSN <= page_lsn || !p) {
/* Actually execute the undo */ /* Actually execute the undo */
if(undo == NO_INVERSE) { if(undo == NO_INVERSE) {
/* Physical undo */ /* Physical undo */
assert(p); // Must be provided wiht a page in order to perform a physical undo!
DEBUG("OPERATION %d Physical undo, %ld {%d %d %ld}\n", undo, e->LSN, e->contents.update.rid.page, e->contents.update.rid.slot, e->contents.update.rid.size); DEBUG("OPERATION %d Physical undo, %ld {%d %d %ld}\n", undo, e->LSN, e->contents.update.rid.page, e->contents.update.rid.slot, e->contents.update.rid.size);
writeRecord(e->xid, p, clr_lsn, e->contents.update.rid, getUpdatePreImage(e)); writeRecord(e->xid, p, clr_lsn, e->contents.update.rid, getUpdatePreImage(e));
} else if(undo == NO_INVERSE_WHOLE_PAGE) { } else if(undo == NO_INVERSE_WHOLE_PAGE) {
assert(p);
DEBUG("OPERATION %d Whole page physical undo, %ld {%d}\n", undo, e->LSN, e->contents.update.rid.page); DEBUG("OPERATION %d Whole page physical undo, %ld {%d}\n", undo, e->LSN, e->contents.update.rid.page);
memcpy(p->memAddr, getUpdatePreImage(e), PAGE_SIZE); memcpy(p->memAddr, getUpdatePreImage(e), PAGE_SIZE);
pageWriteLSN(p, clr_lsn); pageWriteLSN(p, clr_lsn);

View file

@ -28,8 +28,94 @@
@file @file
*/ */
static void __TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
typedef struct {
recordid list;
int keySize;
} lladd_linkedListInsert_log;
typedef struct {
recordid list;
int keySize;
int valueSize;
} lladd_linkedListRemove_log;
static int operateInsert(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
assert(!p);
lladd_linkedListRemove_log * log = (lladd_linkedListRemove_log*)dat;
byte * key;
byte * value;
int keySize, valueSize;
keySize = log->keySize;
valueSize = log->valueSize;
key = (byte*)(log+1);
value = ((byte*)(log+1))+keySize;
// printf("Operate insert called: rid.page = %d keysize = %d valuesize = %d %d {%d %d %d}\n", rid.page, log->keySize, log->valueSize, *(int*)key, value->page, value->slot, value->size);
// Skip writing the undo! Recovery will write a CLR after we're done, effectively
// wrapping this in a nested top action, so we needn't worry about that either.
__TlinkedListInsert(xid, log->list, key, keySize, value, valueSize);
return 0;
}
static int operateRemove(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
assert(!p);
lladd_linkedListRemove_log * log = (lladd_linkedListRemove_log*)dat;
byte * key;
int keySize;
keySize = log->keySize;
key = (byte*)(log+1);
// printf("Operate remove called: %d\n", *(int*)key);
// Don't call the version that writes an undo entry!
__TlinkedListRemove(xid, log->list, key, keySize);
return 0;
}
int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) { int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
int ret = TlinkedListRemove(xid, list, key, keySize); int ret = TlinkedListRemove(xid, list, key, keySize);
lladd_linkedListInsert_log * undoLog = malloc(sizeof(lladd_linkedListInsert_log) + keySize);
undoLog->list = list;
undoLog->keySize = keySize;
memcpy(undoLog+1, key, keySize);
TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_INSERT,
(byte*)undoLog, sizeof(lladd_linkedListInsert_log) + keySize);
__TlinkedListInsert(xid, list, key, keySize, value, valueSize);
TendNestedTopAction(xid);
return ret;
}
Operation getLinkedListInsert() {
Operation o = {
OPERATION_NOOP,
SIZEIS_PAGEID,
OPERATION_LINKED_LIST_REMOVE,
&operateInsert
};
return o;
}
Operation getLinkedListRemove() {
Operation o = {
OPERATION_NOOP,
SIZEIS_PAGEID,
OPERATION_LINKED_LIST_INSERT,
&operateRemove
};
return o;
}
static void __TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
//int ret = TlinkedListRemove(xid, list, key, keySize);
lladd_linkedList_entry * entry = malloc(sizeof(lladd_linkedList_entry) + keySize + valueSize); lladd_linkedList_entry * entry = malloc(sizeof(lladd_linkedList_entry) + keySize + valueSize);
Tread(xid, list, entry); Tread(xid, list, entry);
if(!entry->next.size) { if(!entry->next.size) {
@ -51,7 +137,7 @@ int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, con
free(newEntry); free(newEntry);
} }
free(entry); free(entry);
return ret; //return ret;
} }
int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) { int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
@ -81,7 +167,42 @@ int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte
free(entry); free(entry);
return -1; return -1;
} }
static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize);
int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) { int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
byte * value;
int valueSize;
int ret = TlinkedListFind(xid, list, key, keySize, &value);
if(ret != -1) {
valueSize = ret;
} else {
return 0;
}
int entrySize = sizeof(lladd_linkedListRemove_log) + keySize + valueSize;
lladd_linkedListRemove_log * undoLog = malloc(entrySize);
undoLog->list = list;
undoLog->keySize = keySize;
undoLog->valueSize = valueSize;
memcpy(undoLog+1, key, keySize);
memcpy(((byte*)(undoLog+1))+keySize, value, valueSize);
// printf("entry size %d sizeof(remove_log)%d keysize %d valuesize %d sizeof(rid) %d key %d value {%d %d %ld}\n",
// entrySize, sizeof(lladd_linkedListRemove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_REMOVE,
(byte*)undoLog, entrySize);
free(value);
__TlinkedListRemove(xid, list, key, keySize);
TendNestedTopAction(xid);
return 1;
}
static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
lladd_linkedList_entry * entry = malloc(list.size); lladd_linkedList_entry * entry = malloc(list.size);
Tread(xid, list, entry); Tread(xid, list, entry);

View file

@ -53,7 +53,7 @@ terms specified in this license.
#include <string.h> #include <string.h>
#include <malloc.h> #include <malloc.h>
#include <pthread.h> #include <pthread.h>
#include <assert.h>
/** @todo Remove extern declaration of transactional_2_mutex from nestedTopActions.c */ /** @todo Remove extern declaration of transactional_2_mutex from nestedTopActions.c */
extern pthread_mutex_t transactional_2_mutex; extern pthread_mutex_t transactional_2_mutex;
@ -72,13 +72,21 @@ void initNestedTopActions() {
NULLRID.slot = 0; NULLRID.slot = 0;
NULLRID.size = -1; NULLRID.size = -1;
} }
/** @todo TbeginNestedTopAction's API might not be quite right.
void TbeginNestedTopAction(int xid) { Are there cases where we need to pass a recordid in?
*/
void TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) {
recordid rid = NULLRID;
rid.page = datSize;
LogEntry * e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], NULL, rid, op, dat);
DEBUG("Begin Nested Top Action e->LSN: %ld\n", e->LSN);
lsn_t * prevLSN = malloc(sizeof(lsn_t)); lsn_t * prevLSN = malloc(sizeof(lsn_t));
*prevLSN = e->LSN;
pthread_mutex_lock(&transactional_2_mutex); pthread_mutex_lock(&transactional_2_mutex);
*prevLSN = XactionTable[xid].prevLSN; assert(!pblHtLookup(nestedTopActions, &xid, sizeof(int)));
pblHtInsert(nestedTopActions, &xid, sizeof(int), prevLSN); pblHtInsert(nestedTopActions, &xid, sizeof(int), prevLSN);
pthread_mutex_unlock(&transactional_2_mutex); pthread_mutex_unlock(&transactional_2_mutex);
free(e);
} }
/** /**
@ -94,7 +102,7 @@ lsn_t TendNestedTopAction(int xid) {
pthread_mutex_lock(&transactional_2_mutex); pthread_mutex_lock(&transactional_2_mutex);
lsn_t * prevLSN = pblHtLookup(nestedTopActions, &xid, sizeof(int)); lsn_t * prevLSN = pblHtLookup(nestedTopActions, &xid, sizeof(int));
pblHtRemove(nestedTopActions, &xid, sizeof(int));
// This action wasn't really undone -- This is a nested top action! // This action wasn't really undone -- This is a nested top action!
lsn_t undoneLSN = XactionTable[xid].prevLSN; lsn_t undoneLSN = XactionTable[xid].prevLSN;
recordid undoneRID = NULLRID; // Not correct, but this field is unused anyway. ;) recordid undoneRID = NULLRID; // Not correct, but this field is unused anyway. ;)
@ -111,6 +119,7 @@ lsn_t TendNestedTopAction(int xid) {
lsn_t ret = e->LSN; lsn_t ret = e->LSN;
free(e); free(e);
free(prevLSN);
pthread_mutex_unlock(&transactional_2_mutex); pthread_mutex_unlock(&transactional_2_mutex);
return ret; return ret;

View file

@ -49,7 +49,10 @@ terms specified in this license.
#include "../page.h" #include "../page.h"
static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) { static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
pageWriteLSN(p, lsn); /* If p is null, then this is a logical no-op that spans pages, so do nothing.
Otherwise, write the LSN to the appropriate page (to keep recovery happy)
and return */
if(p) pageWriteLSN(p, lsn);
return 0; return 0;
} }

View file

@ -210,6 +210,8 @@ static void Undo(int recovery) {
case UPDATELOG: case UPDATELOG:
{ {
/* Need write lock for undo.. (Why??) */ /* Need write lock for undo.. (Why??) */
if(e->contents.update.rid.size != -1) {
Page * p = getPage(e->contents.update.rid.page, RO); Page * p = getPage(e->contents.update.rid.page, RO);
this_lsn= pageReadLSN(p); /* e->contents.update.rid.page); */ this_lsn= pageReadLSN(p); /* e->contents.update.rid.page); */
@ -233,6 +235,12 @@ static void Undo(int recovery) {
/* printf("1b"); fflush(NULL); */ /* printf("1b"); fflush(NULL); */
releasePage(p); releasePage(p);
} else {
// The log entry is not associated with a particular page.
// (Therefore, it must be an idempotent logical log entry.)
clr_lsn = LogCLR(e);
undoUpdate(e, NULL, clr_lsn);
}
break; break;
} }
case CLRLOG: case CLRLOG:

View file

@ -68,6 +68,9 @@ void setupOperationsTable() {
operationsTable[OPERATION_SET_RANGE] = getSetRange(); operationsTable[OPERATION_SET_RANGE] = getSetRange();
operationsTable[OPERATION_SET_RANGE_INVERSE] = getSetRangeInverse(); operationsTable[OPERATION_SET_RANGE_INVERSE] = getSetRangeInverse();
operationsTable[OPERATION_LINKED_LIST_INSERT] = getLinkedListInsert();
operationsTable[OPERATION_LINKED_LIST_REMOVE] = getLinkedListRemove();
} }

View file

@ -368,7 +368,7 @@ START_TEST(operation_nestedTopAction) {
*dat = 10; *dat = 10;
Tset(xid, rid1, dat); Tset(xid, rid1, dat);
TbeginNestedTopAction(xid); TbeginNestedTopAction(xid, OPERATION_NOOP, NULL, 0);
*dat = 20; *dat = 20;
Tset(xid, rid2, dat); Tset(xid, rid2, dat);