Started adding error handling code to LLADD.

This commit is contained in:
Sears Russell 2005-02-16 04:11:14 +00:00
parent f4484e35b5
commit 84a20a3c96
29 changed files with 188 additions and 115 deletions

View file

@ -20,7 +20,6 @@
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#ifndef __LIBDFA_RW_H
#define __LIBDFA_RW_H
typedef struct {

View file

@ -100,7 +100,7 @@ typedef struct Page_s Page;
* @param pageid ID of the page you want to load
* @return fully formed Page type
*/
Page * loadPage(int xid, int pageid);
compensated_function Page * loadPage(int xid, int pageid);
/**
loadPage aquires a lock when it is called, effectively pinning it

View file

@ -112,6 +112,8 @@ extern int errno;
#else
#define DEBUG(...)
#include "compensations.h"
#endif /*DEBUGGING*/
#endif /* __lladd_common_h */

View file

@ -97,7 +97,8 @@ void lock_c_line_1231(lock * l) {
properly.
Also, begin_action(NULL, NULL) is supported, and is useful for
checking the return value of a called function.
checking the return value of a called function, but, for
efficiency, try{ } end; is recommended
*/
void compensations_init();
@ -106,13 +107,19 @@ int compensation_error();
void compensation_clear_error();
void compensation_set_error(int code);
#define try do { if(compensation_error()) return; do
#define try_ret(x) do { if(compensation_error()) return (x); do
#define end while(0); if(compensation_error()) return; }while(0)
#define end_ret(x) while(0); if(compensation_error()) return (x); }while(0)
#define begin_action(func, var) \
if(compensation_error()) return; \
do{ \
void (*_func_)(void*); \
pthread_cleanup_push(_func_=(void(*)(void*))(func), (void*)(var));\
do
/** @todo compensation variables don't need _func_ anymore. */
#define end_action \
while(0); \
pthread_cleanup_pop(_func_ && compensation_error()); \

View file

@ -173,4 +173,21 @@ terms specified in this license.
#define XEND 6
#define CLRLOG 7
/* Page types */
#define UNINITIALIZED_PAGE 0
#define SLOTTED_PAGE 1
#define INDIRECT_PAGE 2
#define LLADD_HEADER_PAGE 3
#define LLADD_FREE_PAGE 4
#define FIXED_PAGE 5
#define ARRAY_LIST_PAGE 6
/* Record types */
#define UNINITIALIZED_RECORD 0
#define BLOB_RECORD 1
#define SLOTTED_RECORD 2
#define FIXED_RECORD 3
#endif

View file

@ -17,8 +17,8 @@ extern LockManagerSetup globalLockManager;
void lockManagerInit();
int lockManagerReadLockRecord(int xid, recordid rid);
int lockManagerWriteLockRecord(int xid, recordid rid);
compensated_function int lockManagerReadLockRecord(int xid, recordid rid);
compensated_function int lockManagerWriteLockRecord(int xid, recordid rid);
int lockManagerUnlockRecord(int xid, recordid rid);
int lockManagerCommit(int xid);

View file

@ -84,6 +84,10 @@ lladd_hash_iterator * ThashIterator(int xid, recordid hash, int keySize, int val
*/
int ThashNext(int xid, lladd_hash_iterator * it, byte ** key, int * keySize, byte** value, int * valueSize);
/** Free the hash iterator and its associated resources. */
void ThashDone(int xid, lladd_hash_iterator * it);
Operation getLinearHashInsert();
Operation getLinearHashRemove();

View file

@ -74,7 +74,7 @@ Operation getSetRange();
efficiently, it performs a number of extra memcpy() calls over the
entire record.
*/
void TsetRange(int xid, recordid rid, int offset, int length, const void * dat);
compensated_function void TsetRange(int xid, recordid rid, int offset, int length, const void * dat);
#endif

View file

@ -140,7 +140,7 @@ terms specified in this license.
In order to facilitate this, LLADD provides the function TgetRecordType() and
guarantess that the first recordid returned by any allocation will point to
the same page and slot as the constant ROOT_RECORD. TgetRecordType
will return NULL_RECORD if the record passed to it does not exist.
will return NULLRID if the record passed to it does not exist.
Therefore, the following code will safely initialize or reopen a data
store:
@ -262,8 +262,8 @@ typedef struct {
long size;
} recordid;
extern const recordid ZERO_RID;
extern const recordid ROOT_RECORD;
extern const recordid NULLRID;
/**
If a recordid's slot field is set to this, then the recordid
@ -379,6 +379,7 @@ void Trevive(int xid, long lsn);
*/
void TsetXIDCount(int xid);
END_C_DECLS
#endif

View file

@ -141,7 +141,7 @@ DfaSet * cHtCoordinatorInit(char * config_file, short(*get_broadcast_group)(DfaS
DfaSet * cHtSubordinateInit(char * config_file, short(*get_broadcast_group)(DfaSet *, Message *), int subordinate_number);
int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet);
int cHtCommit(state_machine_id xid, DfaSet * dfaSet);
/* int cHtAbort(state_machine_id xid, DfaSet * dfaSet);*/
int cHtAbort(state_machine_id xid, DfaSet * dfaSet);
/** The server side state for a CHT. */
typedef struct {

View file

@ -267,9 +267,10 @@ Page * getPage(int pageid, int locktype) {
return ret;
}
Page *loadPage(int xid, int pageid) {
if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); }
compensated_function Page *loadPage(int xid, int pageid) {
try_ret(NULL) {
if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); }
} end_ret(NULL);
Page * ret = getPage(pageid, RO);
return ret;
}

View file

@ -186,7 +186,7 @@ int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
ts.tv_nsec = tv.tv_usec * 1000;
if(tod_ret != 0) {
perror("Could not get time of day");
compensation_set_error(LLADD_DEADLOCK);
compensation_set_error(LLADD_INTERNAL_ERROR);
return LLADD_INTERNAL_ERROR;
}
while(ridLock->writers || (ridLock->readers - me)) {
@ -301,10 +301,10 @@ int lockManagerCommitHashed(int xid, int datLen) {
return ret;
}
int lockManagerReadLockRecord(int xid, recordid rid) {
compensated_function int lockManagerReadLockRecord(int xid, recordid rid) {
return lockManagerReadLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerWriteLockRecord(int xid, recordid rid) {
compensated_function int lockManagerWriteLockRecord(int xid, recordid rid) {
return lockManagerWriteLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerUnlockRecord(int xid, recordid rid) {
@ -314,10 +314,10 @@ int lockManagerCommitRecords(int xid) {
return lockManagerCommitHashed(xid, sizeof(recordid));
}
int lockManagerReadLockPage(int xid, int p) {
compensated_function int lockManagerReadLockPage(int xid, int p) {
return lockManagerReadLockHashed(xid, (byte*)&p, sizeof(int));
}
int lockManagerWriteLockPage(int xid, int p) {
compensated_function int lockManagerWriteLockPage(int xid, int p) {
return lockManagerWriteLockHashed(xid, (byte*)&p, sizeof(int));
}
int lockManagerUnlockPage(int xid, int p) {

View file

@ -323,8 +323,6 @@ static void recover_split(int xid, recordid hashRid, int i, int next_split, int
hashRid.slot = 0;
recordid ba = hashRid; ba.slot = next_split;
recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1);
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = -1;
if(headerHashBits <= i && headerNextSplit <= next_split) {
@ -416,7 +414,6 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
recordid ba = hashRid; ba.slot = next_split;
recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1);
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = -1;
// recordid ba_contents; TreadUnlocked(xid, ba, &ba_contents);
// recordid bb_contents = NULLRID;
@ -821,21 +818,19 @@ int TlogicalHashLookup(int xid, recordid hashRid, void * key, int keySize, void
linearHash_iterator * TlogicalHashIterator(int xid, recordid hashRid) {
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=2; NULLRID.size = -1;
linearHash_iterator * ret = malloc(sizeof(linearHash_iterator));
ret->current_hashBucket = 0;
ret->current_rid = NULLRID;
ret->current_rid.slot = 2;
return ret;
}
void TlogicalHashIteratorFree(linearHash_iterator * it) {
free(it);
}
linearHash_iteratorPair TlogicalHashIteratorNext(int xid, recordid hashRid, linearHash_iterator * it, int keySize, int valSize) {
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=2; NULLRID.size = -1;
recordid * headerRidB = pblHtLookup(openHashes, &hashRid.page, sizeof(int));
hashEntry * e = malloc(sizeof(hashEntry) + keySize + valSize);
linearHash_iteratorPair p;// = malloc(sizeof(linearHash_iteratorPair));
//next.size == 0 -> empty bucket. == -1 -> end of list.
@ -862,6 +857,7 @@ linearHash_iteratorPair TlogicalHashIteratorNext(int xid, recordid hashRid, line
p.key = NULL;
p.value = NULL;
it->current_rid = NULLRID;
it->current_rid.slot = 2;
// memcpy(&(it->current_rid), &(NULLRID), sizeof(recordid));
it->current_hashBucket = 0;
} else {

View file

@ -371,3 +371,13 @@ int ThashNext(int xid, lladd_hash_iterator * it, byte ** key, int * keySize, byt
}
return 1;
}
void ThashDone(int xid, lladd_hash_iterator * it) {
if(it->it) {
free(it->it);
}
if(it->pit) {
free(it->pit);
}
free(it);
}

View file

@ -3,9 +3,18 @@
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#define __USE_GNU
#include <pthread.h>
/*** @todo this is a big hack but it seems necessary to work with
gcc when the moon is full. (thanks mike demmer. ;) */
#ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
# define PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP \
{0, 0, 0, PTHREAD_MUTEX_RECURSIVE_NP, __LOCK_INITIALIZER}
#endif
/** A quick note on the format of linked lists. Each entry consists
of a struct with some variable length data appended to it.
@ -31,7 +40,9 @@
@file
*/
static pthread_mutex_t linked_list_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static void __TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize);
typedef struct {

View file

@ -140,7 +140,6 @@ void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int v
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
recordid ba = hashRid; ba.slot = next_split;
recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1);
recordid NULLRID; NULLRID.page = 0; NULLRID.slot=0; NULLRID.size = -1;
hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);

View file

@ -64,13 +64,9 @@ extern TransactionLog XactionTable[];
pblHashTable_t * nestedTopActions = NULL;
/** @todo this really should be set somewhere globally. */
static recordid NULLRID;
void initNestedTopActions() {
nestedTopActions = pblHtCreate();
NULLRID.page = 0;
NULLRID.slot = 0;
NULLRID.size = -1;
}
/** @todo TbeginNestedTopAction's API might not be quite right.
Are there cases where we need to pass a recordid in?

View file

@ -12,11 +12,6 @@ typedef struct {
recordid TpagedListAlloc(int xid) {
recordid NULLRID;
NULLRID.page = 0;
NULLRID.slot = 0;
NULLRID.size = -1;
recordid ret = Talloc(xid, sizeof(pagedListHeader));
pagedListHeader header;
header.thisPage = 0;

View file

@ -95,7 +95,12 @@ static int deOperateRange(int xid, Page * p, lsn_t lsn, recordid rid, const void
free(tmp);
return 0;
}
void TsetRange(int xid, recordid rid, int offset, int length, const void * dat) {
compensated_function void TsetRange(int xid, recordid rid, int offset, int length, const void * dat) {
Page * p;
try {
p = loadPage(xid, rid.page);
} end;
set_range_t * range = malloc(sizeof(set_range_t) + 2 * length);
byte * record = malloc(rid.size);
@ -106,7 +111,6 @@ void TsetRange(int xid, recordid rid, int offset, int length, const void * dat)
// Copy new value into log structure
memcpy(range + 1, dat, length);
Page * p = loadPage(xid, rid.page);
// No further locking is necessary here; readRecord protects the
// page layout, but attempts at concurrent modification have undefined
// results. (See page.c)

View file

@ -107,11 +107,11 @@ void pageWriteLSN(int xid, Page * page, lsn_t lsn) {
/* unlocked since we're only called by a function that holds the writelock. */
/* *(long *)(page->memAddr + START_OF_LSN) = page->LSN; */
begin_action(NULL,NULL) {
try {
if(globalLockManager.writeLockPage) {
globalLockManager.writeLockPage(xid, page->id);
}
} end_action;
} end;
if(page->LSN < lsn) {
page->LSN = lsn;

View file

@ -93,13 +93,6 @@ terms specified in this license.
BEGIN_C_DECLS
#define UNINITIALIZED_PAGE 0
#define SLOTTED_PAGE 1
#define INDIRECT_PAGE 2
#define LLADD_HEADER_PAGE 3
#define LLADD_FREE_PAGE 4
#define FIXED_PAGE 5
#define ARRAY_LIST_PAGE 6
#define lsn_ptr(page) (((lsn_t *)(&((page)->memAddr[PAGE_SIZE])))-1)
#define page_type_ptr(page) (((int*)lsn_ptr((page)))-1)
#define end_of_usable_space_ptr(page) page_type_ptr((page))
@ -111,10 +104,10 @@ BEGIN_C_DECLS
#define USABLE_SIZE_OF_PAGE (PAGE_SIZE - sizeof(lsn_t) - sizeof(int))
#define UNINITIALIZED_RECORD 0
/*#define UNINITIALIZED_RECORD 0
#define BLOB_RECORD 1
#define SLOTTED_RECORD 2
#define FIXED_RECORD 3
#define FIXED_RECORD 3 */
@ -221,7 +214,7 @@ void pageDeInit();
* @param page You must have a writelock on page before calling this function.
* @param lsn The new lsn of the page. If the new lsn is less than the page's current lsn, then the page's lsn will not be changed.
*/
void pageWriteLSN(int xid, Page * page, lsn_t lsn);
compensated_function void pageWriteLSN(int xid, Page * page, lsn_t lsn);
/**
* assumes that the page is already loaded in memory. It takes

View file

@ -14,8 +14,11 @@ void indirectInitialize(Page * p, int height) {
memset(p->memAddr, INVALID_SLOT, ((int)level_ptr(p)) - ((int)p->memAddr));
}
/** @todo locking for dereferenceRID? */
recordid dereferenceRID(int xid, recordid rid) {
Page * this = loadPage(xid, rid.page);
compensated_function recordid dereferenceRID(int xid, recordid rid) {
Page * this;
try_ret(NULLRID) {
this = loadPage(xid, rid.page);
} end_ret(NULLRID);
int offset = 0;
int max_slot;
while(*page_type_ptr(this) == INDIRECT_PAGE) {
@ -32,7 +35,9 @@ recordid dereferenceRID(int xid, recordid rid) {
int nextPage = *page_ptr(this, i);
releasePage(this);
this = loadPage(xid, nextPage);
try_ret(NULLRID) {
this = loadPage(xid, nextPage);
} end_ret(NULLRID);
}
rid.page = this->id;
@ -168,8 +173,11 @@ recordid __rallocMany(int xid, int parentPage, int recordSize, int recordCount)
return rid;
}
unsigned int indirectPageRecordCount(int xid, recordid rid) {
Page * p = loadPage(xid, rid.page);
compensated_function int indirectPageRecordCount(int xid, recordid rid) {
Page * p;
try_ret(-1){
p = loadPage(xid, rid.page);
}end_ret(-1);
int i = 0;
unsigned int ret;
if(*page_type_ptr(p) == INDIRECT_PAGE) {
@ -193,7 +201,7 @@ unsigned int indirectPageRecordCount(int xid, recordid rid) {
}
} else {
printf("Unknown page type in indirectPageRecordCount()\n");
printf("Unknown page type in indirectPageRecordCount\n");
abort();
}
releasePage(p);

View file

@ -43,11 +43,12 @@ BEGIN_C_DECLS
Translates a recordid that points to an indirect block into the
physical location of the record.
*/
recordid dereferenceRID(int xid, recordid rid);
#define dereferenceRIDUnlocked(y, x) dereferenceRID((y), (x))
compensated_function recordid dereferenceRID(int xid, recordid rid);
compensated_function static inline recordid dereferenceRIDUnlocked(int xid, recordid rid) {return dereferenceRID(xid,rid);}
//#define dereferenceRIDUnlocked(y, x) dereferenceRID((y), (x))
void indirectInitialize(Page * p, int height);
recordid rallocMany(/*int parentPage, lsn_t lsn,*/int xid, int recordSize, int recordCount);
unsigned int indirectPageRecordCount(int xid, recordid rid);
compensated_function int indirectPageRecordCount(int xid, recordid rid);
END_C_DECLS

View file

@ -156,8 +156,12 @@ int slottedFreespace(Page * page) {
interface? (The xid is there for now, in case it allows some
optimizations later. Perhaps it's better to cluster allocations
from the same xid on the same page, or something...)
@todo slottedPreRalloc should understand deadlock, and try another page if deadlock occurs.
@todo need to obtain (transaction-level) write locks _before_ writing log entries. Otherwise, we can deadlock at recovery.
*/
recordid slottedPreRalloc(int xid, long size, Page ** pp) {
compensated_function recordid slottedPreRalloc(int xid, long size, Page ** pp) {
recordid ret;
@ -175,18 +179,24 @@ recordid slottedPreRalloc(int xid, long size, Page ** pp) {
if(lastFreepage == -1) {
lastFreepage = TpageAlloc(xid);
*pp = loadPage(xid, lastFreepage);
try_ret(NULLRID) {
*pp = loadPage(xid, lastFreepage);
} end_ret(NULLRID);
assert(*page_type_ptr(*pp) == UNINITIALIZED_PAGE);
slottedPageInitialize(*pp);
} else {
*pp = loadPage(xid, lastFreepage);
try_ret(NULLRID) {
*pp = loadPage(xid, lastFreepage);
} end_ret(NULLRID);
}
if(slottedFreespace(*pp) < size ) {
releasePage(*pp);
lastFreepage = TpageAlloc(xid);
*pp = loadPage(xid, lastFreepage);
try_ret(NULLRID) {
*pp = loadPage(xid, lastFreepage);
} end_ret(NULLRID);
slottedPageInitialize(*pp);
}
@ -201,24 +211,20 @@ recordid slottedPreRalloc(int xid, long size, Page ** pp) {
return ret;
}
recordid slottedPreRallocFromPage(int xid, long page, long size, Page **pp) {
compensated_function recordid slottedPreRallocFromPage(int xid, long page, long size, Page **pp) {
recordid ret;
int isBlob = 0;
if(size == BLOB_SLOT) {
isBlob = 1;
size = sizeof(blob_record_t);
}
*pp = loadPage(xid, page);
try_ret(NULLRID) {
*pp = loadPage(xid, page);
} end_ret(NULLRID);
if(slottedFreespace(*pp) < size) {
releasePage(*pp);
*pp = NULL;
recordid rid;
rid.page = 0;
rid.slot = 0;
rid.size = -1;
return rid;
return NULLRID;
}
if(*page_type_ptr(*pp) == UNINITIALIZED_PAGE) {
@ -302,7 +308,7 @@ static void __really_do_ralloc(Page * page, recordid rid) {
}
recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
compensated_function recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
writelock(page->rwlatch, 376);
@ -347,26 +353,27 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
(*slot_length_ptr(page, rid.slot) >= PAGE_SIZE));
}
pageWriteLSN(xid, page, lsn);
writeunlock(page->rwlatch);
begin_action_ret(writeunlock, page->rwlatch, NULLRID) { // lock acquired above.
pageWriteLSN(xid, page, lsn);
} compensate_ret(NULLRID);
return rid;
}
void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
compensated_function void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
readlock(page->rwlatch, 443);
begin_action(unlock, page->rwlatch) {
readlock(page->rwlatch, 443);
*slot_ptr(page, rid.slot) = INVALID_SLOT;
*slot_length_ptr(page, rid.slot) = *freelist_ptr(page);
*freelist_ptr(page) = rid.slot;
/* *slot_length_ptr(page, rid.slot) = 0; */
*slot_ptr(page, rid.slot) = INVALID_SLOT;
*slot_length_ptr(page, rid.slot) = *freelist_ptr(page);
*freelist_ptr(page) = rid.slot;
/* *slot_length_ptr(page, rid.slot) = 0; */
pageWriteLSN(xid, page, lsn);
pageWriteLSN(xid, page, lsn);
} compensate;
unlock(page->rwlatch);
}
void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff) {
@ -427,7 +434,7 @@ void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat
/*page->LSN = lsn;
*lsn_ptr(page) = lsn * /
pageWriteLSN(page); */
pageWriteLSN-page); */
unlock(page->rwlatch);
}

View file

@ -91,12 +91,12 @@ void slottedPageInitialize(Page * p);
* @see postRallocSlot the implementation of the second phase.
*
*/
recordid slottedPreRalloc(int xid, long size, Page**p);
compensated_function recordid slottedPreRalloc(int xid, long size, Page**p);
/**
Identical to slottedPreRalloc, but allows the user to specify which page the
record should be allocated in.
*/
recordid slottedPreRallocFromPage(int xid, long page, long size, Page**p);
compensated_function recordid slottedPreRallocFromPage(int xid, long page, long size, Page**p);
/**
* The second phase of slot allocation. Called after the log entry
@ -113,13 +113,13 @@ recordid slottedPreRallocFromPage(int xid, long page, long size, Page**p);
* page for this record (though it is possible that we need to compact
* the page)
*/
recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid);
compensated_function recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid);
/**
* Mark the space used by a record for reclaimation.
*
* @param rid the recordid to be freed.
*/
void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid);
compensated_function void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid);
void slottedPageInit();
void slottedPageDeinit();

View file

@ -20,6 +20,8 @@ TransactionLog XactionTable[MAX_TRANSACTIONS];
int numActiveXactions = 0;
int xidCount = 0;
const recordid ROOT_RECORD = {1, 0, -1};
const recordid NULLRID = {0,0,-1};
/**
Locking for transactional2.c works as follows:

View file

@ -13,7 +13,8 @@ int main(int argc, char ** argv) {
Tinit();
DfaSet * cht_client = cHtClientInit(conf);
pthread_t main_worker_loop = spawn_main_thread(cht_client);
// pthread_t main_worker_loop =
spawn_main_thread(cht_client);
// cht_evals go here...

View file

@ -30,22 +30,37 @@ void * pageWorkerThread(void * j) {
if(rw) {
// readlock
if(LLADD_DEADLOCK == globalLockManager.readLockPage(xid, m)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
printf("-");
}
int locked = 0;
// begin_action_ret(NULL,NULL, 0) {
if(LLADD_DEADLOCK == globalLockManager.readLockPage(xid, m)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
printf("-");
}
// } end_action_ret(0);
/* if(locked) {
assert(compensation_error() == LLADD_DEADLOCK);
compensation_clear_error();
} */
} else {
// writelock
int locked = 0;
// begin_action_ret(NULL, NULL, 0) {
if(LLADD_DEADLOCK == globalLockManager.writeLockPage(xid, m)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
printf("-");
locked = 1;
}
/* if(locked) {
int err = compensation_error();
assert(err == LLADD_DEADLOCK);
compensation_clear_error();
} */
// } end_action_ret(0);
}
}
@ -73,23 +88,27 @@ void * ridWorkerThread(void * j) {
if(rw) {
// readlock
if(LLADD_DEADLOCK == globalLockManager.readLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
printf("-");
}
// begin_action_ret(NULL, NULL, 0) {
if(LLADD_DEADLOCK == globalLockManager.readLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
printf("-");
}
// } end_action_ret(0);
} else {
// writelock
if(LLADD_DEADLOCK == globalLockManager.writeLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
printf("-");
}
// begin_action_ret(NULL, NULL, 0) {
if(LLADD_DEADLOCK == globalLockManager.writeLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
printf("-");
}
// } end_action_ret(0);
}
}

View file

@ -49,11 +49,11 @@ foreach my $i (@source) {
while(my $line = <IN>) {
$num++;
my $in = 0;
if ($line =~ /\bbegin_action(_ret)?\b/) {
if ($line =~ /\bbegin_action(_ret)?\s*\(\b/ || $line =~ /\btry(_ret)?\s*[\(\{]/) {
$nest++;
$in = 1;
}
if ($line =~ /\bend_action(_ret)?\b/ || $line =~ /\bcompensate(_ret)?\b/) {
if ($line =~ /}\s*end(_action)?(_ret)?\b/ || $line =~ /}\s*compensate(_ret)?\b/) {
$nest--;
if($in) {
warn "$pwd/$i:$num Cannot handle single line compensation checks\n";