Major refactoring:

- Changed operations to take only two arguments (a sketch of the new callback shape follows the commit stats below)
  - No more hacks regarding log argument sizes
  - Set pageid = INVALID_PAGE if you want a logical operation
- Ported operation implementations to the new API; exposed and fixed a number of concurrency bugs
- More fixes to transaction prepare
- Fixes to nested top actions
- More coherent operations API / recovery implementation
- TnaiveHash* and Tinstant* are (and were already) broken, and are set for removal
- Removed some instances of fail_unless
- Fixed design flaws in the blob implementation
- New naming convention for operation callback functions
Sears Russell 2008-09-28 03:11:24 +00:00
parent 14fc96e068
commit 6d17442380
53 changed files with 1098 additions and 951 deletions
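As a reading aid for the diffs below, this is the shape of the refactored operation API (a minimal sketch inferred from this commit; op_example and getExample are hypothetical names, while the real callbacks appear in the files that follow):

#include <stasis/operations.h> // LogEntry, Page, Operation, getUpdateArgs()

// New convention: callbacks are named op_*, and take only the log entry
// and the page. For logical operations the entry's pageid is INVALID_PAGE
// and p is NULL.
static int op_example(const LogEntry *e, Page *p) {
  const byte *arg = getUpdateArgs(e); // arguments travel inside the log entry
  (void)arg; (void)p;
  /* ... apply the update; physical callbacks latch p themselves ... */
  return 0;
}

// Operation table entries drop the old sizeofData field:
Operation getExample() {
  Operation o = {
    OPERATION_NOOP, // ID (hypothetical placeholder; a real constant goes here)
    OPERATION_NOOP, // operation that logically undoes this one
    op_example
  };
  return o;
}

// Callers now pass an explicit argument size:
//   Tupdate(xid, rid, &arg, sizeof(arg), OPERATION_EXAMPLE);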


@ -13,7 +13,7 @@ void allocBlob(int xid, recordid rid) {
rec.size = rid.size;
recordid rid2 = rid;
rid2.size = BLOB_SLOT;
Tset(xid, rid2, &rec);
Tset(xid, rid2, (byte*)&rec);
// printf("Page start = %d, count = %d, rid.size=%d\n", rec.offset, pageCount, rid.size);
// printf("rid = {%d %d %d}\n", rid.page, rid.slot, rid.size);
}
@ -27,47 +27,55 @@ void readBlob(int xid, Page * p2, recordid rid, byte * buf) {
stasis_record_read(xid, p2, rawRid, (byte*)&rec);
for(chunk = 0; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
//printf("Chunk = %d->%lld\n", chunk, (long long)rec.offset+chunk);
TpageGet(xid, rec.offset+chunk, pbuf);
memcpy(buf + (chunk * USABLE_SIZE_OF_PAGE), pbuf, USABLE_SIZE_OF_PAGE);
}
TpageGet(xid, rec.offset+chunk, pbuf);
memcpy(buf + (chunk * USABLE_SIZE_OF_PAGE), pbuf, rid.size % USABLE_SIZE_OF_PAGE);
// printf("Chunk = %d\n", chunk);
//printf("Chunk = %d->%lld\n", chunk, (long long)rec.offset+chunk);
}
void writeBlob(int xid, Page * p2, lsn_t lsn, recordid rid, const byte * buf) {
int chunk;
recordid rawRid = rid;
rawRid.size = BLOB_SLOT;
byte * pbuf = alloca(PAGE_SIZE);
blob_record_t rec;
stasis_record_read(xid, p2, rawRid, (byte*)&rec);
Page tmp;
tmp.memAddr=pbuf;
void writeBlob(int xid, Page * p, recordid rid, const void* dat) {
blob_record_t rec;
recordid r = rid;
r.size = sizeof(blob_record_t);
stasis_record_read(xid, p, r, (byte*)&rec);
assert(rec.offset);
for(chunk = 0; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
TpageGet(xid, rec.offset+chunk, pbuf);
*stasis_page_type_ptr(&tmp) = BLOB_PAGE;
memcpy(pbuf, buf + (chunk * USABLE_SIZE_OF_PAGE), USABLE_SIZE_OF_PAGE);
TpageSet(xid, rec.offset+chunk, pbuf);
}
TpageGet(xid, rec.offset+chunk, pbuf);
memcpy(pbuf, buf + (chunk * USABLE_SIZE_OF_PAGE), rid.size % USABLE_SIZE_OF_PAGE);
TpageSet(xid, rec.offset+chunk, pbuf);
// printf("Write Chunk = %d (%d)\n", chunk, rec.offset+chunk);
assert(rec.offset);
int64_t chunk = 0;
for(; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
Page * cnk = loadPage(xid, rec.offset+chunk);
writelock(cnk->rwlatch,0);
if(*stasis_page_type_ptr(cnk) != BLOB_PAGE) {
stasis_blob_initialize_page(cnk);
}
unlock(cnk->rwlatch);
// Don't care about race; writes in race have undefined semantics...
TpageSetRange(xid,rec.offset+chunk,0,((const byte*)dat)+(chunk*USABLE_SIZE_OF_PAGE),USABLE_SIZE_OF_PAGE);
}
Page * cnk = loadPage(xid, rec.offset+chunk);
writelock(cnk->rwlatch,0);
if(*stasis_page_type_ptr(cnk) != BLOB_PAGE) {
stasis_blob_initialize_page(cnk);
}
unlock(cnk->rwlatch);
byte * buf = calloc(1,USABLE_SIZE_OF_PAGE);
memcpy(buf, ((const byte*)dat)+(chunk*USABLE_SIZE_OF_PAGE), rid.size % USABLE_SIZE_OF_PAGE);
TpageSetRange(xid,rec.offset+chunk,0,buf,USABLE_SIZE_OF_PAGE);
free(buf);
}
static int notSupported(int xid, Page * p) { return 0; }
static void blobLoaded(Page *p) {
p->LSN = *stasis_page_lsn_ptr(p);
DEBUG("load lsn: %lld\n", (long long)p->LSN);
}
static void blobFlushed(Page *p) {
*stasis_page_lsn_ptr(p) = p->LSN;
DEBUG("flush lsn: %lld\n", (long long)p->LSN);
}
static void blobCleanup(Page *p) { }
@ -99,3 +107,9 @@ static page_impl pi = {
page_impl blobImpl() {
return pi;
}
void stasis_blob_initialize_page(Page * p) {
assertlocked(p->rwlatch);
DEBUG("lsn: %lld\n",(long long)p->LSN);
stasis_page_cleanup(p);
*stasis_page_type_ptr(p) = BLOB_PAGE;
}
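The chunk loops in readBlob() and writeBlob() above split a blob into full USABLE_SIZE_OF_PAGE-byte pages plus one trailing partial page. A standalone helper capturing that arithmetic (hypothetical name, assuming only the loop bounds shown above):

#include <stdint.h>

// Pages occupied by a blob of `size` bytes: full chunks while
// (chunk+1)*usable < size, then one final chunk of size % usable bytes
// (zero-length when size is an exact multiple, matching the tail copies above).
static int64_t blob_chunk_count(int64_t size, int64_t usable) {
  int64_t chunk = 0;
  while ((chunk + 1) * usable < size) {
    chunk++;
  }
  return chunk + 1;
}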


@ -25,7 +25,7 @@ void naiveTraverse(int xid, recordid rid, int num) {
node[transClos_outdegree] = num;
numTset++;
Tset(xid, rid, node);
Tset(xid, rid, (const byte*)node);
int i = 0;
@ -78,7 +78,7 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t
node[transClos_outdegree] = num;
numTset++;
Tset(xid, localRid, node); /// @todo TsetRange?
Tset(xid, localRid, (const byte*)node); /// @todo TsetRange?
int i;
for(i =0 ; i < transClos_outdegree; i++) {
recordid nextRid = arrayList;


@ -64,97 +64,64 @@ LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN) {
ret->xid = xid;
ret->type = XPREPARE;
*(lsn_t*)(((struct __raw_log_entry*)ret)+1)=recLSN;
// assert(sizeofLogEntry(ret) == sizeof(struct __raw_log_entry)+sizeof(lsn_t));
return ret;
}
const byte * getUpdateArgs(const LogEntry * ret) {
assert(ret->type == UPDATELOG ||
ret->type == CLRLOG);
if(ret->update.argSize == 0) {
if(ret->update.arg_size == 0) {
return NULL;
} else {
return ((byte*)ret) +
return ((const byte*)ret) +
sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry);
}
}
const byte * getUpdatePreImage(const LogEntry * ret) {
assert(ret->type == UPDATELOG ||
ret->type == CLRLOG);
if(operationsTable[ret->update.funcID].undo != NO_INVERSE &&
operationsTable[ret->update.funcID].undo != NO_INVERSE_WHOLE_PAGE) {
return NULL;
} else {
return ((byte*)ret) +
sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry) +
ret->update.argSize;
}
}
lsn_t getPrepareRecLSN(const LogEntry *e) {
lsn_t ret = *(lsn_t*)(((struct __raw_log_entry*)e)+1);
if(ret == -1) { ret = e->LSN; }
return ret;
}
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
unsigned int funcID, recordid rid,
const byte * args, unsigned int argSize,
const byte * preImage) {
int invertible = operationsTable[funcID].undo != NO_INVERSE;
int whole_page_phys = operationsTable[funcID].undo == NO_INVERSE_WHOLE_PAGE;
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
unsigned int op, pageid_t page,
const byte * args, unsigned int arg_size) {
/** Use calloc since the struct might not be packed in memory;
otherwise, we'd leak uninitialized bytes to the log. */
size_t logentrysize =
sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry) + argSize +
((!invertible) ? stasis_record_type_to_size(rid.size)
: 0) +
(whole_page_phys ? PAGE_SIZE
: 0);
size_t logentrysize =
sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + arg_size;
LogEntry * ret = calloc(1,logentrysize);
ret->LSN = -1;
ret->prevLSN = prevLSN;
ret->xid = xid;
ret->type = UPDATELOG;
ret->update.funcID = funcID;
ret->update.rid = rid;
ret->update.argSize = argSize;
if(argSize) {
memcpy((void*)getUpdateArgs(ret), args, argSize);
}
if(!invertible) {
memcpy((void*)getUpdatePreImage(ret), preImage,
stasis_record_type_to_size(rid.size));
ret->update.funcID = op;
ret->update.page = page;
ret->update.arg_size = arg_size;
if(arg_size) {
memcpy((void*)getUpdateArgs(ret), args, arg_size);
}
if(whole_page_phys) {
memcpy((void*)getUpdatePreImage(ret), preImage,
PAGE_SIZE);
}
//assert(logentrysize == sizeofLogEntry(ret));
// XXX checks for uninitialized values in valgrind
// stasis_crc32(ret, sizeofLogEntry(ret), 0);
return ret;
}
LogEntry * allocCLRLogEntry(const LogEntry * old_e) {
CLRLogEntry * ret = calloc(1,sizeof(CLRLogEntry));
// Could handle other types, but we should never encounter them here.
assert(old_e->type == UPDATELOG);
LogEntry * ret = calloc(1, sizeofLogEntry(old_e));
memcpy(ret, old_e, sizeofLogEntry(old_e));
ret->LSN = -1;
// prevLSN is OK already
// xid is OK already
ret->prevLSN = old_e->prevLSN;
ret->xid = old_e->xid;
ret->type = CLRLOG;
// update is also OK
return ret;
DEBUG("compensates: %lld\n", old_e->LSN);
assert(old_e->LSN!=-1);
ret->clr.compensated_lsn = old_e->LSN;
return (LogEntry*)ret;
}
@ -162,15 +129,14 @@ LogEntry * allocCLRLogEntry(const LogEntry * old_e) {
long sizeofLogEntry(const LogEntry * log) {
switch (log->type) {
case CLRLOG:
{
return sizeof(CLRLogEntry);
}
case UPDATELOG:
{
int undoType = operationsTable[log->update.funcID].undo;
return sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry) + log->update.argSize +
((undoType == NO_INVERSE) ? stasis_record_type_to_size(log->update.rid.size)
: 0) +
((undoType == NO_INVERSE_WHOLE_PAGE) ? PAGE_SIZE : 0);
}
{
return sizeof(struct __raw_log_entry) +
sizeof(UpdateLogEntry) + log->update.arg_size;
}
case INTERNALLOG:
return LoggerSizeOfInternalLogEntry(log);
case XPREPARE:


@ -320,49 +320,17 @@ lsn_t LogTransPrepare(TransactionLog * l) {
return groupPrepare(l);
}
/**
@todo Does the handling of operation types / argument sizes belong
here? Shouldn't it be in logEntry.c, or perhaps with other code
that reasons about the various operation types?
*/
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
const byte * args) {
void * preImage = NULL;
long argSize = 0;
LogEntry * e;
LogEntry * LogUpdate(TransactionLog * l, Page * p, unsigned int op,
const byte * arg, size_t arg_size) {
argSize = operationsTable[operation].sizeofData;
if(argSize == SIZEOF_RECORD) argSize = stasis_record_type_to_size(rid.size);
if(argSize == SIZEIS_PAGEID) argSize = rid.page;
int undoType = operationsTable[operation].undo;
if(undoType == NO_INVERSE) {
DEBUG("Creating %ld byte physical pre-image.\n", stasis_record_type_to_size(rid.size));
preImage = malloc(stasis_record_type_to_size(rid.size));
stasis_record_read(l->xid, p, rid, preImage);
} else if (undoType == NO_INVERSE_WHOLE_PAGE) {
DEBUG("Logging entire page\n");
preImage = malloc(PAGE_SIZE);
memcpy(preImage, p->memAddr, PAGE_SIZE);
} else {
DEBUG("No pre-image");
}
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize,
preImage);
LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op,
p ? p->id : INVALID_PAGE,
arg, arg_size);
LogWrite(e);
DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) argSize);
DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size);
if(preImage) {
free(preImage);
}
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
return e;
@ -374,15 +342,16 @@ lsn_t LogCLR(const LogEntry * old_e) {
DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid,
e->LSN, LSN, prevLSN);
lsn_t ret = e->LSN;
FreeLogEntry(e);
return ret;
}
lsn_t LogDummyCLR(int xid, lsn_t prevLSN) {
LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP,
NULLRID, NULL, 0, 0);
lsn_t LogDummyCLR(int xid, lsn_t prevLSN, lsn_t compensatedLSN) {
LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP,
INVALID_PAGE, NULL, 0);
e->LSN = compensatedLSN;
lsn_t ret = LogCLR(e);
FreeLogEntry(e);
return ret;


@ -52,134 +52,83 @@ terms specified in this license.
#include <stasis/page.h>
#include <stasis/transactional.h> /// XXX for xactiontable
Operation operationsTable[MAX_OPERATIONS];
/**
@todo operations.c should handle LSN's for non-logical operations.
*/
void doUpdate(const LogEntry * e, Page * p) {
DEBUG("OPERATION update arg length %d, lsn = %ld\n",
e->contents.update.argSize, e->LSN);
assert(p);
operationsTable[e->update.funcID].run(e, p);
writelock(p->rwlatch,0);
DEBUG("OPERATION xid %d Do, %lld {%lld:%lld}\n", e->xid,
e->LSN, e->update.page, stasis_page_lsn_read(p));
stasis_page_lsn_write(e->xid, p, e->LSN);
unlock(p->rwlatch);
operationsTable[e->update.funcID].run(e->xid, p, e->LSN,
e->update.rid, getUpdateArgs(e));
}
void redoUpdate(const LogEntry * e) {
if(e->type == UPDATELOG) {
recordid rid = e->update.rid;
Page * p;
lsn_t pageLSN;
try {
if(operationsTable[e->update.funcID].sizeofData == SIZEIS_PAGEID) {
p = NULL;
pageLSN = 0;
} else {
p = loadPage(e->xid, rid.page);
pageLSN = stasis_page_lsn_read(p);
}
} end;
// Only handle update log entries
assert(e->type == UPDATELOG);
// If this is a logical operation, something is broken
assert(e->update.page != INVALID_PAGE);
if(e->LSN > pageLSN) {
DEBUG("OPERATION Redo, %ld > %ld {%d %d %ld}\n",
e->LSN, pageLSN, rid.page, rid.slot, rid.size);
// Need to check the id field to find out what the _REDO_ action
// is for this log type. contrast with doUpdate(), which
// doesn't use the .id field.
operationsTable[operationsTable[e->update.funcID].id]
.run(e->xid, p, e->LSN, e->update.rid, getUpdateArgs(e));
if(operationsTable[operationsTable[e->update.funcID].id].run == noop)
return;
} else {
DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %ld}\n",
e->LSN, pageLSN, rid.page, rid.slot, rid.size);
}
if(p) {
releasePage(p);
}
} else if(e->type == CLRLOG) {
recordid rid = e->update.rid;
Page * p = NULL;
lsn_t pageLSN;
int isNullRid = !memcmp(&rid, &NULLRID, sizeof(recordid));
if(!isNullRid) {
if(operationsTable[e->update.funcID].sizeofData == SIZEIS_PAGEID) {
p = NULL;
pageLSN = 0;
} else {
try {
p = loadPage(e->xid, rid.page);
pageLSN = stasis_page_lsn_read(p);
} end;
}
}
/* See if the page contains the result of the undo that this CLR
is supposed to perform. If it doesn't, or this was a logical
operation, then undo the original operation. */
if(isNullRid || e->LSN > pageLSN) {
DEBUG("OPERATION Undoing for clr, %ld {%d %d %ld}\n",
e->LSN, rid.page, rid.slot, rid.size);
undoUpdate(e, p, e->LSN);
} else {
DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %ld}\n",
e->LSN, rid.page, rid.slot, rid.size);
}
if(p) {
releasePage(p);
}
Page * p = loadPage(e->xid, e->update.page);
writelock(p->rwlatch,0);
if(stasis_page_lsn_read(p) < e->LSN) {
DEBUG("OPERATION xid %d Redo, %lld {%lld:%lld}\n", e->xid,
e->LSN, e->update.page, stasis_page_lsn_read(p));
// Need to check the id field to find out what the REDO action
// is for this log type.
// contrast with doUpdate(), which doesn't check the .id field.
stasis_page_lsn_write(e->xid, p, e->LSN); //XXX do this after run();
unlock(p->rwlatch); /// XXX keep lock while calling run();
operationsTable[operationsTable[e->update.funcID].id]
.run(e,p);
} else {
abort();
DEBUG("OPERATION xid %d skip redo, %lld {%lld:%lld}\n", e->xid,
e->LSN, e->update.page, stasis_page_lsn_read(p));
unlock(p->rwlatch);
}
releasePage(p);
}
void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
void undoUpdate(const LogEntry * e, lsn_t effective_lsn) {
// Only handle update entries
assert(e->type == UPDATELOG);
int undo = operationsTable[e->update.funcID].undo;
DEBUG("OPERATION FuncID %d Undo op %d LSN %ld\n",
e->update.funcID, undo, clr_lsn);
#ifdef DEBUGGING
recordid rid = e->update.rid;
#endif
lsn_t page_lsn = -1;
if(p) {
page_lsn = stasis_page_lsn_read(p);
}
if(e->LSN <= page_lsn || !p) {
// Actually execute the undo
if(undo == NO_INVERSE) {
if(e->update.page == INVALID_PAGE) {
// logical undos are executed unconditionally.
DEBUG("OPERATION %d Physical undo, %ld {%d %d %ld}\n", undo, e->LSN,
e->update.rid.page, e->contents.rid.slot, e->update.rid.size);
DEBUG("OPERATION xid %d FuncID %d Undo, %d LSN %lld {logical}\n", e->xid,
e->update.funcID, undo, e->LSN);
assert(p);
stasis_record_write(e->xid, p, clr_lsn, e->update.rid, getUpdatePreImage(e));
} else if(undo == NO_INVERSE_WHOLE_PAGE) {
DEBUG("OPERATION %d Whole page physical undo, %ld {%d}\n", undo, e->LSN,
e->update.rid.page);
assert(p);
memcpy(p->memAddr, getUpdatePreImage(e), PAGE_SIZE);
stasis_page_lsn_write(e->xid, p, clr_lsn);
} else {
DEBUG("OPERATION %d Logical undo, %ld {%d %d %ld}\n", undo, e->LSN,
e->update.rid.page, e->update.rid.slot, e->update.rid.size);
operationsTable[undo].run(e->xid, p, clr_lsn, e->update.rid,
getUpdateArgs(e));
}
operationsTable[undo].run(e,0);
} else {
DEBUG("OPERATION %d Skipping undo, %ld {%d %d %ld}\n", undo, e->LSN,
e->update.rid.page, e->update.rid.slot, e->update.rid.size);
Page * p = loadPage(e->xid, e->update.page);
writelock(p->rwlatch,0);
if(stasis_page_lsn_read(p) < effective_lsn) {
DEBUG("OPERATION xid %d Undo, %lld {%lld:%lld}\n", e->xid,
e->LSN, e->update.page, stasis_page_lsn_read(p));
stasis_page_lsn_write(e->xid, p, effective_lsn); // XXX call after run()
unlock(p->rwlatch); // release after run()
operationsTable[undo].run(e,p);
} else {
DEBUG("OPERATION xid %d skip undo, %lld {%lld:%lld}\n", e->xid,
e->LSN, e->update.page, stasis_page_lsn_read(p));
unlock(p->rwlatch);
}
releasePage(p);
}
}


@ -77,7 +77,7 @@
*/
//}end
static int operate_helper(int xid, Page * p, recordid rid, const void * dat) {
static int operate_helper(int xid, Page * p, recordid rid) {
if(stasis_record_type_read(xid, p, rid) == INVALID_SLOT) {
stasis_record_alloc_done(xid, p, rid);
@ -90,30 +90,73 @@ static int operate_helper(int xid, Page * p, recordid rid, const void * dat) {
return 0;
}
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
typedef struct {
slotid_t slot;
int64_t size;
} alloc_arg;
static int op_alloc(const LogEntry* e, Page* p) { //(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
writelock(p->rwlatch, 0);
int ret = operate_helper(xid,p,rid,dat);
stasis_page_lsn_write(xid,p,lsn);
assert(e->update.arg_size >= sizeof(alloc_arg));
const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);
recordid rid = {
p->id,
arg->slot,
arg->size
};
int ret = operate_helper(e->xid,p,rid);
if(e->update.arg_size == sizeof(alloc_arg) + arg->size) {
// if we're aborting a dealloc, we'd better have a sane preimage to apply
stasis_record_write(e->xid,p,e->LSN,rid,(const byte*)(arg+1));
} else {
// otherwise, no preimage
assert(e->update.arg_size == sizeof(alloc_arg));
}
unlock(p->rwlatch);
return ret;
}
static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
static int op_dealloc(const LogEntry* e, Page* p) { //deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
writelock(p->rwlatch,0);
stasis_record_free(xid, p, rid);
stasis_page_lsn_write(xid,p,lsn);
assert(stasis_record_type_read(xid, p, rid) == INVALID_SLOT);
assert(e->update.arg_size >= sizeof(alloc_arg));
const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);
recordid rid = {
p->id,
arg->slot,
arg->size
};
// assert that we've got a sane preimage or we're aborting a talloc (no preimage)
assert(e->update.arg_size == sizeof(alloc_arg) + arg->size || e->update.arg_size == sizeof(alloc_arg));
stasis_record_free(e->xid, p, rid);
assert(stasis_record_type_read(e->xid, p, rid) == INVALID_SLOT);
unlock(p->rwlatch);
return 0;
}
static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
static int op_realloc(const LogEntry* e, Page* p) { //reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
writelock(p->rwlatch,0);
assert(stasis_record_type_read(xid, p, rid) == INVALID_SLOT);
int ret = operate_helper(xid, p, rid, dat);
byte * buf = stasis_record_write_begin(xid,p,rid);
memcpy(buf, dat, stasis_record_length_read(xid,p,rid));
stasis_page_lsn_write(xid,p,lsn);
assert(e->update.arg_size >= sizeof(alloc_arg));
const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);
recordid rid = {
p->id,
arg->slot,
arg->size
};
assert(stasis_record_type_read(e->xid, p, rid) == INVALID_SLOT);
int ret = operate_helper(e->xid, p, rid);
assert(e->update.arg_size == sizeof(alloc_arg)
+ stasis_record_length_read(e->xid,p,rid));
byte * buf = stasis_record_write_begin(e->xid,p,rid);
memcpy(buf, arg+1, stasis_record_length_read(e->xid,p,rid));
stasis_record_write_done(e->xid,p,rid,buf);
unlock(p->rwlatch);
return ret;
@ -124,9 +167,8 @@ static pthread_mutex_t talloc_mutex = PTHREAD_MUTEX_INITIALIZER;
Operation getAlloc() {
Operation o = {
OPERATION_ALLOC, /* ID */
0,
OPERATION_DEALLOC, /* OPERATION_NOOP, */
&operate
op_alloc
};
return o;
}
@ -135,9 +177,8 @@ Operation getAlloc() {
Operation getDealloc() {
Operation o = {
OPERATION_DEALLOC,
SIZEOF_RECORD,
OPERATION_REALLOC, /* OPERATION_NOOP, */
&deoperate
op_dealloc
};
return o;
}
@ -146,9 +187,8 @@ Operation getDealloc() {
Operation getRealloc() {
Operation o = {
OPERATION_REALLOC,
0,
OPERATION_NOOP,
&reoperate
op_realloc
};
return o;
}
@ -237,10 +277,10 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
short type;
if(size >= BLOB_THRESHOLD_SIZE) {
type = BLOB_SLOT;
} else {
} else {
type = size;
}
recordid rid;
begin_action_ret(pthread_mutex_unlock, &talloc_mutex, NULLRID) {
@ -296,9 +336,11 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
allocationPolicyUpdateFreespaceLockedPage(allocPolicy, xid, ap, newFreespace);
unlock(p->rwlatch);
Tupdate(xid, rid, NULL, OPERATION_ALLOC);
alloc_arg a = { rid.slot, rid.size };
if(type == BLOB_SLOT) {
Tupdate(xid, rid, &a, sizeof(a), OPERATION_ALLOC);
if(type == BLOB_SLOT) {
rid.size = size;
allocBlob(xid, rid);
}
@ -321,11 +363,12 @@ void allocTransactionCommit(int xid) {
} compensate;
}
compensated_function recordid TallocFromPage(int xid, long page, unsigned long type) {
unsigned long size = type;
if(size > BLOB_THRESHOLD_SIZE) {
compensated_function recordid TallocFromPage(int xid, long page, unsigned long size) {
short type;
if(size >= BLOB_THRESHOLD_SIZE) {
type = BLOB_SLOT;
} else {
type = size;
}
pthread_mutex_lock(&talloc_mutex);
@ -338,7 +381,9 @@ compensated_function recordid TallocFromPage(int xid, long page, unsigned long t
allocationPolicyAllocedFromPage(allocPolicy, xid, page);
unlock(p->rwlatch);
Tupdate(xid,rid,NULL,OPERATION_ALLOC);
alloc_arg a = { rid.slot, rid.size };
Tupdate(xid, rid, &a, sizeof(a), OPERATION_ALLOC);
if(type == BLOB_SLOT) {
rid.size = size;
@ -360,7 +405,6 @@ compensated_function void Tdealloc(int xid, recordid rid) {
// @todo this needs to garbage collect empty storage regions.
void * preimage = malloc(rid.size);
Page * p;
pthread_mutex_lock(&talloc_mutex);
try {
@ -370,11 +414,20 @@ compensated_function void Tdealloc(int xid, recordid rid) {
recordid newrid = stasis_record_dereference(xid, p, rid);
allocationPolicyLockPage(allocPolicy, xid, newrid.page);
readlock(p->rwlatch,0);
int64_t size = stasis_record_length_read(xid,p,rid);
unlock(p->rwlatch);
byte * preimage = malloc(sizeof(alloc_arg)+rid.size);
((alloc_arg*)preimage)->slot = rid.slot;
((alloc_arg*)preimage)->size = size;
begin_action(releasePage, p) {
stasis_record_read(xid, p, rid, preimage);
stasis_record_read(xid, p, rid, preimage+sizeof(alloc_arg));
/** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with? */
Tupdate(xid, rid, preimage, OPERATION_DEALLOC);
Tupdate(xid, rid, preimage, sizeof(alloc_arg)+rid.size, OPERATION_DEALLOC);
} compensate;
pthread_mutex_unlock(&talloc_mutex);
@ -405,27 +458,31 @@ compensated_function int TrecordSize(int xid, recordid rid) {
}
void TinitializeSlottedPage(int xid, int pageid) {
recordid rid = { pageid, SLOTTED_PAGE, 0 };
Tupdate(xid, rid, NULL, OPERATION_INITIALIZE_PAGE);
alloc_arg a = { SLOTTED_PAGE, 0 };
recordid rid = { pageid, 0, 0 };
Tupdate(xid, rid, &a, sizeof(a), OPERATION_INITIALIZE_PAGE);
}
void TinitializeFixedPage(int xid, int pageid, int slotLength) {
recordid rid = { pageid, FIXED_PAGE, slotLength };
Tupdate(xid, rid, NULL, OPERATION_INITIALIZE_PAGE);
alloc_arg a = { FIXED_PAGE, slotLength };
recordid rid = { pageid, 0, 0 };
Tupdate(xid, rid, &a, sizeof(a), OPERATION_INITIALIZE_PAGE);
}
static int operate_initialize_page(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
static int op_initialize_page(const LogEntry* e, Page* p) { //int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
writelock(p->rwlatch, 0);
switch(rid.slot) {
assert(e->update.arg_size == sizeof(alloc_arg));
const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);
switch(arg->slot) {
case SLOTTED_PAGE:
stasis_slotted_initialize_page(p);
break;
case FIXED_PAGE:
stasis_fixed_initialize_page(p, rid.size, stasis_fixed_records_per_page(rid.size));
case FIXED_PAGE:
stasis_fixed_initialize_page(p, arg->size, stasis_fixed_records_per_page(arg->size));
break;
default:
abort();
}
stasis_page_lsn_write(xid, p, lsn);
unlock(p->rwlatch);
return 0;
}
@ -434,9 +491,8 @@ static int operate_initialize_page(int xid, Page *p, lsn_t lsn, recordid rid, co
Operation getInitializePage() {
Operation o = {
OPERATION_INITIALIZE_PAGE,
0,
OPERATION_NOOP,
&operate_initialize_page
op_initialize_page
};
return o;
}


@ -49,16 +49,18 @@ compensated_function recordid TarrayListAlloc(int xid, int count, int multiplier
rid.size = size;
rid.slot = 0;
try_ret(NULLRID) {
Tupdate(xid, rid, &tlp, OPERATION_ARRAY_LIST_ALLOC);
Tupdate(xid, rid, &tlp, sizeof(tlp), OPERATION_ARRAY_LIST_ALLOC);
} end_ret(NULLRID);
return rid;
}
static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
static int op_array_list_alloc(const LogEntry* e, Page* p) {
const TarrayListParameters * tlp = dat;
assert(e->update.arg_size == sizeof(TarrayListParameters));
const TarrayListParameters * tlp = (const TarrayListParameters*)getUpdateArgs(e);
int firstPage = tlp->firstPage;
int count = tlp->initialSize;
@ -81,16 +83,14 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
firstDataPageRid.slot = 4;
int firstDataPage = firstPage + 1;
(*(int*)stasis_record_write_begin(xid, p, countRid))= count;
(*(int*)stasis_record_write_begin(xid, p, multiplierRid))= multiplier;
(*(int*)stasis_record_write_begin(xid, p, firstDataPageRid))= firstDataPage;
(*(int*)stasis_record_write_begin(xid, p, slotSizeRid))= size;
(*(int*)stasis_record_write_begin(xid, p, maxOffset))= -1;
(*(int*)stasis_record_write_begin(e->xid, p, countRid))= count;
(*(int*)stasis_record_write_begin(e->xid, p, multiplierRid))= multiplier;
(*(int*)stasis_record_write_begin(e->xid, p, firstDataPageRid))= firstDataPage;
(*(int*)stasis_record_write_begin(e->xid, p, slotSizeRid))= size;
(*(int*)stasis_record_write_begin(e->xid, p, maxOffset))= -1;
*stasis_page_type_ptr(p) = ARRAY_LIST_PAGE;
stasis_page_lsn_write(xid, p, lsn);
recordid ret;
ret.page = firstPage;
ret.slot = 0; /* slot = # of slots in array... */
@ -103,9 +103,8 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
Operation getArrayListAlloc() {
Operation o = {
OPERATION_ARRAY_LIST_ALLOC, /* ID */
sizeof(TarrayListParameters),
OPERATION_NOOP, /* Since TpageAllocMany will be undone, the page we touch will be nuked anyway, so set this to NO-OP. */
&operateAlloc
&op_array_list_alloc
};
return o;
}
@ -118,7 +117,7 @@ Operation getArrayListAlloc() {
@todo this function calls pow(), which is horribly inefficient.
*/
static compensated_function int TarrayListExtendInternal(int xid, recordid rid, int slots, int op) {
compensated_function int TarrayListExtend(int xid, recordid rid, int slots) {
Page * p;
try_ret(compensation_error()) {
p = loadPage(xid, rid.page);
@ -159,32 +158,27 @@ static compensated_function int TarrayListExtendInternal(int xid, recordid rid,
It should generate 1 entry. (Need better LSN handling first.)*/
{
recordid newpage;
newpage.slot = 0;
newpage.size = tlp.size;
newpage.slot = 0;
newpage.size = 0;
for(int i = newFirstPage; i < newFirstPage + blockSize; i++) {
newpage.page = i;
TupdateRaw(xid, newpage, 0, OPERATION_FIXED_PAGE_ALLOC);
TupdateRaw(xid, newpage, &tlp.size, sizeof(tlp.size), OPERATION_FIXED_PAGE_ALLOC);
}
}
TupdateRaw(xid, tmp, &newFirstPage, op);
TsetRaw(xid,tmp,&newFirstPage);
DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage);
}
tmp.slot = MAX_OFFSET_POSITION;
int newMaxOffset = tlp.maxOffset+slots;
TupdateRaw(xid, tmp, &newMaxOffset, op);
TsetRaw(xid, tmp, &newMaxOffset);
} end_ret(compensation_error());
return 0;
}
compensated_function int TarrayListInstantExtend(int xid, recordid rid, int slots) {
return TarrayListExtendInternal(xid, rid, slots, OPERATION_INSTANT_SET);
}
compensated_function int TarrayListExtend(int xid, recordid rid, int slots) {
return TarrayListExtendInternal(xid, rid, slots, OPERATION_SET);
}
compensated_function int TarrayListLength(int xid, recordid rid) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch, 0);


@ -48,21 +48,23 @@ terms specified in this license.
#include <stasis/operations.h>
#include <stasis/page.h>
static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
int i;
static int op_decrement(const LogEntry* e, Page* p) {
int i;
assert(e->update.arg_size == sizeof(slotid_t));
recordid r = {p->id, *(slotid_t*)getUpdateArgs(e), sizeof(int)};
stasis_record_read(xid, p, r, (byte*)&i);
i--;
stasis_record_write(xid, p, lsn, r, (byte*)&i);
return 0;
stasis_record_read(e->xid, p, r, (byte*)&i);
i--;
stasis_record_write(e->xid, p, e->LSN, r, (byte*)&i);
return 0;
}
Operation getDecrement() {
Operation o = {
OPERATION_DECREMENT, /* id */
0, /* sizeofData (this doesn't take any args) */
OPERATION_INCREMENT, /* we're not doing undo functions yet */
&operate /* Function */
};
return o;
Operation getDecrement() {
Operation o = {
OPERATION_DECREMENT,
OPERATION_INCREMENT,
op_decrement
};
return o;
}


@ -48,21 +48,24 @@ terms specified in this license.
#include <stasis/operations.h>
#include <stasis/page.h>
static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
int i;
static int op_increment(const LogEntry* e, Page* p) {
int i;
stasis_record_read(xid, p, r, (byte*)&i);
i++;
stasis_record_write(xid, p, lsn, r, (byte*)&i);
return 0;
assert(e->update.arg_size == sizeof(slotid_t));
recordid r = {p->id, *(slotid_t*)getUpdateArgs(e), sizeof(int)};
stasis_record_read(e->xid, p, r, (byte*)&i);
i++;
stasis_record_write(e->xid, p, e->LSN, r, (byte*)&i);
return 0;
}
Operation getIncrement() {
Operation o = {
OPERATION_INCREMENT, /* id */
0, /* sizeofData (this doesn't take any args) */
OPERATION_DECREMENT, /* we're not doing undo functions yet */
&operate /* Function */
};
return o;
Operation getIncrement() {
Operation o = {
OPERATION_INCREMENT,
OPERATION_DECREMENT,
op_increment
};
return o;
}


@ -47,18 +47,50 @@ terms specified in this license.
#include <stasis/operations.h>
#include <stasis/page.h>
#include <string.h>
static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
stasis_record_write(xid, p, lsn, rid, dat);
// XXX do not use
static int op_instant_set(const LogEntry *e, Page* p) {
assert(e->update.arg_size >= sizeof(slotid_t) + sizeof(int64_t));
const byte * b = getUpdateArgs(e);
recordid rid;
rid.page = p->id;
rid.slot = *(slotid_t*)b; b+=sizeof(slotid_t);
rid.size = *(int64_t*)b; b+=sizeof(int64_t);
assert(e->update.arg_size == sizeof(slotid_t) + sizeof(int64_t) + rid.size);
assert(stasis_record_type_to_size(rid.size) == rid.size);
stasis_record_write(e->xid, p, e->LSN, rid, b);
return 0;
}
int TinstantSet(int xid, recordid rid, const void * dat) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
rid = stasis_record_dereference(xid,p,rid);
unlock(p->rwlatch);
releasePage(p);
rid.size = stasis_record_type_to_size(rid.size);
size_t sz = sizeof(slotid_t) + sizeof(int64_t) + rid.size;
byte * const buf = malloc(sz);
byte * b = buf;
*(slotid_t*) b = rid.slot; b += sizeof(slotid_t);
*(int64_t*) b = rid.size; b += sizeof(int64_t);
memcpy(b, dat, rid.size);
Tupdate(xid,rid,buf,sz,OPERATION_INSTANT_SET);
free(buf);
return 0;
}
Operation getInstantSet() {
Operation o = {
OPERATION_INSTANT_SET, /* id */
SIZEOF_RECORD, /* use the size of the record as size of arg */
OPERATION_NOOP,
&operate /* Function */
OPERATION_NOOP,
op_instant_set
};
return o;
}


@ -38,12 +38,12 @@ extern pthread_mutex_t linearHashMutex;// = PTHREAD_MUTEX_INITIALIZER;
extern pblHashTable_t * openHashes ;
static int operateUndoInsert(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
static int op_linear_hash_undo_insert(const LogEntry* e, Page *p) {
abort();
#if 0
int keySize = rid.size;
int valSize = rid.slot;
rid.slot = 0;
// rid.size = sizeof(recordid);
rid.slot = sizeof(hashEntry) + keySize + valSize;
if(!pblHtLookup(openHashes, &rid.page, sizeof(int))) {
@ -53,6 +53,7 @@ static int operateUndoInsert(int xid, Page * p, lsn_t lsn, recordid rid, const v
ThashInstantDelete(xid, rid, dat, keySize, valSize);
return 0;
#endif
}
typedef struct {
@ -60,7 +61,9 @@ typedef struct {
int valSize;
} undoDeleteArg;
static int operateUndoDelete(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
static int op_linear_hash_delete(const LogEntry *e, Page *p) {
abort();
#if 0
const undoDeleteArg * arg = dat;
int keySize = arg->keySize;
int valSize = arg->valSize;
@ -78,13 +81,12 @@ static int operateUndoDelete(int xid, Page * p, lsn_t lsn, recordid rid, const v
ThashInstantInsert(xid, rid, argBytes, keySize,
argBytes + keySize, valSize);
return 0;
#endif
}
// static int noop(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { pageWriteLSN(xid, p, lsn); return 0; }
Operation getLinearInsert() {
Operation o = {
OPERATION_LINEAR_INSERT,
SIZEOF_RECORD,
OPERATION_UNDO_LINEAR_INSERT,
&noop
};
@ -93,7 +95,6 @@ Operation getLinearInsert() {
Operation getLinearDelete() {
Operation o = {
OPERATION_LINEAR_DELETE,
SIZEOF_RECORD,
OPERATION_UNDO_LINEAR_DELETE,
&noop
};
@ -104,9 +105,8 @@ Operation getLinearDelete() {
Operation getUndoLinearInsert() {
Operation o = {
OPERATION_UNDO_LINEAR_INSERT,
SIZEOF_RECORD,
OPERATION_NOOP,
&operateUndoInsert
op_linear_hash_undo_insert
};
return o;
}
@ -114,9 +114,8 @@ Operation getUndoLinearInsert() {
Operation getUndoLinearDelete() {
Operation o = {
OPERATION_UNDO_LINEAR_DELETE,
SIZEOF_RECORD,
OPERATION_NOOP,
&operateUndoDelete
op_linear_hash_delete
};
return o;
}
@ -128,7 +127,7 @@ void TlogicalHashInsert(int xid, recordid hashRid, void * key, int keySize, void
hashRid.slot = valSize;
hashRid.size = keySize;
TupdateRaw(xid, hashRid, key, OPERATION_LINEAR_INSERT);
TupdateRaw(xid, hashRid, key, keySize, OPERATION_LINEAR_INSERT);
/* Perform redo-only insert. */
hashRid.size = sizeof(hashEntry) + keySize + valSize;
@ -155,7 +154,7 @@ int TlogicalHashDelete(int xid, recordid hashRid, void * key, int keySize, void
hashRid.size = sizeof(undoDeleteArg) + keySize + valSize;
TupdateRaw(xid, hashRid, arg, OPERATION_LINEAR_DELETE);
TupdateRaw(xid, hashRid, arg, sizeof(undoDeleteArg) + keySize + valSize, OPERATION_LINEAR_DELETE);
hashRid.size = sizeof(hashEntry) + keySize + valSize;
free(arg);
/* hashRid.size = sizeof(recordid); */
@ -202,7 +201,8 @@ if(mycount <= 0 && !(mycount * -1) % FF_AM) { */
pthread_mutex_lock(&exp_slow_mutex);
// int j;
TarrayListInstantExtend(xid, hash, 1 /*AMORTIZE*/);
abort();
// TarrayListInstantExtend(xid, hash, 1 /*AMORTIZE*/);
// pthread_mutex_lock(&linearHashMutex); //Already hold this!


@ -57,7 +57,7 @@ compensated_function recordid ThashCreate(int xid, int keySize, int valueSize) {
if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(recordid));
} else {
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(lladd_linkedList_entry) + keySize + valueSize);
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(stasis_linkedList_entry) + keySize + valueSize);
}
} end_ret(NULLRID);
try_ret(NULLRID) {
@ -115,8 +115,9 @@ typedef struct {
int valueSize;
} linearHash_remove_arg;*/
compensated_function static int operateInsert(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
const linearHash_remove_arg * args = dat;
//compensated_function static int operateInsert(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
compensated_function static int op_linear_hash_insert(const LogEntry* e, Page* p) {
const linearHash_remove_arg * args = (const linearHash_remove_arg*)getUpdateArgs(e);
recordid hashHeader = args->hashHeader;
int keySize = args->keySize;
int valueSize = args->valueSize;
@ -127,19 +128,19 @@ compensated_function static int operateInsert(int xid, Page *p, lsn_t lsn, reco
byte * value = ((byte*)(args+1))+ keySize;
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
pthread_mutex_lock(&linear_hash_mutex);
__ThashInsert(xid, hashHeader, key, keySize, value, valueSize);
__ThashInsert(e->xid, hashHeader, key, keySize, value, valueSize);
} compensate_ret(compensation_error());
return 0;
}
compensated_function static int operateRemove(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
const linearHash_insert_arg * args = dat;
compensated_function static int op_linear_hash_remove(const LogEntry* e, Page* p) {
const linearHash_insert_arg * args = (const linearHash_insert_arg*) getUpdateArgs(e);
recordid hashHeader = args->hashHeader;
int keySize = args->keySize;
byte * key = (byte*)(args + 1);
begin_action_ret(pthread_mutex_unlock, &linear_hash_mutex, compensation_error()) {
pthread_mutex_lock(&linear_hash_mutex);
__ThashRemove(xid, hashHeader, key, keySize);
__ThashRemove(e->xid, hashHeader, key, keySize);
} compensate_ret(compensation_error());
return 0;
@ -148,9 +149,8 @@ Operation getLinearHashInsert() {
Operation o = {
// OPERATION_LINEAR_HASH_INSERT,
OPERATION_NOOP,
SIZEIS_PAGEID,
OPERATION_LINEAR_HASH_REMOVE,
&operateInsert
&op_linear_hash_insert
// &noop
};
return o;
@ -159,9 +159,8 @@ Operation getLinearHashRemove() {
Operation o = {
// OPERATION_LINEAR_HASH_REMOVE,
OPERATION_NOOP,
SIZEIS_PAGEID,
OPERATION_LINEAR_HASH_INSERT,
&operateRemove
&op_linear_hash_remove
//&noop
};
return o;
@ -378,7 +377,7 @@ compensated_function static void ThashSplitBucket(int xid, recordid hashHeader,
free(value);
}
} else {
lladd_linkedList_iterator * it = TlinkedListIterator(xid, old_bucket_rid, lhh->keySize, lhh->valueSize);
stasis_linkedList_iterator * it = TlinkedListIterator(xid, old_bucket_rid, lhh->keySize, lhh->valueSize);
byte * key, *value;
int keySize, valueSize;
while(TlinkedListNext(xid, it, &key, &keySize, &value, &valueSize)) {


@ -18,7 +18,7 @@
To access an entry's contents:
lladd_linkedList_entry * entry;
stasis_linkedList_entry * entry;
...
if(entry->size) {
key = (byte*)(entry + 1);
@ -32,7 +32,7 @@
To get the successor in the list:
lladd_linkedList_entry next = entry->next;
stasis_linkedList_entry next = entry->next;
@ -56,16 +56,16 @@ compensated_function static int __TlinkedListRemove(int xid, recordid list, con
typedef struct {
recordid list;
int keySize;
} lladd_linkedListInsert_log;
} stasis_linkedListInsert_log;
typedef struct {
recordid list;
int keySize;
int valueSize;
} lladd_linkedListRemove_log;
} stasis_linkedListRemove_log;
compensated_function static int operateInsert(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
compensated_function static int op_linked_list_nta_insert(const LogEntry* e, Page* p) {
assert(!p);
lladd_linkedListRemove_log * log = (lladd_linkedListRemove_log*)dat;
stasis_linkedListRemove_log * log = (stasis_linkedListRemove_log*)getUpdateArgs(e);
byte * key;
byte * value;
@ -80,15 +80,15 @@ compensated_function static int operateInsert(int xid, Page *p, lsn_t lsn, reco
// printf("Operate insert called: rid.page = %d keysize = %d valuesize = %d %d {%d %d %d}\n", rid.page, log->keySize, log->valueSize, *(int*)key, value->page, value->slot, value->size);
// Skip writing the undo! Recovery will write a CLR after we're done, effectively
// wrapping this in a nested top action, so we needn't worry about that either.
__TlinkedListInsert(xid, log->list, key, keySize, value, valueSize);
__TlinkedListInsert(e->xid, log->list, key, keySize, value, valueSize);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
return 0;
}
compensated_function static int operateRemove(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
compensated_function static int op_linked_list_nta_remove(const LogEntry *e, Page* p) {
assert(!p);
lladd_linkedListRemove_log * log = (lladd_linkedListRemove_log*)dat;
stasis_linkedListRemove_log * log = (stasis_linkedListRemove_log*)getUpdateArgs(e);
byte * key;
int keySize;
@ -99,7 +99,7 @@ compensated_function static int operateRemove(int xid, Page *p, lsn_t lsn, reco
pthread_mutex_lock(&linked_list_mutex);
// printf("Operate remove called: %d\n", *(int*)key);
// Don't call the version that writes an undo entry!
__TlinkedListRemove(xid, log->list, key, keySize);
__TlinkedListRemove(e->xid, log->list, key, keySize);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
return 0;
@ -111,7 +111,7 @@ compensated_function int TlinkedListInsert(int xid, recordid list, const byte *
ret = TlinkedListRemove(xid, list, key, keySize);
} end_ret(compensation_error()); */
lladd_linkedListInsert_log * undoLog = malloc(sizeof(lladd_linkedListInsert_log) + keySize);
stasis_linkedListInsert_log * undoLog = malloc(sizeof(stasis_linkedListInsert_log) + keySize);
undoLog->list = list;
undoLog->keySize = keySize;
@ -119,7 +119,7 @@ compensated_function int TlinkedListInsert(int xid, recordid list, const byte *
pthread_mutex_lock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_INSERT,
(byte*)undoLog, sizeof(lladd_linkedListInsert_log) + keySize);
(byte*)undoLog, sizeof(stasis_linkedListInsert_log) + keySize);
free(undoLog);
__TlinkedListInsert(xid, list, key, keySize, value, valueSize);
TendNestedTopAction(xid, handle);
@ -133,18 +133,16 @@ compensated_function int TlinkedListInsert(int xid, recordid list, const byte *
Operation getLinkedListInsert() {
Operation o = {
OPERATION_NOOP,
SIZEIS_PAGEID,
OPERATION_LINKED_LIST_REMOVE,
&operateInsert
&op_linked_list_nta_insert
};
return o;
}
Operation getLinkedListRemove() {
Operation o = {
OPERATION_NOOP,
SIZEIS_PAGEID,
OPERATION_LINKED_LIST_INSERT,
&operateRemove
&op_linked_list_nta_remove
};
return o;
}
@ -153,7 +151,7 @@ compensated_function static void __TlinkedListInsert(int xid, recordid list, con
try {
lladd_linkedList_entry * entry = malloc(sizeof(lladd_linkedList_entry) + keySize + valueSize);
stasis_linkedList_entry * entry = malloc(sizeof(stasis_linkedList_entry) + keySize + valueSize);
Tread(xid, list, entry);
if(!entry->next.size) {
@ -164,11 +162,11 @@ compensated_function static void __TlinkedListInsert(int xid, recordid list, con
entry->next.size = -1;
Tset(xid, list, entry);
} else {
lladd_linkedList_entry * newEntry = malloc(sizeof(lladd_linkedList_entry) + keySize + valueSize);
stasis_linkedList_entry * newEntry = malloc(sizeof(stasis_linkedList_entry) + keySize + valueSize);
memcpy(newEntry + 1, key, keySize);
memcpy(((byte*)(newEntry+1))+keySize, value, valueSize);
newEntry->next = entry->next;
recordid newRid = Talloc(xid, sizeof(lladd_linkedList_entry) + keySize + valueSize);
recordid newRid = Talloc(xid, sizeof(stasis_linkedList_entry) + keySize + valueSize);
Tset(xid, newRid, newEntry);
entry->next = newRid;
Tset(xid, list, entry);
@ -180,7 +178,7 @@ compensated_function static void __TlinkedListInsert(int xid, recordid list, con
compensated_function int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
lladd_linkedList_entry * entry = malloc(list.size);
stasis_linkedList_entry * entry = malloc(list.size);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, -2) {
pthread_mutex_lock(&linked_list_mutex);
@ -200,7 +198,7 @@ compensated_function int TlinkedListFind(int xid, recordid list, const byte * ke
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
int valueSize = list.size - (sizeof(lladd_linkedList_entry) + keySize);
int valueSize = list.size - (sizeof(stasis_linkedList_entry) + keySize);
*value = malloc(valueSize);
memcpy(*value, ((byte*)(entry+1))+keySize, valueSize);
done = 1;
@ -237,8 +235,8 @@ compensated_function int TlinkedListRemove(int xid, recordid list, const byte *
return 0;
}
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
int entrySize = sizeof(lladd_linkedListRemove_log) + keySize + valueSize;
lladd_linkedListRemove_log * undoLog = malloc(entrySize);
int entrySize = sizeof(stasis_linkedListRemove_log) + keySize + valueSize;
stasis_linkedListRemove_log * undoLog = malloc(entrySize);
undoLog->list = list;
undoLog->keySize = keySize;
@ -247,7 +245,7 @@ compensated_function int TlinkedListRemove(int xid, recordid list, const byte *
memcpy(undoLog+1, key, keySize);
memcpy(((byte*)(undoLog+1))+keySize, value, valueSize);
// printf("entry size %d sizeof(remove_log)%d keysize %d valuesize %d sizeof(rid) %d key %d value {%d %d %ld}\n",
// entrySize, sizeof(lladd_linkedListRemove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
// entrySize, sizeof(stasis_linkedListRemove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_REMOVE,
(byte*)undoLog, entrySize);
free(value);
@ -261,7 +259,7 @@ compensated_function int TlinkedListRemove(int xid, recordid list, const byte *
}
compensated_function static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
lladd_linkedList_entry * entry = malloc(list.size);
stasis_linkedList_entry * entry = malloc(list.size);
pthread_mutex_lock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
@ -292,14 +290,14 @@ compensated_function static int __TlinkedListRemove(int xid, recordid list, cons
Tset(xid, lastRead, entry);
} else {
assert(entry->next.size == list.size); // Otherwise, something strange is happening, or the list contains entries with variable sizes.
lladd_linkedList_entry * entry2 = malloc(list.size);
stasis_linkedList_entry * entry2 = malloc(list.size);
Tread(xid, entry->next, entry2);
Tdealloc(xid, entry->next); // could break iterator, since it writes one entry ahead.
Tset(xid, lastRead, entry2);
free(entry2);
}
} else {
lladd_linkedList_entry * entry2 = malloc(list.size);
stasis_linkedList_entry * entry2 = malloc(list.size);
assert(oldLastRead.size != -2);
Tread(xid, oldLastRead, entry2);
memcpy(&(entry2->next), &(entry->next), sizeof(recordid));
@ -355,8 +353,8 @@ compensated_function int TlinkedListMove(int xid, recordid start_list, recordid
compensated_function recordid TlinkedListCreate(int xid, int keySize, int valueSize) {
recordid ret;
try_ret(NULLRID) {
ret = Talloc(xid, sizeof(lladd_linkedList_entry) + keySize + valueSize);
byte * cleared = calloc(sizeof(lladd_linkedList_entry) + keySize + valueSize, sizeof(byte));
ret = Talloc(xid, sizeof(stasis_linkedList_entry) + keySize + valueSize);
byte * cleared = calloc(sizeof(stasis_linkedList_entry) + keySize + valueSize, sizeof(byte));
Tset(xid, ret, cleared);
free(cleared);
} end_ret(NULLRID);
@ -364,7 +362,7 @@ compensated_function recordid TlinkedListCreate(int xid, int keySize, int valueS
}
compensated_function void TlinkedListDelete(int xid, recordid list) {
try {
lladd_linkedList_entry * entry = malloc(list.size);
stasis_linkedList_entry * entry = malloc(list.size);
Tread(xid, list, entry);
Tdealloc(xid, list);
@ -385,8 +383,8 @@ compensated_function void TlinkedListDelete(int xid, recordid list) {
} end;
}
compensated_function lladd_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize) {
lladd_linkedList_iterator * it = malloc(sizeof(lladd_linkedList_iterator));
compensated_function stasis_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize) {
stasis_linkedList_iterator * it = malloc(sizeof(stasis_linkedList_iterator));
it->keySize = keySize;
it->valueSize = valueSize;
it->next = list;
@ -394,10 +392,10 @@ compensated_function lladd_linkedList_iterator * TlinkedListIterator(int xid, re
it->listRoot = list;
return it;
}
void TlinkedListClose(int xid, lladd_linkedList_iterator * it) {
void TlinkedListClose(int xid, stasis_linkedList_iterator * it) {
free(it);
}
compensated_function int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int * keySize, byte **value, int * valueSize) {
compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * it, byte ** key, int * keySize, byte **value, int * valueSize) {
if(it->next.size == -1) {
return 0;
@ -405,7 +403,7 @@ compensated_function int TlinkedListNext(int xid, lladd_linkedList_iterator * it
int done = 0;
int ret = 0;
lladd_linkedList_entry * entry;
stasis_linkedList_entry * entry;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&linked_list_mutex);
@ -438,7 +436,7 @@ compensated_function int TlinkedListNext(int xid, lladd_linkedList_iterator * it
}
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
assert(it->keySize + it->valueSize + sizeof(lladd_linkedList_entry) == it->next.size);
assert(it->keySize + it->valueSize + sizeof(stasis_linkedList_entry) == it->next.size);
entry = malloc(it->next.size);
Tread(xid, it->next, entry);


@ -3,6 +3,7 @@
#include <limits.h>
#include <assert.h>
#include <stasis/latches.h>
#include <stasis/page.h>
/**
A from-scratch implementation of linear hashing. Uses the
@ -78,7 +79,7 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key,
hashRid.slot = bucket_number;
nextEntry = hashRid;
found = 0;
while(nextEntry.size != -1 && nextEntry.size != 0) {
@ -99,8 +100,9 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key,
}
void expand (int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
void expand(int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
/* Total hack; need to do this better, by storing stuff in the hash table headers.*/
static int count = 4096 * .25;
count --;
#define AMORTIZE 1000
@ -167,8 +169,6 @@ void rehash(int xid, recordid hashRid, unsigned int next_split, unsigned int i,
free(D_contents);
free(A_contents);
free(B_contents);
/* printf("Expand was a noop.\n");
fflush(NULL); */
return;
}
@ -216,8 +216,6 @@ void rehash(int xid, recordid hashRid, unsigned int next_split, unsigned int i,
free(D_contents);
free(A_contents);
free(B_contents);
/* printf("Loop 1 returning.\n");
fflush(NULL); */
return;
}
assert(oldANext.size == sizeof(hashEntry) + keySize + valSize);
@ -228,9 +226,7 @@ void rehash(int xid, recordid hashRid, unsigned int next_split, unsigned int i,
new_hash = hash(A_contents+1, keySize, i, UINT_MAX) + 2;
}
/* printf("Got past loop 1\n");
fflush(NULL); */
B = A_contents->next;
while(B.size != -1) {
@ -286,7 +282,7 @@ void rehash(int xid, recordid hashRid, unsigned int next_split, unsigned int i,
}
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete) {
recordid deleteMe;
recordid deleteMe;
if(!skipDelete) {
if(deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, e+1, keySize, valSize, &deleteMe)) {
Tdealloc(xid, deleteMe);
@ -300,10 +296,12 @@ void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry *
/*@todo consider recovery for insertIntoBucket. */
hashRid.slot = bucket_number;
assert(hashRid.size == sizeof(hashEntry) + valSize + keySize);
// Page * p = loadPage(xid, hashRid.page);
// assert(stasis_record_type_to_size(stasis_record_dereference(xid, p, hashRid).size) == sizeof(hashEntry) + valSize + keySize);
// releasePage(p);
Tread(xid, hashRid, bucket_contents);
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
if(!bucket_contents->next.size) { // Size = 0 -> nothing in bucket. Size != 0 -> bucket occupied.
e->next.page = 0;
e->next.slot = 0;
@ -311,13 +309,15 @@ void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry *
Tset(xid, hashRid, e);
} else {
recordid newEntry = Talloc(xid, sizeof(hashEntry) + keySize + valSize);
assert(newEntry.size);
e->next = bucket_contents->next;
bucket_contents->next = newEntry;
assert(newEntry.size == sizeof(hashEntry) + keySize + valSize);
Tset(xid, newEntry, e);
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
Tset(xid, hashRid, bucket_contents);
}
Tread(xid, hashRid, bucket_contents);
assert(bucket_contents->next.size);
}
int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
@ -384,7 +384,8 @@ int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * buck
recordid ThashAlloc(int xid, int keySize, int valSize) {
/* Want 16 buckets, doubling on overflow. */
recordid rid = TarrayListAlloc(xid, 4096, 2, sizeof(hashEntry) + keySize + valSize);
recordid rid = TarrayListAlloc(xid, 4096, 2, sizeof(hashEntry) + keySize + valSize);
assert(rid.size == sizeof(hashEntry) + keySize + valSize);
TarrayListExtend(xid, rid, 4096+2);
recordid * headerRidA = calloc (1, sizeof(recordid) + keySize + valSize);
@ -409,10 +410,10 @@ recordid ThashAlloc(int xid, int keySize, int valSize) {
pblHtInsert(openHashes, &(rid.page), sizeof(int), headerRidB);
assert(headerRidB);
recordid * check = malloc(rid.size);
assert(headerRidB);
Page * p = loadPage(xid, rid.page);
recordid * check = malloc(stasis_record_type_to_size(stasis_record_dereference(xid, p, rid).size));
releasePage(p);
rid.slot = 0;
Tread(xid, rid, check);
assert(headerRidB);
@ -455,10 +456,8 @@ void TnaiveHashInsert(int xid, recordid hashRid,
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(int));
/* printf("header: %d %d\n", headerHashBits, headerNextSplit); */
int bucket = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
hashEntry * e = calloc(1,sizeof(hashEntry) + keySize + valSize);
memcpy(e+1, key, keySize);
memcpy(((byte*)(e+1)) + keySize, val, valSize);
@ -526,9 +525,6 @@ int ThashClose(int xid, recordid hashRid) {
int TnaiveHashLookup(int xid, recordid hashRid, void * key, int keySize, void * buf, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(int));
/* printf("lookup header: %d %d\n", headerHashBits, headerNextSplit); */
recordid tmp = hashRid;
tmp.slot = 1;
int bucket_number = hash(key, keySize, headerHashBits, headerNextSplit - 2) + 2;
int ret = findInBucket(xid, hashRid, bucket_number, key, keySize, buf, valSize);
return ret;


@ -48,7 +48,6 @@ terms specified in this license.
#include <stasis/common.h>
#include <stasis/operations/nestedTopActions.h>
#include <stasis/logger/logger2.h>
#include <pbl/pbl.h>
#include <string.h>
#include <stdlib.h>
#include <stasis/latches.h>
@ -58,65 +57,52 @@ extern pthread_mutex_t transactional_2_mutex;
extern TransactionLog XactionTable[];
pblHashTable_t * nestedTopActions = NULL;
void initNestedTopActions() {
nestedTopActions = pblHtCreate();
}
void deinitNestedTopActions() {
pblHtDelete(nestedTopActions);
void deinitNestedTopActions() {
}
/** @todo TbeginNestedTopAction's API might not be quite right.
typedef struct {
lsn_t prev_lsn;
lsn_t compensated_lsn;
} stasis_nta_handle;
/** @todo TbeginNestedTopAction's API might not be quite right.
Are there cases where we need to pass a recordid in?
@return a handle that must be passed into TendNestedTopAction
*/
void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) {
recordid rid = NULLRID;
assert(xid >= 0);
rid.page = datSize;
LogEntry * e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], NULL, rid, op, dat);
LogEntry * e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], NULL, op, dat, datSize);
DEBUG("Begin Nested Top Action e->LSN: %ld\n", e->LSN);
lsn_t * prevLSN = malloc(sizeof(lsn_t));
*prevLSN = e->LSN;
pthread_mutex_lock(&transactional_2_mutex);
void * ret = pblHtLookup(nestedTopActions, &xid, sizeof(int));
if(ret) {
pblHtRemove(nestedTopActions, &xid, sizeof(int));
}
pblHtInsert(nestedTopActions, &xid, sizeof(int), prevLSN);
pthread_mutex_unlock(&transactional_2_mutex);
stasis_nta_handle * h = malloc(sizeof(stasis_nta_handle));
h->prev_lsn = e->prevLSN;
h->compensated_lsn = e->LSN;
FreeLogEntry(e);
return ret;
return h;
}
/**
/**
Call this function at the end of a nested top action.
@return the lsn of the CLR. Most users (everyone?) will ignore this.
*/
lsn_t TendNestedTopAction(int xid, void * handle) {
pthread_mutex_lock(&transactional_2_mutex);
lsn_t * prevLSN = pblHtLookup(nestedTopActions, &xid, sizeof(int));
pblHtRemove(nestedTopActions, &xid, sizeof(int));
if(handle) {
pblHtInsert(nestedTopActions, &xid, sizeof(int), handle);
}
stasis_nta_handle * h = handle;
assert(xid >= 0);
// Write a CLR.
lsn_t clrLSN = LogDummyCLR(xid, *prevLSN);
// Ensure that the next action in this transaction points to the CLR.
lsn_t clrLSN = LogDummyCLR(xid, h->prev_lsn, h->compensated_lsn);
// Ensure that the next action in this transaction points to the CLR.
XactionTable[xid % MAX_TRANSACTIONS].prevLSN = clrLSN;
DEBUG("NestedTopAction CLR %d, LSN: %ld type: %ld (undoing: %ld, next to undo: %ld)\n", e->xid,
clrLSN, undoneLSN, *prevLSN);
free(prevLSN);
pthread_mutex_unlock(&transactional_2_mutex);
free(h);
return clrLSN;
}
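For reference, a minimal sketch of pairing the two calls under the new handle-based API; the operation id and argument below are hypothetical placeholders:
// Sketch only; OPERATION_EXAMPLE_INVERSE and exampleArg are hypothetical.
void exampleNTA(int xid, const byte * exampleArg, int argSize) {
  void * h = TbeginNestedTopAction(xid, OPERATION_EXAMPLE_INVERSE,
                                   exampleArg, argSize);
  // ... physical updates go here; if the transaction aborts before
  // TendNestedTopAction, the logical undo logged above compensates ...
  lsn_t clrLSN = TendNestedTopAction(xid, h);
  (void)clrLSN; // most callers ignore the CLR's LSN
}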

View file

@ -48,18 +48,13 @@ terms specified in this license.
#include <stasis/operations.h>
#include <stasis/page.h>
int noop(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
/* If p is null, then this is a logical no-op that spans pages, so do nothing.
Otherwise, write the LSN to the appropriate page (to keep recovery happy)
and return */
if(p) stasis_page_lsn_write(xid, p, lsn);
int noop(const LogEntry* e, Page* p) {
return 0;
}
Operation getNoop() {
Operation o = {
OPERATION_NOOP,
0,
Operation o = {
OPERATION_NOOP,
OPERATION_NOOP,
&noop
};

View file

@ -9,13 +9,32 @@
static pthread_mutex_t pageAllocMutex;
int __pageSet(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
memcpy(p->memAddr, d, PAGE_SIZE);
stasis_page_lsn_write(xid, p, lsn);
static int op_page_set_range(const LogEntry* e, Page* p) {
assert(e->update.arg_size >= sizeof(int));
assert(!((e->update.arg_size - sizeof(int)) % 2));
int off = *(int*)getUpdateArgs(e);
int len = (e->update.arg_size - sizeof(int)) >> 1;
assert(off+len <=PAGE_SIZE);
memcpy(p->memAddr + off, getUpdateArgs(e)+sizeof(int), len);
return 0;
}
static int op_page_set_range_inverse(const LogEntry* e, Page* p) {
assert(e->update.arg_size >= sizeof(int));
assert(!((e->update.arg_size - sizeof(int)) % 2));
int off = *(int*)getUpdateArgs(e);
int len = (e->update.arg_size - sizeof(int)) >> 1;
assert(off+len <=PAGE_SIZE);
memcpy(p->memAddr + off, getUpdateArgs(e)+sizeof(int)+len, len);
return 0;
}
compensated_function int TpageGet(int xid, int pageid, byte *memAddr) {
compensated_function int TpageGet(int xid, int pageid, void *memAddr) {
Page * q = 0;
try_ret(compensation_error()) {
q = loadPage(xid, pageid);
@ -27,14 +46,29 @@ compensated_function int TpageGet(int xid, int pageid, byte *memAddr) {
return 0;
}
compensated_function int TpageSet(int xid, int pageid, byte * memAddr) {
compensated_function int TpageSet(int xid, int pageid, const void * memAddr) {
return TpageSetRange(xid, pageid, 0, memAddr, PAGE_SIZE);
}
int TpageSetRange(int xid, int pageid, int offset, const void * memAddr, int len) {
// XXX need to pack offset into front of log entry
recordid rid;
rid.page = pageid;
rid.slot = 0;
rid.size = 0;
try_ret(compensation_error()) {
Tupdate(xid,rid,memAddr, OPERATION_PAGE_SET);
Page * p = loadPage(xid, rid.page);
byte * logArg = malloc(sizeof(int) + 2 * len);
*(int*)logArg = offset;
memcpy(logArg+sizeof(int), ((const byte*)memAddr), len);
memcpy(logArg+sizeof(int)+len, p->memAddr+offset, len);
try_ret(compensation_error()) {
Tupdate(xid,rid,logArg,sizeof(int)+len*2,OPERATION_PAGE_SET_RANGE);
} end_ret(compensation_error());
free(logArg);
return 0;
}
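For reference, a sketch of the argument layout that TpageSetRange logs and that the two redo/undo functions above decode (inferred from the code; not a separate on-disk format):
/*
 * OPERATION_PAGE_SET_RANGE log argument (sketch):
 *
 *   [ int offset | len bytes of new data | len bytes of old data ]
 *
 * len = (arg_size - sizeof(int)) / 2; op_page_set_range copies the
 * first half into the page at offset, and op_page_set_range_inverse
 * copies the second half back.
 */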
@ -68,10 +102,11 @@ compensated_function int TpageAlloc(int xid /*, int type */) {
return TregionAlloc(xid, 1, STORAGE_MANAGER_NAIVE_PAGE_ALLOC);
}
int __fixedPageAlloc(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
int op_fixed_page_alloc(const LogEntry* e, Page* p) {
writelock(p->rwlatch,0);
stasis_fixed_initialize_page(p, r.size, stasis_fixed_records_per_page(r.size));
stasis_page_lsn_write(xid, p, lsn);
assert(e->update.arg_size == sizeof(int));
int slot_size = *(const int*)getUpdateArgs(e);
stasis_fixed_initialize_page(p, slot_size, stasis_fixed_records_per_page(slot_size));
unlock(p->rwlatch);
return 0;
}
@ -84,17 +119,19 @@ int __fixedPageAlloc(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
*/
recordid TfixedPageAlloc(int xid, int size) {
int page = TpageAlloc(xid);
recordid rid = {page, stasis_fixed_records_per_page(size), size};
Tupdate(xid, rid, 0, OPERATION_FIXED_PAGE_ALLOC);
Tupdate(xid, rid, &size, sizeof(int), OPERATION_FIXED_PAGE_ALLOC);
return rid;
}
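E.g., a hedged usage sketch, assuming xid names an active transaction:
/* Sketch:
 *   recordid rid = TfixedPageAlloc(xid, sizeof(int));
 *   // rid.page is the new page, rid.slot is the records-per-page
 *   // count, and rid.size is the slot size that was logged.
 */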
Operation getFixedPageAlloc() {
Operation o = {
OPERATION_FIXED_PAGE_ALLOC,
0,
OPERATION_NOOP,
&__fixedPageAlloc
&op_fixed_page_alloc
};
return o;
}
@ -148,12 +185,20 @@ int TpageGetType(int xid, int pageid) {
*/
Operation getPageSet() {
Operation getPageSetRange() {
Operation o = {
OPERATION_PAGE_SET,
PAGE_SIZE, /* This is the type of the old page, for undo purposes */
/*OPERATION_PAGE_SET, */ NO_INVERSE_WHOLE_PAGE,
&__pageSet
OPERATION_PAGE_SET_RANGE,
OPERATION_PAGE_SET_RANGE_INVERSE,
op_page_set_range
};
return o;
}
Operation getPageSetRangeInverse() {
Operation o = {
OPERATION_PAGE_SET_RANGE_INVERSE,
OPERATION_PAGE_SET_RANGE,
&op_page_set_range_inverse
};
return o;
}

View file

@ -54,17 +54,16 @@ terms specified in this license.
#include <stdio.h>
recordid prepare_bogus_rec = { 0, 0, 0};
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
LogForce(lsn);
static int op_prepare(const LogEntry * e, Page * p) {
LogForce(e->LSN);
return 0;
}
Operation getPrepare() {
Operation o = {
OPERATION_PREPARE, /* id */
0, /* No extra data. */
OPERATION_NOOP,
&operate /* Function */
&op_prepare /* Function */
};
return o;
}

View file

@ -1,6 +1,7 @@
#include "config.h"
#include <stasis/page.h>
#include <stasis/operations.h>
#include <stasis/logger/logger2.h>
#include <assert.h>
typedef struct regionAllocLogArg{
@ -20,37 +21,38 @@ static void TdeallocBoundaryTag(int xid, unsigned int page);
/** This doesn't need a latch since it is only initiated within nested
top actions (and is local to this file). During abort(), the nested
top action's logical undo grabs the necessary latches.
@todo op_alloc_boundary_tag is executed without holding the
proper mutex during REDO. For now this doesn't matter, but it
could matter in the future.
*/
static int operate_alloc_boundary_tag(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
static int op_alloc_boundary_tag(const LogEntry* e, Page* p) {
writelock(p->rwlatch, 0);
stasis_slotted_initialize_page(p);
recordid rid = {p->id, 0, sizeof(boundary_tag)};
assert(e->update.arg_size == sizeof(boundary_tag));
*stasis_page_type_ptr(p) = BOUNDARY_TAG_PAGE;
stasis_record_alloc_done(xid, p, rid);
stasis_page_lsn_write(xid, p, lsn);
byte * buf = stasis_record_write_begin(xid, p, rid);
memcpy(buf, dat, stasis_record_length_read(xid, p, rid));
stasis_record_alloc_done(e->xid, p, rid);
byte * buf = stasis_record_write_begin(e->xid, p, rid);
memcpy(buf, getUpdateArgs(e), stasis_record_length_read(e->xid, p, rid));
stasis_record_write_done(e->xid, p, rid, buf);
unlock(p->rwlatch);
return 0;
}
static int operate_alloc_region(int xid, Page * p, lsn_t lsn, recordid rid, const void * datP) {
static int op_alloc_region(const LogEntry *e, Page* p) {
pthread_mutex_lock(&region_mutex);
assert(0 == holding_mutex);
holding_mutex = pthread_self();
regionAllocArg *dat = (regionAllocArg*)datP;
TregionAllocHelper(xid, dat->startPage, dat->pageCount, dat->allocationManager);
regionAllocArg *dat = (regionAllocArg*)getUpdateArgs(e);
TregionAllocHelper(e->xid, dat->startPage, dat->pageCount, dat->allocationManager);
holding_mutex = 0;
pthread_mutex_unlock(&region_mutex);
return 0;
}
static int operate_dealloc_region_unlocked(int xid, Page * p, lsn_t lsn, recordid rid, const void * datP) {
regionAllocArg *dat = (regionAllocArg*)datP;
static int operate_dealloc_region_unlocked(int xid, regionAllocArg *dat) {
unsigned int firstPage = dat->startPage + 1;
boundary_tag t;
@ -60,21 +62,20 @@ static int operate_dealloc_region_unlocked(int xid, Page * p, lsn_t lsn, recordi
t.status = REGION_VACANT;
t.region_xid = xid;
TsetBoundaryTag(xid, firstPage -1, &t);
// TregionDealloc(xid, dat->startPage+1);
return 0;
}
static int operate_dealloc_region(int xid, Page * p, lsn_t lsn, recordid rid, const void * datP) {
static int op_dealloc_region(const LogEntry* e, Page* p) {
int ret;
pthread_mutex_lock(&region_mutex);
assert(0 == holding_mutex);
holding_mutex = pthread_self();
ret = operate_dealloc_region_unlocked(xid, p, lsn, rid, datP);
ret = operate_dealloc_region_unlocked(e->xid, (regionAllocArg*)getUpdateArgs(e));
holding_mutex = 0;
pthread_mutex_unlock(&region_mutex);
@ -85,8 +86,8 @@ static int operate_dealloc_region(int xid, Page * p, lsn_t lsn, recordid rid, co
static void TallocBoundaryTag(int xid, unsigned int page, boundary_tag* tag) {
//printf("Alloc boundary tag at %d = { %d, %d, %d }\n", page, tag->size, tag->prev_size, tag->status);
assert(holding_mutex == pthread_self());
recordid rid = {page, 0, sizeof(boundary_tag)};
Tupdate(xid, rid, tag, OPERATION_ALLOC_BOUNDARY_TAG);
recordid rid = {page, 0, 0};
Tupdate(xid, rid, tag, sizeof(boundary_tag), OPERATION_ALLOC_BOUNDARY_TAG);
}
int readBoundaryTag(int xid, pageid_t page, boundary_tag* tag) {
@ -95,7 +96,6 @@ int readBoundaryTag(int xid, pageid_t page, boundary_tag* tag) {
if(TpageGetType(xid, rid.page) != BOUNDARY_TAG_PAGE) {
return 0;
}
// assert(TpageGetType(xid, rid.page) == BOUNDARY_TAG_PAGE);
Tread(xid, rid, tag);
assert((page == 0 && tag->prev_size == UINT32_MAX) || (page != 0 && tag->prev_size != UINT32_MAX));
//printf("Read boundary tag at %d = { %d, %d, %d }\n", page, tag->size, tag->prev_size, tag->status);
@ -113,7 +113,7 @@ int TregionReadBoundaryTag(int xid, pageid_t page, boundary_tag* tag) {
static void TsetBoundaryTag(int xid, unsigned int page, boundary_tag* tag) {
//printf("Write boundary tag at %d = { %d, %d, %d }\n", page, tag->size, tag->prev_size, tag->status);
// Sanity checking:
assert((page == 0 && tag->prev_size == UINT32_MAX) || (page != 0 && tag->prev_size < UINT32_MAX/2));
assert(holding_mutex == pthread_self());
@ -159,9 +159,14 @@ void regionsInit() {
// flush the page, since this code is deterministic, and will be
// re-run before recovery if this update doesn't make it to disk
// after a crash.
recordid rid = {0,0,sizeof(boundary_tag)};
// recordid rid = {0,0,sizeof(boundary_tag)};
operate_alloc_boundary_tag(0,p,0,rid,&t);
// hack; allocate a fake log entry; pass it into ourselves.
LogEntry * e = allocUpdateLogEntry(0,0,OPERATION_ALLOC_BOUNDARY_TAG,
p->id, (const byte*)&t, sizeof(boundary_tag));
op_alloc_boundary_tag(e,p);
FreeLogEntry(e);
}
holding_mutex = 0;
releasePage(p);
@ -375,8 +380,7 @@ static void consolidateRegions(int xid, unsigned int * firstPage, boundary_tag
assert(ret);
succ_tag.prev_size = pred_tag.size;
TsetBoundaryTag(xid, succ_page, &succ_tag);
// assert(succ_tag.status != REGION_VACANT);
assert(succ_page - pred_page - 1 == pred_tag.size);
}
@ -420,12 +424,7 @@ void TregionDealloc(int xid, unsigned int firstPage) {
void * handle = TbeginNestedTopAction(xid, OPERATION_DEALLOC_REGION, (const byte*)&arg, sizeof(regionAllocArg));
operate_dealloc_region_unlocked(xid, 0, 0, NULLRID, (const byte*)&arg);
/*t.status = REGION_VACANT;
t.region_xid = xid;
TsetBoundaryTag(xid, firstPage -1, &t); */
operate_dealloc_region_unlocked(xid, &arg);
firstPage --;
@ -491,9 +490,8 @@ unsigned int TregionAlloc(int xid, unsigned int pageCount, int allocationManager
Operation getAllocBoundaryTag() {
Operation o = {
OPERATION_ALLOC_BOUNDARY_TAG,
sizeof(boundary_tag),
OPERATION_NOOP,
&operate_alloc_boundary_tag
op_alloc_boundary_tag
};
return o;
}
@ -501,9 +499,16 @@ Operation getAllocBoundaryTag() {
Operation getAllocRegion() {
Operation o = {
OPERATION_ALLOC_REGION,
sizeof(regionAllocArg),
OPERATION_DEALLOC_REGION,
&operate_alloc_region
OPERATION_ALLOC_REGION_INVERSE,
noop
};
return o;
}
Operation getAllocRegionInverse() {
Operation o = {
OPERATION_ALLOC_REGION_INVERSE,
OPERATION_NOOP, // XXX need INVALID or something
op_dealloc_region
};
return o;
}
@ -511,9 +516,17 @@ Operation getAllocRegion() {
Operation getDeallocRegion() {
Operation o = {
OPERATION_DEALLOC_REGION,
sizeof(regionAllocArg),
OPERATION_ALLOC_REGION,
&operate_dealloc_region
OPERATION_DEALLOC_REGION_INVERSE,
noop
};
return o;
}
Operation getDeallocRegionInverse() {
Operation o = {
OPERATION_DEALLOC_REGION_INVERSE,
OPERATION_NOOP, // XXX should be INVALID
op_alloc_region
};
return o;
}

View file

@ -46,53 +46,144 @@ terms specified in this license.
**********************************************/
#include <stasis/operations.h>
#include <stasis/blobManager.h>
#include <stasis/page.h>
#include <string.h>
#include <assert.h>
static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
stasis_record_write(xid, p, lsn, rid, dat);
return 0;
static int op_set(const LogEntry *e, Page *p) {
readlock(p->rwlatch,0);
assert(e->update.arg_size >= sizeof(slotid_t) + sizeof(int64_t));
const byte * b = getUpdateArgs(e);
recordid rid;
rid.page = p->id;
rid.slot = *(slotid_t*)b; b+=sizeof(slotid_t);
rid.size = *(int64_t*)b; b+=sizeof(int64_t);
assert(e->update.arg_size == sizeof(slotid_t) + sizeof(int64_t) + 2 * rid.size);
assert(stasis_record_type_to_size(rid.size) == rid.size);
assert(stasis_record_length_read(e->xid,p,rid) == rid.size);
stasis_record_write(e->xid, p, e->LSN, rid, b);
unlock(p->rwlatch);
return 0;
}
typedef struct {
int offset;
int realRecordLength;
} set_range_t;
static int op_set_inverse(const LogEntry *e, Page *p) {
readlock(p->rwlatch,0);
assert(e->update.arg_size >= sizeof(slotid_t) + sizeof(int64_t));
const byte * b = getUpdateArgs(e);
recordid rid;
static int operateRange(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
int diffLength = rid.size - sizeof(set_range_t);
assert(! (diffLength % 2));
diffLength /= 2;
const set_range_t * range = dat;
rid.size = range->realRecordLength;
rid.page = p->id;
rid.slot = *(slotid_t*)b; b+=sizeof(slotid_t);
rid.size = *(int64_t*)b; b+=sizeof(int64_t);
byte * data = (byte*)(range + 1);
byte * tmp = malloc(rid.size);
assert(e->update.arg_size == sizeof(slotid_t) + sizeof(int64_t) + 2 * rid.size);
assert(stasis_record_type_to_size(rid.size) == rid.size);
stasis_record_read(xid, p, rid, tmp);
memcpy(tmp+range->offset, data, diffLength);
stasis_record_write(xid, p, lsn, rid, tmp);
stasis_record_write(e->xid, p, e->LSN, rid, b+rid.size);
unlock(p->rwlatch);
free(tmp);
return 0;
}
static int deOperateRange(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
int diffLength = rid.size - sizeof(set_range_t);
typedef struct {
int offset;
slotid_t slot;
} set_range_t;
int Tset(int xid, recordid rid, const void * dat) {
Page * p = loadPage(xid, rid.page);
rid = stasis_record_dereference(xid,p,rid);
rid.size = stasis_record_type_to_size(rid.size);
if(rid.size > BLOB_THRESHOLD_SIZE) {
writeBlob(xid,p,rid,dat);
releasePage(p);
} else {
releasePage(p);
size_t sz = sizeof(slotid_t) + sizeof(int64_t) + 2 * rid.size;
byte * const buf = malloc(sz);
byte * b = buf;
*(slotid_t*) b = rid.slot; b += sizeof(slotid_t);
*(int64_t*) b = rid.size; b += sizeof(int64_t);
memcpy(b, dat, rid.size);
b += rid.size;
Tread(xid, rid, b);
Tupdate(xid,rid,buf,sz,OPERATION_SET);
free(buf);
}
return 0;
}
int TsetRaw(int xid, recordid rid, const void * dat) {
rid.size = stasis_record_type_to_size(rid.size);
size_t sz = sizeof(slotid_t) + sizeof(int64_t) + 2 * rid.size;
byte * const buf = malloc(sz);
byte * b = buf;
*(slotid_t*) b = rid.slot; b += sizeof(slotid_t);
*(int64_t*) b = rid.size; b += sizeof(int64_t);
memcpy(b, dat, rid.size);
b += rid.size;
TreadRaw(xid, rid, b);
// XXX get rid of recordid dereference assert in Tupdate, then change this
// to call Tupdate
TupdateRaw(xid,rid,buf,sz,OPERATION_SET);
free(buf);
return 0;
}
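Both functions build the same before/after-image buffer; a sketch of the layout, inferred from op_set and op_set_inverse above:
/*
 * OPERATION_SET log argument (sketch):
 *
 *   [ slotid_t slot | int64_t size | size bytes: new value | size bytes: old value ]
 *
 * op_set writes the first copy into the record; op_set_inverse writes
 * the second copy back, so the log needs no separate pre-image field.
 */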
static int op_set_range(const LogEntry* e, Page* p) {
readlock(p->rwlatch,0);
int diffLength = e->update.arg_size - sizeof(set_range_t);
assert(! (diffLength % 2));
diffLength /= 2;
const set_range_t * range = dat;
rid.size = range->realRecordLength;
diffLength >>= 1;
const set_range_t * range = (const set_range_t*)getUpdateArgs(e);
recordid rid;
rid.page = p->id;
rid.slot = range->slot;
rid.size = stasis_record_length_read(e->xid,p,rid);
byte * data = (byte*)(range + 1);
data += diffLength;
byte * tmp = malloc(rid.size);
stasis_record_read(xid, p, rid, tmp);
stasis_record_read(e->xid, p, rid, tmp);
memcpy(tmp+range->offset, data, diffLength);
stasis_record_write(xid, p, lsn, rid, tmp);
stasis_record_write(e->xid, p, e->LSN, rid, tmp);
free(tmp);
unlock(p->rwlatch);
return 0;
}
static int op_set_range_inverse(const LogEntry* e, Page* p) {
readlock(p->rwlatch,0);
int diffLength = e->update.arg_size - sizeof(set_range_t);
assert(! (diffLength % 2));
diffLength >>= 1;
const set_range_t * range = (const set_range_t*)getUpdateArgs(e);
recordid rid;
rid.page = p->id;
rid.slot = range->slot;
rid.size = stasis_record_length_read(e->xid,p,rid);
byte * data = (byte*)(range + 1) + diffLength;
byte * tmp = malloc(rid.size);
stasis_record_read(e->xid, p, rid, tmp);
memcpy(tmp+range->offset, data, diffLength);
stasis_record_write(e->xid, p, e->LSN, rid, tmp);
free(tmp);
unlock(p->rwlatch);
return 0;
}
compensated_function void TsetRange(int xid, recordid rid, int offset, int length, const void * dat) {
@ -102,41 +193,47 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt
p = loadPage(xid, rid.page);
} end;
/// XXX rewrite without malloc (use read_begin, read_done)
set_range_t * range = malloc(sizeof(set_range_t) + 2 * length);
byte * record = malloc(rid.size);
range->offset = offset;
range->realRecordLength = rid.size;
range->slot = rid.slot;
// Copy new value into log structure
memcpy(range + 1, dat, length);
// No further locking is necessary here; readRecord protects the
// page layout, but attempts at concurrent modification have undefined
// results. (See page.c)
stasis_record_read(xid, p, rid, record);
// Copy old value into log structure
// Copy old value into log structure
memcpy((byte*)(range + 1) + length, record+offset, length);
// Pass size of range into Tupdate via the recordid.
rid.size = sizeof(set_range_t) + 2 * length;
free(record);
/** @todo will leak 'range' if interrupted with pthread_cancel */
begin_action(releasePage, p) {
Tupdate(xid, rid, range, OPERATION_SET_RANGE);
Tupdate(xid, rid, range, sizeof(set_range_t) + 2 * length, OPERATION_SET_RANGE);
free(range);
} compensate;
}
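A hedged usage example; the record and payload are illustrative:
// Sketch: overwrite bytes [4, 8) of an existing record.
void exampleSetRange(int xid, recordid rid) {
  char newBytes[4] = {'a', 'b', 'c', 'd'}; // illustrative payload
  TsetRange(xid, rid, 4, sizeof(newBytes), newBytes);
}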
Operation getSet() {
Operation o = {
OPERATION_SET, /* id */
SIZEOF_RECORD, /* use the size of the record as size of arg */
NO_INVERSE,
&operate /* Function */
OPERATION_SET,
OPERATION_SET_INVERSE,
op_set
};
return o;
}
Operation getSetInverse() {
Operation o = {
OPERATION_SET_INVERSE,
OPERATION_SET,
op_set_inverse
};
return o;
}
@ -144,9 +241,8 @@ Operation getSet() {
Operation getSetRange() {
Operation o = {
OPERATION_SET_RANGE,
SIZEOF_RECORD,
OPERATION_SET_RANGE_INVERSE,
&operateRange
op_set_range
};
return o;
}
@ -154,9 +250,8 @@ Operation getSetRange() {
Operation getSetRangeInverse() {
Operation o = {
OPERATION_SET_RANGE_INVERSE,
SIZEOF_RECORD,
OPERATION_SET_RANGE,
&deOperateRange
op_set_range_inverse
};
return o;
}

View file

@ -98,11 +98,7 @@ static page_impl page_impls[MAX_PAGE_TYPE];
XXX latching for pageWriteLSN...
*/
void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) {
// These asserts belong here, but would cause some hacked up unit tests to fail...
// if(!page->dirty) {
// assert(page->LSN < lsn);
// }
// assertlocked(page->rwlatch);
assertlocked(page->rwlatch);
if(page->LSN < lsn) {
page->LSN = lsn;
@ -114,6 +110,7 @@ void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) {
XXX latching for pageReadLSN...
*/
lsn_t stasis_page_lsn_read(const Page * page) {
assertlocked(page->rwlatch);
return page->LSN;
}
@ -157,33 +154,25 @@ void stasis_record_write(int xid, Page * p, lsn_t lsn, recordid rid, const byte
assert( (p->id == rid.page) && (p->memAddr != NULL) );
readlock(p->rwlatch, 225);
if(rid.size > BLOB_THRESHOLD_SIZE) {
// XXX Kludge This is done so that recovery sees the LSN update. Otherwise, it gets upset... Of course, doing it will break blob recovery unless we set blob writes to do "logical" redo...
stasis_page_lsn_write(xid, p, lsn);
unlock(p->rwlatch);
writeBlob(xid, p, lsn, rid, dat);
} else {
byte * buf = stasis_record_write_begin(xid, p, rid);
stasis_page_lsn_write(xid, p, lsn);
memcpy(buf, dat, stasis_record_length_read(xid, p, rid));
unlock(p->rwlatch);
}
assert(rid.size <= BLOB_THRESHOLD_SIZE);
byte * buf = stasis_record_write_begin(xid, p, rid);
memcpy(buf, dat, stasis_record_length_read(xid, p, rid));
unlock(p->rwlatch);
assert( (p->id == rid.page) && (p->memAddr != NULL) );
}
int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
assert(rid.page == p->id);
if(rid.size > BLOB_THRESHOLD_SIZE) {
readBlob(xid, p, rid, buf);
assert(rid.page == p->id);
return 0;
} else {
readlock(p->rwlatch, 0);
const byte * dat = stasis_record_read_begin(xid,p,rid);
memcpy(buf, dat, stasis_record_length_read(xid,p,rid));
unlock(p->rwlatch);
return 0;
}
assert(rid.size <= BLOB_THRESHOLD_SIZE);
readlock(p->rwlatch, 0);
const byte * dat = stasis_record_read_begin(xid,p,rid);
memcpy(buf, dat, stasis_record_length_read(xid,p,rid));
unlock(p->rwlatch);
return 0;
}
/**
@todo stasis_record_dereference should dispatch via page_impl...

View file

@ -26,7 +26,6 @@
#include <stasis/linkedlist.h>
#include <stasis/page.h> // Needed for pageReadLSN.
static pblHashTable_t * transactionLSN;
static LinkedList * rollbackLSNs = NULL;
/** @todo There is no real reason to have this mutex (which prevents
@ -50,7 +49,9 @@ static pthread_mutex_t rollback_mutex = PTHREAD_MUTEX_INITIALIZER;
no longer reads the pages in, there's no longer any reason to build
the list of dirty pages.
*/
static void Analysis () {
static void Analysis() {
DEBUG("Recovery: Analysis\n");
const LogEntry * e;
@ -83,7 +84,7 @@ static void Analysis () {
rollbackLSNs for it. That value is now stale, so remove
it. */
DEBUG("Removing %ld\n", *xactLSN);
DEBUG("Removing %lld\n", *xactLSN);
removeVal(&rollbackLSNs, *xactLSN);
}
@ -126,14 +127,14 @@ static void Analysis () {
Add it to the list
*/
DEBUG("Adding %ld\n", e->LSN);
DEBUG("Adding %lld\n", e->LSN);
addSortedVal(&rollbackLSNs, e->LSN);
break;
case XABORT:
// If the last record we see for a transaction is an abort, then
// the transaction didn't commit, and must be rolled back.
DEBUG("Adding %ld\n", e->LSN);
DEBUG("Adding %lld\n", e->LSN);
addSortedVal(&rollbackLSNs, e->LSN);
break;
case XPREPARE:
@ -152,52 +153,79 @@ static void Analysis () {
TsetXIDCount(highestXid);
}
/**
Who runs where (if encountered):

                    Redo                         Undo
  Physical          Y                            Y (generate CLR, 'redo' it)
  Logical           N (no-ops)                   N (if encountered, physical undo
                                                    already ran; otherwise the CLR
                                                    masks it. The CLR for a logical
                                                    op is always generated by the
                                                    end of the NTA.)
  CLR for Physical  Y                            N (the whole point of CLRs is to
                                                    skip this undo)
  CLR for Logical   N (could be undone later     Y (the NTA replaces physical undo)
                       in Redo; it is an NTA,
                       so the xact could still
                       commit)
*/
static void Redo() {
LogHandle lh = getLogHandle();
const LogEntry * e;
DEBUG("Recovery: Redo\n");
while((e = nextInLog(&lh))) {
// Is this log entry part of a transaction that needs to be redone?
if(pblHtLookup(transactionLSN, &(e->xid), sizeof(int)) != NULL) {
// Check to see if this entry's action needs to be redone
switch(e->type) {
case UPDATELOG:
{
if(e->update.page == INVALID_PAGE) {
// logical redo; ignore
} else {
redoUpdate(e);
}
} break;
case CLRLOG:
{
// redoUpdate checks the page that contains e->rid, so we
// don't need to check to see if the page is newer than this
// log entry.
redoUpdate(e);
FreeLogEntry(e);
const LogEntry *ce = LogReadLSN(((CLRLogEntry*)e)->clr.compensated_lsn);
if(ce->update.page == INVALID_PAGE) {
// logical redo of end of NTA; no-op
} else {
undoUpdate(ce, e->LSN);
}
FreeLogEntry(ce);
} break;
case XCOMMIT:
{
if(globalLockManager.commit)
globalLockManager.commit(e->xid);
FreeLogEntry(e);
} break;
case XABORT:
{
// wait until undo is complete before informing the lock manager
FreeLogEntry(e);
} break;
case INTERNALLOG:
{
FreeLogEntry(e);
} break;
case XPREPARE:
{
FreeLogEntry(e);
} break;
default:
abort();
}
}
}
FreeLogEntry(e);
}
}
static void Undo(int recovery) {
LogHandle lh;
DEBUG("Recovery: Undo (in recovery = %d)\n", recovery);
while(rollbackLSNs != NULL) {
const LogEntry * e;
lsn_t rollback = popMaxVal(&rollbackLSNs);
@ -214,40 +242,41 @@ static void Undo(int recovery) {
int prepared = 0;
while((!prepared) && (e = previousInTransaction(&lh))) {
thisXid = e->xid;
lsn_t this_lsn, clr_lsn;
switch(e->type) {
case UPDATELOG:
{
// If the rid is valid, load the page for undoUpdate.
// undoUpdate checks the LSN before applying physical undos
if(e->update.page == INVALID_PAGE) {
DEBUG("logical update\n");
Page * p = NULL;
if(e->update.rid.size != -1) {
p = loadPage(thisXid, e->update.rid.page);
// logical undo: no-op; then the NTA didn't complete, and
// we've finished physical undo for this op
} else {
DEBUG("physical update\n");
// Log a CLR for this entry
lsn_t clr_lsn = LogCLR(e);
DEBUG("logged clr\n");
// If this fails, something is wrong with redo or normal operation.
this_lsn = stasis_page_lsn_read(p);
assert(e->LSN <= this_lsn);
} else {
// The log entry is not associated with a particular page.
// (Therefore, it must be an idempotent logical log entry.)
}
clr_lsn = LogCLR(e);
undoUpdate(e, p, clr_lsn);
if(p) {
releasePage(p);
}
undoUpdate(e, clr_lsn);
DEBUG("rolled back clr's update\n");
}
break;
}
case CLRLOG:
// Don't undo CLRs; they were undone during Redo
{
const LogEntry * ce = LogReadLSN(((CLRLogEntry*)e)->clr.compensated_lsn);
if(ce->update.page == INVALID_PAGE) {
DEBUG("logical clr\n");
undoUpdate(ce, 0); // logical undo; effective LSN doesn't matter
} else {
DEBUG("physical clr: op %d lsn %lld\n", ce->update.funcID, ce->LSN);
// no-op. Already undone during redo. This would redo the original op.
}
FreeLogEntry(ce);
}
break;
case XABORT:
printf("Found abort for %d\n", e->xid);
DEBUG("Found abort for %d\n", e->xid);
reallyAborted = 1;
// Since XABORT is a no-op, we can silently ignore it. XABORT
// records may be passed in by undoTrans.
@ -256,21 +285,20 @@ static void Undo(int recovery) {
// Should never abort a transaction that contains a commit record
abort();
case XPREPARE: {
printf("found prepared xact %d\n", e->xid);
DEBUG("found prepared xact %d\n", e->xid);
if(!reallyAborted) {
printf("xact wasn't aborted\n");
DEBUG("xact wasn't aborted\n");
prepared = 1;
Trevive(e->xid, e->LSN, getPrepareRecLSN(e));
} else {
printf("xact was aborted\n");
DEBUG("xact was aborted\n");
}
} break;
default:
printf
DEBUG
("Unknown log type to undo (TYPE=%d,XID= %d,LSN=%lld), skipping...\n",
e->type, e->xid, e->LSN);
fflush(NULL);
abort();
}
FreeLogEntry(e);
@ -279,8 +307,7 @@ static void Undo(int recovery) {
globalLockManager.abort(thisXid);
}
}
}
}
void InitiateRecovery() {
transactionLSN = pblHtCreate();
@ -309,7 +336,7 @@ void undoTrans(TransactionLog transaction) {
assert(!rollbackLSNs);
if(transaction.prevLSN > 0) {
DEBUG("scheduling lsn %ld for undo.\n", transaction.prevLSN);
DEBUG("scheduling xid %d (lsn %lld) for undo.\n", transaction.xid, transaction.prevLSN);
addSortedVal(&rollbackLSNs, transaction.prevLSN);
} else {
/* Nothing to undo. (Happens for read-only xacts.) */

View file

@ -17,6 +17,7 @@
#include <stasis/logger/logger2.h>
#include <stasis/truncation.h>
#include <stasis/io/handle.h>
#include <stasis/blobManager.h> // XXX remove this, move Tread() to set.c
#include <stdio.h>
#include <assert.h>
#include <limits.h>
@ -45,6 +46,7 @@ void setupOperationsTable() {
memset(XactionTable, INVALID_XTABLE_XID, sizeof(TransactionLog)*MAX_TRANSACTIONS);
// @todo clean out unused constants...
operationsTable[OPERATION_SET] = getSet();
operationsTable[OPERATION_SET_INVERSE] = getSetInverse();
operationsTable[OPERATION_INCREMENT] = getIncrement();
operationsTable[OPERATION_DECREMENT] = getDecrement();
operationsTable[OPERATION_ALLOC] = getAlloc();
@ -53,9 +55,9 @@ void setupOperationsTable() {
operationsTable[OPERATION_LHREMOVE] = getLHRemove(); */
operationsTable[OPERATION_DEALLOC] = getDealloc();
operationsTable[OPERATION_REALLOC] = getRealloc();
/* operationsTable[OPERATION_PAGE_ALLOC] = getPageAlloc();
operationsTable[OPERATION_PAGE_DEALLOC] = getPageDealloc(); */
operationsTable[OPERATION_PAGE_SET] = getPageSet();
operationsTable[OPERATION_PAGE_SET_RANGE] = getPageSetRange();
operationsTable[OPERATION_PAGE_SET_RANGE_INVERSE] = getPageSetRangeInverse();
/* operationsTable[OPERATION_UPDATE_FREESPACE] = getUpdateFreespace();
operationsTable[OPERATION_UPDATE_FREESPACE_INVERSE] = getUpdateFreespaceInverse();
@ -92,7 +94,10 @@ void setupOperationsTable() {
operationsTable[OPERATION_FIXED_PAGE_ALLOC] = getFixedPageAlloc();
operationsTable[OPERATION_ALLOC_REGION] = getAllocRegion();
operationsTable[OPERATION_ALLOC_REGION_INVERSE] = getAllocRegionInverse();
operationsTable[OPERATION_DEALLOC_REGION] = getDeallocRegion();
operationsTable[OPERATION_DEALLOC_REGION_INVERSE] = getDeallocRegionInverse();
}
@ -261,8 +266,8 @@ int Tbegin() {
return XactionTable[index].xid;
}
static compensated_function void TactionHelper(int xid, recordid rid,
const void * dat, int op,
static compensated_function void TactionHelper(int xid, recordid rid,
const void * dat, size_t datlen, int op,
Page * p) {
LogEntry * e;
assert(xid >= 0);
@ -272,7 +277,7 @@ static compensated_function void TactionHelper(int xid, recordid rid,
}
} end;
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, op, dat, datlen);
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
doUpdate(e, p);
@ -280,30 +285,28 @@ static compensated_function void TactionHelper(int xid, recordid rid,
}
compensated_function void TupdateRaw(int xid, recordid rid,
const void * dat, int op) {
// XXX remove this function once it's clear that nobody is failing the assert in Tupdate()
compensated_function void TupdateRaw(int xid, recordid rid, const void * dat, size_t datlen,
int op) {
assert(xid >= 0);
Page * p = loadPage(xid, rid.page);
TactionHelper(xid, rid, dat, op, p);
TactionHelper(xid, rid, dat, datlen, op, p);
releasePage(p);
}
compensated_function void TupdateStr(int xid, recordid rid,
const char *dat, int op) {
Tupdate(xid, rid, dat, op);
compensated_function void TupdateStr(int xid, recordid rid,
const char *dat, size_t datlen, int op) {
Tupdate(xid, rid, dat, datlen, op);
}
compensated_function void Tupdate(int xid, recordid rid,
const void *dat, int op) {
const void *dat, size_t datlen, int op) {
Page * p = loadPage(xid, rid.page);
rid = stasis_record_dereference(xid, p, rid);
if(p->id != rid.page) {
releasePage(p);
p = loadPage(xid, rid.page);
}
TactionHelper(xid, rid, dat, op, p);
recordid rid2 = stasis_record_dereference(xid, p, rid);
assert(rid2.page == rid.page);
TactionHelper(xid, rid, dat, datlen, op, p);
releasePage(p);
}
@ -322,6 +325,22 @@ compensated_function void Tread(int xid, recordid rid, void * dat) {
releasePage(p);
p = loadPage(xid, rid.page);
}
if(rid.size > BLOB_THRESHOLD_SIZE) {
DEBUG("call readBlob %lld %lld %lld\n", (long long)rid.page, (long long)rid.slot, (long long)rid.size);
readBlob(xid,p,rid,dat);
assert(rid.page == p->id);
} else {
stasis_record_read(xid, p, rid, dat);
}
releasePage(p);
}
compensated_function void TreadRaw(int xid, recordid rid, void * dat) {
Page * p;
try {
p = loadPage(xid, rid.page);
} end;
stasis_record_read(xid, p, rid, dat);
releasePage(p);
}
@ -506,6 +525,12 @@ int TisActiveTransaction(int xid) {
return ret;
}
int stasis_transaction_table_set_prev_lsn(int xid, lsn_t prevLSN) {
assert(XactionTable[xid % MAX_TRANSACTIONS].xid == xid);
XactionTable[xid%MAX_TRANSACTIONS].prevLSN = prevLSN;
return 0;
}
int TdurabilityLevel() {
if(bufferManagerType == BUFFER_MANAGER_MEM_ARRAY) {
return VOLATILE;

View file

@ -43,6 +43,8 @@ BEGIN_C_DECLS
@todo Update blobManager to (partially) provide a page api
@todo Move blobManager to page and/or operations directory
@ingroup LLADD_CORE
*/
@ -56,7 +58,7 @@ void readBlob(int xid, Page * p, recordid rid, void * buf);
/**
Write the contents of buf to the blob in recordid rid.
*/
void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf);
void writeBlob(int xid, Page * p, recordid rid, const void * buf);
compensated_function recordid preAllocBlob(int xid, long blobsize);
compensated_function recordid preAllocBlobFromPage(int xid, long page, long blobsize);

View file

@ -105,6 +105,9 @@ typedef long long lsn_t;
#define LSN_T_MAX INT64_MAX
typedef long long pageid_t;
#define PAGEID_T_MAX INT64_MAX
typedef int32_t slotid_t;
#define SLOTID_T_MAX INT32_MAX
/*#define DEBUGGING */
/*#define PROFILE_LATCHES*/
@ -118,6 +121,18 @@ typedef long long pageid_t;
#define DEBUG(...)
#endif /*DEBUGGING*/
/**
* represents how to look up a record on a page
* @todo int64_t (for recordid.size) is a stopgap fix.
*/
#pragma pack(push,1)
typedef struct {
pageid_t page;
slotid_t slot;
int64_t size;
} recordid;
#pragma pack(pop)
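For illustration, a sketch of naming a record under the widened types; the values are arbitrary:
/* Sketch:
 *   recordid rid;
 *   rid.page = 42;          // pageid_t is now 64 bit
 *   rid.slot = 0;           // slotid_t is 32 bit
 *   rid.size = sizeof(int); // int64_t; stopgap, per the @todo above
 */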
#include "compensations.h"
#endif /* __stasis_common_h */

View file

@ -55,7 +55,7 @@ terms specified in this license.
#ifndef __CONSTANTS_H__
#define __CONSTANTS_H__
#include <stasis/transactional.h>
/*#define DEBUG 1*/
#define LOG_FILE "logfile.txt"
@ -115,32 +115,31 @@ terms specified in this license.
/** Operation types */
#define NO_INVERSE_WHOLE_PAGE -2
#define NO_INVERSE -1
#define OPERATION_SET 0
#define OPERATION_INCREMENT 1
#define OPERATION_DECREMENT 2
#define OPERATION_ALLOC 3
#define OPERATION_PREPARE 4
#define OPERATION_LHINSERT 5
#define OPERATION_LHREMOVE 6
#define OPERATION_DEALLOC 7
#define OPERATION_REALLOC 8
/*#define OPERATION_PAGE_ALLOC ?
#define OPERATION_PAGE_DEALLOC 9 */
#define OPERATION_PAGE_SET 10
#define OPERATION_UPDATE_FREESPACE 11
#define OPERATION_UPDATE_FREESPACE_INVERSE 12
#define OPERATION_UPDATE_FREELIST 13
#define OPERATION_UPDATE_FREELIST_INVERSE 14
#define OPERATION_FREE_PAGE 15
#define OPERATION_ALLOC_FREED 16
#define OPERATION_UNALLOC_FREED 17
#define OPERATION_NOOP 18
#define OPERATION_INSTANT_SET 19
#define OPERATION_ARRAY_LIST_ALLOC 20
#define OPERATION_INITIALIZE_PAGE 21
// #define OPERATION_UNINITIALIZE_PAGE 22
#define OPERATION_SET_INVERSE 1
#define OPERATION_INCREMENT 2
#define OPERATION_DECREMENT 3
#define OPERATION_ALLOC 4
#define OPERATION_PREPARE 5
//#define OPERATION_LHINSERT 6
//#define OPERATION_LHREMOVE 7
#define OPERATION_DEALLOC 8
#define OPERATION_REALLOC 9
#define OPERATION_PAGE_SET_RANGE 10
#define OPERATION_PAGE_SET_RANGE_INVERSE 11
/*#define OPERATION_UPDATE_FREESPACE 12
#define OPERATION_UPDATE_FREESPACE_INVERSE 13
#define OPERATION_UPDATE_FREELIST 14
#define OPERATION_UPDATE_FREELIST_INVERSE 15
#define OPERATION_FREE_PAGE 16
#define OPERATION_ALLOC_FREED 17
#define OPERATION_UNALLOC_FREED 18 */
#define OPERATION_NOOP 19
#define OPERATION_INSTANT_SET 20
#define OPERATION_ARRAY_LIST_ALLOC 21
#define OPERATION_INITIALIZE_PAGE 22
#define OPERATION_LINEAR_INSERT 23
#define OPERATION_UNDO_LINEAR_INSERT 24
#define OPERATION_LINEAR_DELETE 25
@ -160,7 +159,10 @@ terms specified in this license.
#define OPERATION_FIXED_PAGE_ALLOC 36
#define OPERATION_ALLOC_REGION 37
#define OPERATION_DEALLOC_REGION 38
#define OPERATION_ALLOC_REGION_INVERSE 38
#define OPERATION_DEALLOC_REGION 39
#define OPERATION_DEALLOC_REGION_INVERSE 40
// these operations are specific to OASYS
#define OPERATION_OASYS_DIFF_DO 75
@ -190,6 +192,8 @@ terms specified in this license.
#define NORMAL_SLOT (-3)
#define SLOT_TYPE_END (-4)
#define INVALID_PAGE (-1)
/** Initialized statically in transactional2.c */
extern const short SLOT_TYPE_LENGTHS[];

View file

@ -61,13 +61,17 @@ BEGIN_C_DECLS
typedef struct {
unsigned int funcID : 8;
recordid rid;
unsigned int argSize;
pageid_t page;
int64_t arg_size;
/* Implicit members:
args; @ ((byte*)ule) + sizeof(UpdateLogEntry)
preImage; @ ((byte*)ule) + sizeof(UpdateLogEntry) + ule.argSize */
*/
} UpdateLogEntry;
typedef struct {
lsn_t compensated_lsn;
} CLRLogArgs;
struct __raw_log_entry {
lsn_t LSN;
lsn_t prevLSN;
@ -83,6 +87,13 @@ typedef struct {
UpdateLogEntry update;
} LogEntry;
typedef struct {
lsn_t LSN;
lsn_t prevLSN;
int xid;
unsigned int type;
CLRLogArgs clr;
} CLRLogEntry;
/**
Allocate a log entry that does not contain any extra payload
information. (Eg: Tbegin, Tcommit, etc.)
@ -99,10 +110,10 @@ LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN);
@return a LogEntry that should be freed with free().
*/
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
unsigned int operation, recordid rid,
const byte * args, unsigned int argSize,
const byte * preImage);
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
unsigned int op, pageid_t page,
const byte * arg, unsigned int arg_size);
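E.g., a sketch of allocating an update entry under the new signature; the values are illustrative:
/* Sketch:
 *   int val = 42;
 *   LogEntry * e = allocUpdateLogEntry(prevLSN, xid, OPERATION_NOOP,
 *                                      page, (const byte*)&val, sizeof(val));
 *   // ... hand e to the log, then free() it ...
 */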
/**
Allocate a CLR entry. These are written during recovery as log
entries are undone. This moves undo operations into the redo
@ -111,7 +122,7 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
@return a LogEntry that should be freed with free().
*/
LogEntry * allocCLRLogEntry(const LogEntry * old_e);
LogEntry * allocCLRLogEntry(const LogEntry * e);
/**
@return the length, in bytes, of e.
*/
@ -120,11 +131,6 @@ long sizeofLogEntry(const LogEntry * e);
@return the operation's arguments.
*/
const byte * getUpdateArgs(const LogEntry * e);
/**
@return the undo information for operations that use record-based
physical undo.
*/
const byte * getUpdatePreImage(const LogEntry * e);
lsn_t getPrepareRecLSN(const LogEntry *e);

View file

@ -70,8 +70,8 @@ terms specified in this license.
#ifndef __LOGWRITER_H__
#define __LOGWRITER_H__
#include <stasis/constants.h>
#include <stasis/common.h>
#include <stasis/logger/logEntry.h>
BEGIN_C_DECLS
/**

View file

@ -157,8 +157,8 @@ lsn_t LogTransAbort(TransactionLog * l);
its operation argument to the extent necessary for allocating and laying out
the log entry. Finally, it updates the state of the parameter l.
*/
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
const byte * args);
LogEntry * LogUpdate(TransactionLog * l, Page * p, unsigned int operation,
const byte * arg, size_t arg_size);
/**
Any LogEntry that is returned by a function in logger2.h or
@ -180,7 +180,7 @@ void FreeLogEntry(const LogEntry * e);
*/
lsn_t LogCLR(const LogEntry * e);
lsn_t LogDummyCLR(int xid, lsn_t prevLSN);
lsn_t LogDummyCLR(int xid, lsn_t prev_lsn, lsn_t compensated_lsn);
/**
Write a end transaction record @see XEND

View file

@ -76,7 +76,8 @@ BEGIN_C_DECLS
/**
* function pointer that the operation will run
*/
typedef int (*Function)(int xid, Page * p, lsn_t lsn, recordid r, const void *d);
//typedef int (*Function)(int xid, Page * p, slotid_t slot, int64_t arg_size, lsn_t lsn, const void *d);
typedef int (*Function)(const LogEntry* e, Page * p); //, slotid_t slot, int64_t arg_size, lsn_t lsn, const void *d);
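A minimal sketch of an operation callback under this signature; the argument struct and its semantics are hypothetical:
// Sketch only; exampleArg_t is a hypothetical argument layout.
typedef struct { slotid_t slot; int delta; } exampleArg_t;

static int op_example(const LogEntry * e, Page * p) {
  assert(e->update.arg_size == sizeof(exampleArg_t));
  const exampleArg_t * a = (const exampleArg_t *) getUpdateArgs(e);
  // ... apply a->delta to record a->slot on page p ...
  (void) a; (void) p;
  return 0;
}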
/**
@ -87,7 +88,7 @@ typedef int (*Function)(int xid, Page * p, lsn_t lsn, recordid r, const void *d)
size field of the recordid is used to determine the size of the
argument passed into the operation.
*/
#define SIZEOF_RECORD -1
//#define SIZEOF_RECORD -1
/**
Logical log entries (such as those used by nested top actions)
have a null recordid, as they are not associated with a specific page
@ -98,23 +99,13 @@ typedef int (*Function)(int xid, Page * p, lsn_t lsn, recordid r, const void *d)
operation uses a variable length argument, but is associated with
a specific page.
*/
#define SIZEIS_PAGEID -2
/** If the Operation struct's undo field is set to this value, then
physical logging is used in lieu of logical logging.
*/
//#define SIZEIS_PAGEID -2
#define NO_INVERSE -1
typedef struct {
/**
* ID of operation, also index into operations table
*/
int id;
/**
This value is the size of the arguments that this operation
takes. If set to SIZEOF_RECORD, then the size of the record
that the operation affects will be used instead.
*/
long sizeofData;
/**
Implementing operations that may span records is subtle.
Recovery assumes that page writes (and therefore logical
@ -191,20 +182,15 @@ extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */
void doUpdate(const LogEntry * e, Page * p);
/** Undo the update under normal operation, and during recovery.
Checks to see if the operation's results are reflected in the
contents of the buffer manager. If they are, then it performs the
undo.
For logical undo, this unconditionally executes the requested operation.
Does not write to the log.
For physical undo, this compares the page LSN to clr_lsn, and runs
it if the page is out of date.
This function does not generate CLR because this would result in
extra CLRs being generated during recovery.
@param e The log entry containing the operation to be undone.
@param p A pointer to the memory resident copy of the page that is being managed by bufferManager.
@param clr_lsn The lsn of the clr that corresponds to this undo operation.
@param e The log entry containing the operation to be undone.
@param clr_lsn The lsn of the clr that records this undo operation.
*/
void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn);
void undoUpdate(const LogEntry * e, lsn_t clr_lsn);
/**
Redoes an operation during recovery. This is different than
doUpdate because it checks to see if the operation needs to be redone

View file

@ -101,13 +101,6 @@ compensated_function recordid TarrayListAlloc(int xid, int numPages, int multipl
@param slots the number of slots to end to the end of the ArrayList.
*/
compensated_function int TarrayListExtend(int xid, recordid rid, int slots);
/**
Do not call this function.
@deprecated This function is known to be broken, and is only
called by a deprecated hash implementation.
*/
compensated_function int TarrayListInstantExtend(int xid, recordid rid, int slots);
/**
Get the length of an ArrayList.

View file

@ -55,8 +55,11 @@ terms specified in this license.
#ifndef __DECREMENT_H__
#define __DECREMENT_H__
#define Tdecrement(xid,rid) Tupdate(xid,rid,0, OPERATION_DECREMENT)
#include <stasis/constants.h>
static inline void Tdecrement(int xid, recordid rid) {
Tupdate(xid,rid,&rid.slot,sizeof(rid.slot),OPERATION_DECREMENT);
}
Operation getDecrement();
#endif

View file

@ -61,8 +61,11 @@ terms specified in this license.
#ifndef __INCREMENT_H__
#define __INCREMENT_H__
#define Tincrement(xid,rid) Tupdate(xid,rid,0, OPERATION_INCREMENT)
#include <stasis/constants.h>
static inline void Tincrement(int xid, recordid rid) {
Tupdate(xid,rid,&rid.slot,sizeof(rid.slot),OPERATION_INCREMENT);
}
Operation getIncrement();
#endif

View file

@ -54,8 +54,10 @@ terms specified in this license.
#ifndef __INSTANT_SET_H__
#define __INSTANT_SET_H__
#define TinstantSet(xid,rid,dat) Tupdate(xid,rid,dat, OPERATION_INSTANT_SET)
/**
XXX never use TinstantSet
*/
int TinstantSet(int xid, recordid rid, const void * dat);
Operation getInstantSet();
Operation getInstantSetRaw();

View file

@ -37,7 +37,7 @@ typedef struct {
int numBuckets;
int keySize;
int valueSize;
lladd_linkedList_iterator * it;
stasis_linkedList_iterator * it;
lladd_pagedList_iterator * pit;
} lladd_hash_iterator;

View file

@ -16,7 +16,7 @@
#define __LINKED_LIST_NTA_H
typedef struct {
recordid next;
} lladd_linkedList_entry;
} stasis_linkedList_entry;
typedef struct {
int keySize;
int valueSize;
@ -27,7 +27,7 @@ typedef struct {
the head of the list, then the iterator needs to reset itself. */
int first;
recordid listRoot;
} lladd_linkedList_iterator;
} stasis_linkedList_iterator;
compensated_function int TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
compensated_function int TlinkedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value);
@ -39,12 +39,12 @@ compensated_function int TlinkedListMove(int xid, recordid start_list, recordid
was first called.
@return a new iterator initialized to the head of the list. */
compensated_function lladd_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize);
void TlinkedListClose(int xid, lladd_linkedList_iterator * it);
compensated_function stasis_linkedList_iterator * TlinkedListIterator(int xid, recordid list, int keySize, int valueSize);
void TlinkedListClose(int xid, stasis_linkedList_iterator * it);
/** @return 1 if there was another entry to be iterated over. 0 otherwise.
If this function returns 1, the caller must free() the malloced memory
returned via the key and value arguments.*/
compensated_function int TlinkedListNext(int xid, lladd_linkedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize);
compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize);
compensated_function recordid TlinkedListCreate(int xid, int keySize, int ValueSize);
compensated_function void TlinkedListDelete(int xid, recordid list);

View file

@ -55,5 +55,5 @@ terms specified in this license.
#define __NOOP_H__
Operation getNoop();
int noop(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat);
int noop(const LogEntry* e, Page* p);
#endif

View file

@ -64,13 +64,13 @@ compensated_function int TpageAlloc(int xid/*, int type*/);
compensated_function recordid TfixedPageAlloc(int xid, int size);
compensated_function int TpageAllocMany(int xid, int count/*, int type*/);
compensated_function int TpageDealloc(int xid, int pageid);
compensated_function int TpageSet(int xid, int pageid, byte* dat);
compensated_function int TpageGet(int xid, int pageid, byte* buf);
compensated_function int TpageSet(int xid, int pageid, const void* dat);
compensated_function int TpageSetRange(int xid, int pageid, int offset, const void* dat, int len);
compensated_function int TpageGet(int xid, int pageid, void* buf);
int TpageGetType(int xid, int pageid);
/*Operation getPageAlloc();
Operation getPageDealloc(); */
Operation getPageSet();
Operation getPageSetRange();
Operation getPageSetRangeInverse();
Operation getFixedPageAlloc();

View file

@ -78,19 +78,6 @@ terms specified in this license.
//#include <lladd/logger/logEntry.h>
extern recordid prepare_bogus_rec;
/**
Prepare transaction for commit. Currently, a transaction may be
prepared multiple times. Once Tprepare() returns, the caller is
guaranteed that the current transaction will resume exactly where
it was the last time Tprepare() was called.
@todo Tprepare() shouldn't take a record or buffer as arguments...
@param xid Transaction id.
@param rec must be a valid record id. any valid recordid will do. This parameter will be removed eventually.
*/
//#define Tprepare(xid) Tupdate(xid, NULLRID, 0, OPERATION_PREPARE)
Operation getPrepare();

View file

@ -38,7 +38,9 @@ int TregionReadBoundaryTag(int xid, pageid_t pid, boundary_tag *tag);
Operation getAllocBoundaryTag();
Operation getAllocRegion();
Operation getAllocRegionInverse();
Operation getDeallocRegion();
Operation getDeallocRegionInverse();
/** This function checks the regions in the page file for consistency.
It makes sure that the doubly linked list is consistent (e.g.

View file

@ -60,13 +60,16 @@ terms specified in this license.
@param rid the recordid of the record to be changed.
@param dat the new value of the record.
*/
#define Tset(xid,rid,dat) Tupdate(xid,rid,dat, OPERATION_SET)
int Tset(int xid, recordid rid, const void * dat);
int TsetRaw(int xid, recordid rid, const void * dat);
Operation getSet();
Operation getSetRaw();
Operation getSetInverse();
Operation getSetRangeInverse();
Operation getSetRange();
Operation getSetRangeInverse();
/**
Change an interval of bytes within a record.

View file

@ -859,9 +859,8 @@ int stasis_page_impl_register(page_impl impl);
void stasis_slotted_initialize_page(Page * p);
void stasis_fixed_initialize_page(Page * page, size_t size, int count);
void stasis_indirect_initialize_page(Page * p, int height);
int stasis_fixed_records_per_page(size_t size);
void stasis_blob_initialize_page(Page * p);
END_C_DECLS
#endif

View file

@ -543,18 +543,8 @@ terms specified in this license.
BEGIN_C_DECLS
/**
* represents how to look up a record on a page
* @todo recordid.page should be 64bit.
* @todo int64_t (for recordid.size) is a stopgap fix.
*/
//XXX doesn't belong here.
#pragma pack(push,1)
typedef struct {
int page; // XXX needs to be pageid_t, but that breaks unit tests.
int slot;
int64_t size; //signed long long size;
} recordid;
typedef struct {
size_t offset;
size_t size;
@ -562,7 +552,6 @@ typedef struct {
} blob_record_t;
#pragma pack(pop)
extern const recordid ROOT_RECORD;
extern const recordid NULLRID;
@ -577,8 +566,6 @@ extern const recordid NULLRID;
#define RECORD_ARRAY (-1)
#include "operations.h"
/**
* Currently, Stasis has a fixed number of transactions that may be
* active at one time.
@ -623,11 +610,11 @@ int Tbegin();
* @see operations.h set.h
*/
compensated_function void Tupdate(int xid, recordid rid,
const void *dat, int op);
const void *dat, size_t datlen, int op);
compensated_function void TupdateStr(int xid, recordid rid,
const char *dat, int op);
const char *dat, size_t datlen, int op);
compensated_function void TupdateRaw(int xid, recordid rid,
const void *dat, int op);
const void *dat, size_t datlen, int op);
/**
* Read the value of a record.
*
@ -636,6 +623,15 @@ compensated_function void TupdateRaw(int xid, recordid rid,
* @param dat buffer into which data goes
*/
compensated_function void Tread(int xid, recordid rid, void *dat);
/**
* Read a value of a record without first dereferencing the record.
* Use Tread() unless you're implementing code that provides
* dereferencible records.
*
* @see arrayList for a data structure that uses recordid
* dereferencing to transparently provide records to its callers.
*/
compensated_function void TreadRaw(int xid, recordid rid, void *dat);
compensated_function void TreadStr(int xid, recordid rid, char *dat);
/**
@ -683,6 +679,18 @@ int TuncleanShutdown();
* @param reclsn The lsn of the transaction's BEGIN record.
*/
void Trevive(int xid, lsn_t prevlsn, lsn_t reclsn);
/**
Prepare transaction for commit. Currently, a transaction may be
prepared multiple times. Once Tprepare() returns, the caller is
guaranteed that the current transaction will resume exactly where
it was the last time Tprepare() was called.
@todo move prepare to prepare.[ch]
@param xid Transaction id.
*/
int Tprepare(int xid);
/**
* Used by the recovery process.
@ -709,6 +717,8 @@ int* TlistActiveTransactions();
*/
int TisActiveTransaction(int xid);
int stasis_transaction_table_set_prev_lsn(int xid, lsn_t lsn);
/**
This is used by log truncation.
*/
@ -725,6 +735,9 @@ lsn_t transactions_minRecLSN();
after a crash.
*/
int TdurabilityLevel();
#include "operations.h"
END_C_DECLS
#endif

View file

@ -417,7 +417,7 @@ START_TEST(recoverBlob__crash) {
Tread(xid, rid, &j);
arraySet(k, 9);
fail_unless(!memcmp(j,k,ARRAY_SIZE), "set not working?");
fail_unless(!memcmp(j,k,ARRAY_SIZE * sizeof(int)), "set not working?");
Tcommit(xid);
@ -430,25 +430,29 @@ START_TEST(recoverBlob__crash) {
/* RID = 6. */
Tread(xid, rid, &j);
fail_unless(!memcmp(j,k,ARRAY_SIZE), NULL);
fail_unless(!memcmp(j,k,ARRAY_SIZE * sizeof(int)), NULL);
TuncleanShutdown();
printf("\nreopen 1\n");
Tinit();
printf("\nreopen 1 done\n");
Tread(xid, rid, &j);
arraySet(k, 9);
fail_unless(!memcmp(j,k,ARRAY_SIZE), "Recovery didn't roll back in-progress xact!");
fail_unless(!memcmp(j,k,ARRAY_SIZE * sizeof(int)), "Recovery didn't roll back in-progress xact!");
Tdeinit();
printf("\nreopen 2\n");
Tinit();
Tread(xid, rid, &j);
assert(!memcmp(j,k,ARRAY_SIZE * sizeof(int)));
fail_unless(!memcmp(j,k,ARRAY_SIZE), "Recovery failed on second re-open.");
fail_unless(!memcmp(j,k,ARRAY_SIZE * sizeof(int)), "Recovery failed on second re-open.");
Tdeinit();

View file

@ -55,7 +55,7 @@ terms specified in this license.
#include <time.h>
#define LOG_NAME "check_linearHashNTA.log"
#define NUM_ENTRIES 100000
static const int NUM_ENTRIES = 100000;
/** @test
*/
START_TEST(linearHashNTAtest)
@ -81,9 +81,9 @@ START_TEST(linearHashNTAtest)
ThashInsert(xid, hashHeader, (byte*)&i, sizeof(int), (byte*)&val, sizeof(recordid));
found = ThashLookup(xid, hashHeader, (byte*)&i, sizeof(int), (byte**)bval2);
assert(sizeof(recordid) == found);
assert(val2->page == i * NUM_ENTRIES);
assert(val2->slot == val2->page * NUM_ENTRIES);
assert(val2->size == val2->slot * NUM_ENTRIES);
assert(val2->page == val.page);
assert(val2->slot == val.slot);
assert(val2->size == val.size);
free(val2);
}
Tcommit(xid);
@ -114,7 +114,7 @@ START_TEST(linearHashNTAtest)
int found = ThashLookup(xid, hashHeader, (byte*)&i, sizeof(int), (byte**)bval2);
assert(sizeof(recordid) == found);
assert(val2->page == i * NUM_ENTRIES);
assert(val2->slot == val2->page * NUM_ENTRIES);
assert(val2->slot == (slotid_t)val2->page * NUM_ENTRIES);
assert(val2->size == val2->slot * NUM_ENTRIES);
free(val2);
}
@ -150,9 +150,9 @@ START_TEST(linearHashNTAVariableSizetest)
int ret = ThashLookup(xid, hashHeader, (byte*)&i, sizeof(int), (byte**)bval2);
assert(sizeof(recordid) == ret);
assert(val2->page == i * NUM_ENTRIES);
assert(val2->slot == val2->page * NUM_ENTRIES);
assert(val2->size == val2->slot * NUM_ENTRIES);
assert(val2->page == val.page);
assert(val2->slot == val.slot);
assert(val2->size == val.size);
free(val2);
}
@ -185,7 +185,7 @@ START_TEST(linearHashNTAVariableSizetest)
int ret = ThashLookup(xid, hashHeader, (byte*)&i, sizeof(int), (byte**)bval2);
assert(sizeof(recordid) == ret);
assert(val2->page == i * NUM_ENTRIES);
assert(val2->slot == val2->page * NUM_ENTRIES);
assert(val2->slot == (slotid_t)val2->page * NUM_ENTRIES);
assert(val2->size == val2->slot * NUM_ENTRIES);
free(val2);
}

View file

@ -62,27 +62,6 @@ START_TEST(rawLogEntryAlloc)
}
END_TEST
/*START_TEST(clrLogEntryAlloc)
{
recordid rid = { 3, 4, 5 };
LogEntry * log = allocCLRLogEntry(200, 1, 7, rid, 8);
assert(log->LSN == -1);
assert(log->prevLSN == 200);
assert(log->xid == 1);
assert(log->type == CLRLOG);
assert(sizeofLogEntry(log) == sizeof(struct __raw_log_entry) + sizeof(CLRLogEntry));
assert(log->contents.clr.thisUpdateLSN == 7);
assert(log->contents.clr.rid.page == 3);
assert(log->contents.clr.rid.slot == 4);
assert(log->contents.clr.rid.size == 5);
assert(log->contents.clr.undoNextLSN == 8);
free(log);
}
END_TEST */
/** @test
Quick test of allocUpdateLogEntry
@ -92,41 +71,33 @@ END_TEST */
START_TEST(updateLogEntryAlloc)
{
int * preImageCpy;
int preImage[] = {10000, 20000, 30000};
char args[] = {'a', 'b', 'c'};
recordid rid = { 3 , 4, sizeof(int)*3 };
LogEntry * log;
Tinit(); /* Needed because it sets up the operations table. */
log = allocUpdateLogEntry(200, 1, OPERATION_SET,
rid,
(const byte*)args, 3*sizeof(char), (const byte*)preImage);
rid.page,
(const byte*)args, 3*sizeof(char));
assert(log->LSN == -1);
assert(log->prevLSN == 200);
assert(log->xid == 1);
assert(log->type == UPDATELOG);
assert(log->update.funcID == OPERATION_SET);
/* assert(log->contents.update.invertible == 0); */
assert(log->update.rid.page == 3);
assert(log->update.rid.slot == 4);
assert(log->update.rid.size == 3*sizeof(int));
assert(log->update.argSize == 3*sizeof(char));
assert(log->update.page == 3);
assert(log->update.arg_size == 3*sizeof(char));
assert(getUpdateArgs(log) != NULL);
assert(args[0] == ((char*)getUpdateArgs(log))[0]);
assert(args[1] == ((char*)getUpdateArgs(log))[1]);
assert(args[2] == ((char*)getUpdateArgs(log))[2]);
preImageCpy = (int*)getUpdatePreImage(log);
assert(preImageCpy != NULL);
assert(preImage[0] == preImageCpy[0]);
assert(preImage[1] == preImageCpy[1]);
assert(preImage[2] == preImageCpy[2]);
// printf("sizes %d %d\n",sizeofLogEntry(log),(sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + (sizeof(char))));
assert(sizeofLogEntry(log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 3 * (sizeof(int)+sizeof(char))));
assert(sizeofLogEntry(log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 3 * (sizeof(char))));
free(log);
Tdeinit();
}
@ -135,29 +106,22 @@ END_TEST
START_TEST(updateLogEntryAllocNoExtras)
{
int * preImageCpy;
int preImage[] = {10000, 20000, 30000};
char args[] = {'a', 'b', 'c'};
recordid rid = { 3 , 4, sizeof(int)*3 };
LogEntry * log = allocUpdateLogEntry(200, 1, OPERATION_LHINSERT,
rid,
(byte*)args, 0, (byte*)preImage);
LogEntry * log = allocUpdateLogEntry(200, 1, OPERATION_SET,
rid.page,
(byte*)args, 0);
assert(log->LSN == -1);
assert(log->prevLSN == 200);
assert(log->xid == 1);
assert(log->type == UPDATELOG);
assert(log->update.funcID == OPERATION_LHINSERT);
/* assert(log->contents.update.invertible == 1); */
assert(log->update.rid.page == 3);
assert(log->update.rid.slot == 4);
assert(log->update.rid.size == 3*sizeof(int));
assert(log->update.argSize == 0);
assert(log->update.funcID == OPERATION_SET);
assert(log->update.page == 3);
assert(log->update.arg_size == 0);
assert(getUpdateArgs(log) == NULL);
preImageCpy = (int*)getUpdatePreImage(log);
assert(preImageCpy == NULL);
assert(sizeofLogEntry(log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 0 * (sizeof(int)+sizeof(char))));
free(log);
@ -175,11 +139,9 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */
tcase_add_test(tc, rawLogEntryAlloc);
// tcase_add_test(tc, clrLogEntryAlloc);
tcase_add_test(tc, updateLogEntryAlloc);
tcase_add_test(tc, updateLogEntryAllocNoExtras);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);

@@ -83,7 +83,6 @@ static void setup_log() {
recordid rid;
byte * args = (byte*)"Test 123.";
long args_size = 10; /* Including null */
unsigned long preImage = 42;
rid.page = 0;
rid.slot = 0;
@@ -105,7 +104,7 @@ static void setup_log() {
FreeLogEntry (e);
FreeLogEntry (f);
e = allocUpdateLogEntry(prevLSN, xid, 1, rid, args, args_size, (byte*) &preImage);
e = allocUpdateLogEntry(prevLSN, xid, 1, rid.page, args, args_size);
LogWrite(e);
prevLSN = e->prevLSN;
@@ -403,7 +402,7 @@ void reopenLogWorkload(int truncating) {
for(int i = 0; i < ENTRY_COUNT; i++) {
entries[i] = LogUpdate(&l, NULL, NULLRID, OPERATION_NOOP, NULL);
entries[i] = LogUpdate(&l, NULL, OPERATION_NOOP, NULL, 0);
if(i == SYNC_POINT) {
if(truncating) {
@@ -441,7 +440,7 @@ void reopenLogWorkload(int truncating) {
LogEntry * entries2[ENTRY_COUNT];
for(int i = 0; i < ENTRY_COUNT; i++) {
entries2[i] = LogUpdate(&l, NULL, NULLRID, OPERATION_NOOP, NULL);
entries2[i] = LogUpdate(&l, NULL, OPERATION_NOOP, NULL, 0);
if(i == SYNC_POINT) {
syncLog_LogWriter();
}
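The reopenLogWorkload calls above suggest the logging wrapper changed in step with the allocator; a hedged sketch of the new shape, with parameter names guessed from the call LogUpdate(&l, NULL, OPERATION_NOOP, NULL, 0):

/* Sketch only: the second argument appears to be the Page being updated
   (NULL here), and the trailing pair is the operation argument buffer
   and its length. */
LogEntry * LogUpdate(TransactionLog * l, Page * p, unsigned int funcID,
                     const byte * arg, size_t arg_size);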

@@ -185,8 +185,8 @@ START_TEST(multiplexTest) {
for(i = 0; i < NUM_INSERTS; i++) {
(*(lsn_t*)(arg+1)) = i;
LogEntry * e = allocUpdateLogEntry(-1, -1, OPERATION_LINEAR_HASH_INSERT, NULLRID, (byte*)arg,
sizeof(linearHash_remove_arg) + sizeof(lsn_t) + sizeof(char), NULL);
LogEntry * e = allocUpdateLogEntry(-1, -1, OPERATION_LINEAR_HASH_INSERT, INVALID_PAGE, (byte*)arg,
sizeof(linearHash_remove_arg) + sizeof(lsn_t) + sizeof(char));
ThashInsert(xid, hash, (byte*)&i, sizeof(lsn_t), (byte*)e, sizeofLogEntry(e));
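This call site also shows the commit's convention for logical operations: a physical update names a page, a logical one passes INVALID_PAGE. A hedged illustration (the variables are placeholders):

/* Physical: recovery redoes/undoes against a concrete page. */
LogEntry * phys = allocUpdateLogEntry(prevLSN, xid, OPERATION_SET,
                                      rid.page, arg, arg_size);
/* Logical: no backing page, so recovery dispatches on funcID alone. */
LogEntry * logi = allocUpdateLogEntry(prevLSN, xid, OPERATION_LINEAR_HASH_INSERT,
                                      INVALID_PAGE, arg, arg_size);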

@@ -65,7 +65,7 @@ START_TEST(operation_physical_do_undo) {
recordid rid;
lsn_t lsn = 2;
int buf;
int arg;
LogEntry * setToTwo;
Tinit();
@@ -80,17 +80,30 @@ START_TEST(operation_physical_do_undo) {
unlock(p->rwlatch);
releasePage(p);
buf = 1;
arg = 2;
DEBUG("A\n");
setToTwo = allocUpdateLogEntry(-1, xid, OPERATION_SET, rid, (void*)&arg, sizeof(int), (void*)&buf);
byte arg[sizeof(slotid_t) + sizeof(int64_t) + 2 * sizeof(int)];
byte * cur = arg;
*(slotid_t*)cur = rid.slot; cur += sizeof(slotid_t);
*(int64_t*) cur = sizeof(int); cur += sizeof(int64_t);
*(int*) cur = 2; cur += sizeof(int);
*(int*) cur = 1;
// XXX fails; set log format has changed
setToTwo = allocUpdateLogEntry(-1, xid, OPERATION_SET, rid.page,
(void*)arg, sizeof(slotid_t) + sizeof(int64_t) + 2 * sizeof(int));
/* Do, undo and redo operation without updating the LSN field of the page. */
DEBUG("B\n");
p = loadPage(xid, rid.page);
// manually fill in UNDO field
//stasis_record_read(xid, p, rid, ((byte*)(setToTwo) + sizeofLogEntry(setToTwo) - rid.size));
stasis_record_write(xid, p, lsn, rid, (byte*)&buf);
releasePage(p);
setToTwo->LSN = 10;
@@ -104,26 +117,26 @@ START_TEST(operation_physical_do_undo) {
stasis_record_read(xid, p, rid, (byte*)&buf);
releasePage(p);
fail_unless(buf == 2, NULL);
assert(buf == 2);
DEBUG("D\n");
p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
fail_unless(10 == stasis_page_lsn_read(p), "page lsn not set correctly.");
assert(10 == stasis_page_lsn_read(p)); // "page lsn not set correctly."
unlock(p->rwlatch);
setToTwo->LSN = 5;
undoUpdate(setToTwo, p, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */
undoUpdate(setToTwo, 12); //, p, 8); /* Should succeed: log LSN is lower than page LSN, but effective LSN is higher than page LSN */
releasePage(p);
p = loadPage(xid, rid.page);
stasis_record_read(xid, p, rid, (byte*)&buf);
releasePage(p);
fail_unless(buf == 1, NULL);
assert(buf == 1);
DEBUG("E\n");
redoUpdate(setToTwo);
@@ -133,7 +146,7 @@ START_TEST(operation_physical_do_undo) {
stasis_record_read(xid, p, rid, (byte*)&buf);
releasePage(p);
fail_unless(buf == 1, NULL);
assert(buf == 1);
/* Now, simulate scenarios from normal operation:
do the operation, and update the LSN, (update happens)
@@ -179,14 +192,13 @@ START_TEST(operation_physical_do_undo) {
p = loadPage(xid, rid.page);
stasis_record_read(xid, p, rid, (byte*)&buf);
assert(buf == 2);
fail_unless(buf == 2, NULL);
DEBUG("G undo set to 2\n");
undoUpdate(setToTwo, p, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
undoUpdate(setToTwo, 20); //, p, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
stasis_record_read(xid, p, rid, (byte*)&buf);
fail_unless(buf == 1, NULL);
assert(buf == 1);
releasePage(p);
DEBUG("H don't redo set to 2\n");
@@ -196,7 +208,7 @@ START_TEST(operation_physical_do_undo) {
stasis_record_read(xid, p, rid, (byte*)&buf);
fail_unless(buf == 1, NULL);
assert(buf == 1);
stasis_record_write(xid, p, 0, rid, (byte*)&buf); /* reset the page's LSN. */
@@ -207,7 +219,7 @@ START_TEST(operation_physical_do_undo) {
p = loadPage(xid, rid.page);
stasis_record_read(xid, p, rid, (byte*)&buf);
fail_unless(buf == 2, NULL);
assert(buf == 2);
releasePage(p);
Tdeinit();
}
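The hand-packed buffer at the top of this test documents the apparent on-log layout of an OPERATION_SET argument; the struct below is a reconstruction for illustration only (the name, and any padding behavior, are invented):

/* Assumed layout, read directly off the test:
   [ slotid_t slot ][ int64_t size ][ size bytes: new value ][ size bytes: old value ]
   Carrying the old value inline is what lets the entry undo itself now that
   allocUpdateLogEntry no longer takes a separate preimage. */
typedef struct {
  slotid_t slot;   /* which record on the page */
  int64_t  size;   /* record payload size in bytes */
  /* byte new_value[size]; byte old_value[size];  -- variable-length tail */
} set_operation_arg_t;  /* hypothetical name */

Likewise, undoUpdate now appears to take only the log entry and an effective LSN, loading and latching the page itself rather than accepting a Page * and CLR LSN from the caller.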

@@ -83,7 +83,9 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
pthread_mutex_lock(&lsn_mutex);
lsn++;
this_lsn = lsn;
readlock(p->rwlatch,0);
assert(stasis_page_lsn_read(p) < this_lsn);
unlock(p->rwlatch);
pthread_mutex_unlock(&lsn_mutex);
if(! first ) {

@@ -66,29 +66,35 @@ START_TEST(pageOpCheckRecovery) {
int pageid1 = TpageAlloc(xid);
int pageid2 = TpageAlloc(xid);
assert(pageid1 != pageid2);
Page p;
byte memAddr[PAGE_SIZE];
byte memAddr[USABLE_SIZE_OF_PAGE];
p.memAddr = memAddr;
memset(p.memAddr, 1, PAGE_SIZE);
memset(p.memAddr, 1, USABLE_SIZE_OF_PAGE);
// Reset the page type after overwriting it with memset. Otherwise, Stasis
// will try to interpret it when it flushes the page to disk.
*stasis_page_type_ptr(&p) = 0;
TpageSet(xid, pageid1, p.memAddr);
TpageSetRange(xid, pageid1, 0, p.memAddr, USABLE_SIZE_OF_PAGE);
memset(p.memAddr, 2, PAGE_SIZE);
memset(p.memAddr, 2, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageSet(xid, pageid2, p.memAddr);
TpageSetRange(xid, pageid2, 0, p.memAddr, USABLE_SIZE_OF_PAGE);
Tcommit(xid);
xid = Tbegin();
TpageAlloc(xid); /* This test doesn't check for leaks, so we don't need to remember this pageid. */
int pageid_dead = TpageAlloc(xid); /* This test doesn't check for leaks; the pageid is kept only so we can assert it differs from the others. */
assert(pageid_dead != pageid1);
assert(pageid_dead != pageid2);
TpageDealloc(xid, pageid1);
TpageDealloc(xid, pageid2);
TuncleanShutdown();
@@ -98,27 +104,30 @@ START_TEST(pageOpCheckRecovery) {
xid = Tbegin();
int pageid3 = TpageAlloc(xid);
memset(p.memAddr, 3, PAGE_SIZE);
assert(pageid1 != pageid3);
assert(pageid2 != pageid3);
memset(p.memAddr, 3, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageSet(xid, pageid3, p.memAddr);
TpageSetRange(xid, pageid3, 0, p.memAddr, USABLE_SIZE_OF_PAGE);
byte newAddr[PAGE_SIZE];
byte newAddr[USABLE_SIZE_OF_PAGE];
memset(p.memAddr, 1, PAGE_SIZE);
memset(p.memAddr, 1, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageGet(xid, pageid1, newAddr);
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-sizeof(lsn_t)));
assert(!memcmp(p.memAddr, newAddr, USABLE_SIZE_OF_PAGE));
memset(p.memAddr, 2, PAGE_SIZE);
memset(p.memAddr, 2, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageGet(xid, pageid2, newAddr);
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-sizeof(lsn_t)));
assert(!memcmp(p.memAddr, newAddr, USABLE_SIZE_OF_PAGE));
memset(p.memAddr, 3, PAGE_SIZE);
memset(p.memAddr, 3, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageGet(xid, pageid3, newAddr);
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-sizeof(lsn_t)));
assert(!memcmp(p.memAddr, newAddr, USABLE_SIZE_OF_PAGE));
Tcommit(xid);
Tdeinit();
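TpageSet has been replaced here by an offset/length variant; a sketch of the assumed signature (the types and return value are guesses):

/* Sketch: inferred from TpageSetRange(xid, pageid, 0, buf, USABLE_SIZE_OF_PAGE). */
int TpageSetRange(int xid, int64_t pageid, int byte_off,
                  const byte * dat, int len);

Because the range write stops at USABLE_SIZE_OF_PAGE, the page's LSN and type header stay under Stasis control, which is presumably why the comparisons above now cover USABLE_SIZE_OF_PAGE bytes rather than PAGE_SIZE - sizeof(lsn_t).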

@@ -78,7 +78,7 @@ START_TEST (recovery_idempotent) {
Tread(xid, rid, &k);
fail_unless(j == k, "Get/Set broken?");
assert(j == k); // Get/Set broken?
Tcommit(xid);
@@ -92,7 +92,7 @@ START_TEST (recovery_idempotent) {
Tread(xid, rid, &k);
fail_unless(j == k, "Recovery messed something up!");
assert(j == k); // Recovery messed something up!
Tcommit(xid);
@@ -128,7 +128,7 @@ START_TEST (recovery_exactlyOnce) {
Tread(xid, rid, &k);
fail_unless(j == k, "Get/Set broken?");
assert(j == k); // Get/Set broken?
Tcommit(xid);
@@ -144,8 +144,7 @@ START_TEST (recovery_exactlyOnce) {
printf("j = %d, k = %d\n", j, k);
assert(j == k);
fail_unless(j == k, "Recovery messed something up!");
assert(j == k); // Recovery messed something up!
Tcommit(xid);
@@ -182,7 +181,7 @@ START_TEST (recovery_idempotentAbort) {
Tread(xid, rid, &k);
fail_unless(j == k, "Get/Set broken?");
assert(j == k); // Get/Set broken?
Tcommit(xid);
xid = Tbegin();
@@ -190,7 +189,7 @@ START_TEST (recovery_idempotentAbort) {
Tset(xid, rid, &k);
k = 4;
Tread(xid, rid, &k);
fail_unless(k == 2, NULL);
assert(k == 2);
Tabort(xid);
xid = Tbegin();
@@ -199,7 +198,7 @@ START_TEST (recovery_idempotentAbort) {
Tabort(xid);
fail_unless(j == k, "Didn't abort!");
assert(j == k);
Tdeinit();
@@ -211,7 +210,7 @@ START_TEST (recovery_idempotentAbort) {
Tread(xid, rid, &k);
fail_unless(j == k, "Recovery messed something up!");
assert(j == k); // Recovery messed something up!
Tcommit(xid);
@@ -246,11 +245,11 @@ START_TEST (recovery_exactlyOnceAbort) {
Tincrement(xid, rid);
Tread(xid, rid, &k);
fail_unless(j == k-1, NULL);
assert(j == k-1);
Tabort(xid);
xid = Tbegin();
Tread(xid, rid, &k);
fail_unless(j == k, "didn't abort?");
assert(j == k); //didn't abort?
Tcommit(xid);
Tdeinit();
@@ -259,7 +258,8 @@ START_TEST (recovery_exactlyOnceAbort) {
xid = Tbegin();
Tread(xid, rid, &k);
fail_unless(j == k, "Recovery didn't abort correctly");
assert(j == k);
Tcommit(xid);
Tdeinit();
@@ -279,30 +279,23 @@ START_TEST(recovery_clr) {
DEBUG("\n\nStart CLR test\n\n");
Tinit();
xid = Tbegin();
rid = Talloc(xid, sizeof(int));
Tread(xid, rid, &j);
Tcommit(xid);
xid = Tbegin();
Tincrement(xid, rid);
Tabort(xid);
xid = Tbegin();
Tread(xid, rid, &k);
Tcommit(xid);
fail_unless(j == k, NULL);
assert(j == k);
Tdeinit();
Tinit();
Tdeinit();
@@ -310,12 +303,10 @@ START_TEST(recovery_clr) {
Tinit();
xid = Tbegin();
Tread(xid, rid, &k);
Tcommit(xid);
fail_unless(j == k, NULL);
assert(j == k);
Tdeinit();
Tinit();
@@ -326,7 +317,7 @@ START_TEST(recovery_clr) {
Tcommit(xid);
fail_unless(j == k, NULL);
assert(j == k);
Tdeinit();
@@ -367,7 +358,7 @@ START_TEST(recovery_crash) {
/* RID = 9. */
Tread(xid, rid, &j);
fail_unless(j == 9, "Increment not working?");
assert(j == 9); // Increment not working?
Tcommit(xid);
@@ -380,7 +371,7 @@ START_TEST(recovery_crash) {
/* RID = 6. */
Tread(xid, rid, &j);
fail_unless(j == 6, "Decrement not working?");
assert(j == 6); // Decrement not working?
TuncleanShutdown();
@@ -388,14 +379,14 @@ START_TEST(recovery_crash) {
Tread(xid, rid, &j);
fail_unless(j == 9, "Recovery didn't roll back in-progress xact!");
assert(j == 9); // Recovery didn't roll back in-progress xact!
Tdeinit();
Tinit();
Tread(xid, rid, &j);
fail_unless(j == 9, "Recovery failed on second re-open.");
assert(j == 9); // Recovery failed on second re-open.
Tdeinit();
@@ -475,10 +466,9 @@ START_TEST (recovery_multiple_xacts) {
Tread(xid3, rid3, &j3);
Tread(xid4, rid4, &j4);
fail_unless(j1 == 1, NULL);
fail_unless(j2 == 2, NULL);
fail_unless(j3 == 4, NULL);
fail_unless(j4 == 4, NULL);
assert(j1 == 1);
assert(j2 == 2);
assert(j3 == 4);
assert(j4 == 4);
stasis_suppress_unclean_shutdown_warnings = 1;
Tdeinit();

@@ -12,9 +12,9 @@ static char * logEntryToString(const LogEntry * le) {
switch(le->type) {
case UPDATELOG:
{
recordid rid = le->update.rid;
asprintf(&ret, "UPDATE\tlsn=%9lld\tprevlsn=%9lld\txid=%4d\trid={%8d %5d %5lld}\tfuncId=%3d\targSize=%9d\n", le->LSN, le->prevLSN, le->xid,
rid.page, rid.slot, (long long int)rid.size, le->update.funcID, le->update.argSize );
asprintf(&ret, "UPDATE\tlsn=%9lld\tprevlsn=%9lld\txid=%4d\tpage={%8lld}\tfuncId=%3d\targSize=%9lld\n", le->LSN, le->prevLSN, le->xid,
le->update.page, le->update.funcID, (long long)le->update.arg_size );
}
break;
@@ -48,9 +48,8 @@ static char * logEntryToString(const LogEntry * le) {
break;
case CLRLOG:
{
recordid rid = le->update.rid;
asprintf(&ret, "CLR \tlsn=%9lld\tprevlsn=%9lld\txid=%4d\trid={%8d %5d %5lld}\n", le->LSN, le->prevLSN, le->xid,
rid.page, rid.slot, (long long int) rid.size );
asprintf(&ret, "CLR \tlsn=%9lld\tprevlsn=%9lld\txid=%4d\tcompensates={%8lld}\n", le->LSN, le->prevLSN, le->xid,
((CLRLogEntry*)le)->clr.compensated_lsn);
}
break;
}
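The updated pretty-printer implies the new CLR record shape; a hedged reconstruction (field order and nesting assumed from the single access above):

/* Sketch: inferred from ((CLRLogEntry*)le)->clr.compensated_lsn. */
typedef struct {
  struct __raw_log_entry e;   /* LSN, prevLSN, xid, type */
  struct {
    lsn_t compensated_lsn;    /* LSN of the update this CLR compensates */
  } clr;
} CLRLogEntry;

A CLR that names the compensated LSN instead of a recordid matches the commit's move away from recordid-addressed log entries.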