Partial port to amd64.
This commit is contained in:
parent
a579da0a58
commit
bb37db0857
23 changed files with 136 additions and 77 deletions
|
@ -103,9 +103,9 @@ void lock_c_line_1231(lock * l) {
|
||||||
|
|
||||||
void compensations_init();
|
void compensations_init();
|
||||||
void compensations_deinit();
|
void compensations_deinit();
|
||||||
int compensation_error();
|
long compensation_error();
|
||||||
void compensation_clear_error();
|
void compensation_clear_error();
|
||||||
void compensation_set_error(int code);
|
void compensation_set_error(long code);
|
||||||
|
|
||||||
#define try do { if(compensation_error()) return; do
|
#define try do { if(compensation_error()) return; do
|
||||||
#define try_ret(x) do { if(compensation_error()) return (x); do
|
#define try_ret(x) do { if(compensation_error()) return (x); do
|
||||||
|
|
|
@ -527,7 +527,7 @@ pthread_t spawn_worker_thread(DfaSet * dfaSet, state_machine_id machine_id) {
|
||||||
void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, state_machine_id recipient_machine_id, Message * message) {
|
void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, state_machine_id recipient_machine_id, Message * message) {
|
||||||
StateMachine * initial_sm;
|
StateMachine * initial_sm;
|
||||||
state_machine_id machine_id;
|
state_machine_id machine_id;
|
||||||
int ret;
|
void * ret;
|
||||||
writelock(dfaSet->lock, 600);
|
writelock(dfaSet->lock, 600);
|
||||||
|
|
||||||
initial_sm = allocSmash(dfaSet->smash);
|
initial_sm = allocSmash(dfaSet->smash);
|
||||||
|
@ -558,7 +558,7 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s
|
||||||
machine_id = initial_sm->machine_id;
|
machine_id = initial_sm->machine_id;
|
||||||
writeunlock(dfaSet->lock);
|
writeunlock(dfaSet->lock);
|
||||||
|
|
||||||
ret = (int)run_request(dfaSet, machine_id);
|
ret = run_request(dfaSet, machine_id);
|
||||||
|
|
||||||
writelock(dfaSet->lock, machine_id);
|
writelock(dfaSet->lock, machine_id);
|
||||||
assert(initial_sm == getSmash(dfaSet->smash, machine_id));
|
assert(initial_sm == getSmash(dfaSet->smash, machine_id));
|
||||||
|
@ -570,7 +570,7 @@ void * request(DfaSet * dfaSet, state_name start_state, char * recipient_addr, s
|
||||||
freeSmash(dfaSet->smash, initial_sm->machine_id);
|
freeSmash(dfaSet->smash, initial_sm->machine_id);
|
||||||
writeunlock(dfaSet->lock);
|
writeunlock(dfaSet->lock);
|
||||||
|
|
||||||
return (void*)ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void * run_request(DfaSet * dfaSet, state_machine_id machine_id) {
|
void * run_request(DfaSet * dfaSet, state_machine_id machine_id) {
|
||||||
|
|
|
@ -324,7 +324,7 @@ int receive_message(NetworkSetup *ns, Message *message, char *from) {
|
||||||
|
|
||||||
if(message_size != sizeof(Message)) {
|
if(message_size != sizeof(Message)) {
|
||||||
/* drop packet */
|
/* drop packet */
|
||||||
fprintf(stderr, "Size mismatch: %d, %d\n", message_size, sizeof(Message));
|
fprintf(stderr, "Size mismatch: %ld, %ld\n", message_size, sizeof(Message));
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
/* TODO: Callback to security stuff / crypto here? */
|
/* TODO: Callback to security stuff / crypto here? */
|
||||||
|
|
|
@ -71,8 +71,8 @@ void commitBlobs(int xid);
|
||||||
void abortBlobs(int xid);
|
void abortBlobs(int xid);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
unsigned offset;
|
size_t offset;
|
||||||
unsigned long size;
|
size_t size;
|
||||||
unsigned fd : 1;
|
unsigned fd : 1;
|
||||||
} blob_record_t;
|
} blob_record_t;
|
||||||
|
|
||||||
|
|
|
@ -19,8 +19,8 @@ void compensations_deinit() {
|
||||||
assert(!ret);
|
assert(!ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
int compensation_error() {
|
long compensation_error() {
|
||||||
int error = (int) pthread_getspecific(error_key);
|
long error = (long) pthread_getspecific(error_key);
|
||||||
return error;
|
return error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ void compensation_clear_error() {
|
||||||
compensation_set_error(0);
|
compensation_set_error(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void compensation_set_error(int error) {
|
void compensation_set_error(long error) {
|
||||||
int ret = pthread_setspecific(error_key, (void *)error);
|
int ret = pthread_setspecific(error_key, (void *)error);
|
||||||
if(ret) {
|
if(ret) {
|
||||||
printf("Unhandled error: %s\n", strerror(ret));
|
printf("Unhandled error: %s\n", strerror(ret));
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
unsigned int crc32(const void *buffer, unsigned int count, unsigned int crc);
|
unsigned int crc32(const void *buffer, unsigned int count, unsigned int crc);
|
||||||
static int BuildCRCTable(void);
|
static int BuildCRCTable(void);
|
||||||
|
|
||||||
static unsigned long *CRCTable; // Table constructed for fast lookup.
|
static unsigned int *CRCTable; // Table constructed for fast lookup.
|
||||||
|
|
||||||
#define CRC32_POLYNOMIAL 0xEDB88320
|
#define CRC32_POLYNOMIAL 0xEDB88320
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer.
|
Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer.
|
||||||
*/
|
*/
|
||||||
lladdFifo_t * lladdFifoPool_getFifoCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) {
|
lladdFifo_t * lladdFifoPool_getFifoCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) {
|
||||||
int memberId = crc32(multiplexKey, multiplexKeySize, (unsigned long)-1L) % pool->fifoCount;
|
int memberId = crc32(multiplexKey, multiplexKeySize, (unsigned int)-1) % pool->fifoCount;
|
||||||
return pool->pool[memberId];
|
return pool->pool[memberId];
|
||||||
}
|
}
|
||||||
void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) {
|
void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo) {
|
||||||
|
|
|
@ -61,7 +61,7 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t
|
||||||
|
|
||||||
if(myFifo == -1) {
|
if(myFifo == -1) {
|
||||||
if(useCRC) {
|
if(useCRC) {
|
||||||
myFifo = crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned long)-1L) % pool->fifoCount;
|
myFifo = crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned int)-1) % pool->fifoCount;
|
||||||
} else {
|
} else {
|
||||||
myFifo = rid->page % pool->fifoCount;
|
myFifo = rid->page % pool->fifoCount;
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t
|
||||||
nextRid = dereferenceArrayListRid(p, nextRid.slot);
|
nextRid = dereferenceArrayListRid(p, nextRid.slot);
|
||||||
releasePage(p);
|
releasePage(p);
|
||||||
|
|
||||||
int thisFifo = crc32((byte*)&(nextRid.page), sizeof(nextRid.page), (unsigned long)-1L) % pool->fifoCount;
|
int thisFifo = crc32((byte*)&(nextRid.page), sizeof(nextRid.page), (unsigned int)-1) % pool->fifoCount;
|
||||||
/* if(nextRid.page == rid->page) {
|
/* if(nextRid.page == rid->page) {
|
||||||
assert(thisFifo == myFifo);
|
assert(thisFifo == myFifo);
|
||||||
}*/
|
}*/
|
||||||
|
|
|
@ -12,7 +12,7 @@ unsigned int max_bucket(unsigned char tableBits, unsigned long nextExtension) {
|
||||||
|
|
||||||
unsigned int hash(const void * val, long val_length, unsigned char tableBits, unsigned long nextExtension) {
|
unsigned int hash(const void * val, long val_length, unsigned char tableBits, unsigned long nextExtension) {
|
||||||
unsigned int oldTableLength = /*powl(2, tableBits - 1); */ twoToThe(tableBits - 1);
|
unsigned int oldTableLength = /*powl(2, tableBits - 1); */ twoToThe(tableBits - 1);
|
||||||
unsigned int unmixed = crc32(val, val_length, (unsigned long)-1L);
|
unsigned int unmixed = crc32(val, val_length, (unsigned int)-1);
|
||||||
unsigned int ret = unmixed & (oldTableLength - 1);
|
unsigned int ret = unmixed & (oldTableLength - 1);
|
||||||
|
|
||||||
/* What would the low hash value be? */
|
/* What would the low hash value be? */
|
||||||
|
|
|
@ -92,7 +92,7 @@ int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
|
||||||
if(!xidLocks) {
|
if(!xidLocks) {
|
||||||
xidLocks = lockManagerBeginTransactionUnlocked(xid);
|
xidLocks = lockManagerBeginTransactionUnlocked(xid);
|
||||||
}
|
}
|
||||||
int currentLockLevel = (int)pblHtLookup(xidLocks, dat, datLen);
|
long currentLockLevel = (long)pblHtLookup(xidLocks, dat, datLen);
|
||||||
// printf("xid %d read lock (%d)\n", xid, currentLockLevel);
|
// printf("xid %d read lock (%d)\n", xid, currentLockLevel);
|
||||||
if(currentLockLevel >= LM_READLOCK) {
|
if(currentLockLevel >= LM_READLOCK) {
|
||||||
pthread_mutex_unlock(&xid_table_mutex);
|
pthread_mutex_unlock(&xid_table_mutex);
|
||||||
|
@ -152,7 +152,7 @@ int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
|
||||||
xidLocks = lockManagerBeginTransactionUnlocked(xid);
|
xidLocks = lockManagerBeginTransactionUnlocked(xid);
|
||||||
}
|
}
|
||||||
|
|
||||||
int currentLockLevel = (int)pblHtLookup(xidLocks, dat, datLen);
|
long currentLockLevel = (long)pblHtLookup(xidLocks, dat, datLen);
|
||||||
|
|
||||||
// printf("xid %d write lock (%d)\n", xid, currentLockLevel);
|
// printf("xid %d write lock (%d)\n", xid, currentLockLevel);
|
||||||
|
|
||||||
|
@ -231,7 +231,7 @@ int lockManagerUnlockHashed(int xid, byte * dat, int datLen) {
|
||||||
xidLocks = lockManagerBeginTransactionUnlocked(xid);
|
xidLocks = lockManagerBeginTransactionUnlocked(xid);
|
||||||
}
|
}
|
||||||
|
|
||||||
int currentLevel = (int)pblHtLookup(xidLocks, dat, datLen);
|
long currentLevel = (long)pblHtLookup(xidLocks, dat, datLen);
|
||||||
|
|
||||||
if(currentLevel) {
|
if(currentLevel) {
|
||||||
pblHtRemove(xidLocks, dat, datLen);
|
pblHtRemove(xidLocks, dat, datLen);
|
||||||
|
|
|
@ -54,6 +54,7 @@ LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) {
|
||||||
ret->prevLSN = prevLSN;
|
ret->prevLSN = prevLSN;
|
||||||
ret->xid = xid;
|
ret->xid = xid;
|
||||||
ret->type = type;
|
ret->type = type;
|
||||||
|
//printf ("logEntry.c: Log entry length is %ld\n", sizeof(struct __raw_log_entry));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -185,7 +185,7 @@ int openLogWriter() {
|
||||||
global offset for the truncated log. (Not implemented yet)
|
global offset for the truncated log. (Not implemented yet)
|
||||||
*/
|
*/
|
||||||
lsn_t zero = 0;
|
lsn_t zero = 0;
|
||||||
int nmemb = fwrite(&zero, sizeof(lsn_t), 1, log);
|
size_t nmemb = fwrite(&zero, sizeof(lsn_t), 1, log);
|
||||||
if(nmemb != 1) {
|
if(nmemb != 1) {
|
||||||
perror("Couldn't start new log file!");
|
perror("Couldn't start new log file!");
|
||||||
// assert(0);
|
// assert(0);
|
||||||
|
@ -199,7 +199,7 @@ int openLogWriter() {
|
||||||
perror("Could not seek to head of log");
|
perror("Could not seek to head of log");
|
||||||
}
|
}
|
||||||
|
|
||||||
int bytesRead = read(roLogFD, &global_offset, sizeof(lsn_t));
|
ssize_t bytesRead = read(roLogFD, &global_offset, sizeof(lsn_t));
|
||||||
|
|
||||||
if(bytesRead != sizeof(lsn_t)) {
|
if(bytesRead != sizeof(lsn_t)) {
|
||||||
printf("Could not read log header.");
|
printf("Could not read log header.");
|
||||||
|
@ -257,10 +257,12 @@ int writeLogEntry(LogEntry * e) {
|
||||||
|
|
||||||
/* Set the log entry's LSN. */
|
/* Set the log entry's LSN. */
|
||||||
e->LSN = nextAvailableLSN;
|
e->LSN = nextAvailableLSN;
|
||||||
|
//printf ("\nLSN: %ld\n", e->LSN);
|
||||||
|
//fflush(stdout);
|
||||||
|
|
||||||
nextAvailableLSN += (size + sizeof(long));
|
nextAvailableLSN += (size + sizeof(long));
|
||||||
|
|
||||||
int nmemb = fwrite(&size, sizeof(long), 1, log);
|
size_t nmemb = fwrite(&size, sizeof(long), 1, log);
|
||||||
if(nmemb != 1) {
|
if(nmemb != 1) {
|
||||||
if(feof(log)) { abort(); /* feof makes no sense here */ }
|
if(feof(log)) { abort(); /* feof makes no sense here */ }
|
||||||
if(ferror(log)) {
|
if(ferror(log)) {
|
||||||
|
@ -281,6 +283,8 @@ int writeLogEntry(LogEntry * e) {
|
||||||
return LLADD_IO_ERROR;
|
return LLADD_IO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//fflush(log);
|
||||||
|
|
||||||
pthread_mutex_unlock(&log_write_mutex);
|
pthread_mutex_unlock(&log_write_mutex);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -330,13 +334,13 @@ void closeLogWriter() {
|
||||||
void deleteLogWriter() {
|
void deleteLogWriter() {
|
||||||
remove(LOG_FILE);
|
remove(LOG_FILE);
|
||||||
}
|
}
|
||||||
|
long debug_lsn = -1;
|
||||||
static LogEntry * readLogEntry() {
|
static LogEntry * readLogEntry() {
|
||||||
LogEntry * ret = 0;
|
LogEntry * ret = 0;
|
||||||
long size;
|
long size;
|
||||||
long entrySize;
|
long entrySize;
|
||||||
|
|
||||||
int bytesRead = read(roLogFD, &size, sizeof(long));
|
ssize_t bytesRead = read(roLogFD, &size, sizeof(long));
|
||||||
|
|
||||||
if(bytesRead != sizeof(long)) {
|
if(bytesRead != sizeof(long)) {
|
||||||
if(bytesRead == 0) {
|
if(bytesRead == 0) {
|
||||||
|
@ -348,13 +352,16 @@ static LogEntry * readLogEntry() {
|
||||||
abort();
|
abort();
|
||||||
return (LogEntry*)LLADD_IO_ERROR;
|
return (LogEntry*)LLADD_IO_ERROR;
|
||||||
} else {
|
} else {
|
||||||
fprintf(stderr, "short read from log. Expected %d bytes, got %d.\nFIXME: This is 'normal', but currently not handled", sizeof(long), bytesRead);
|
fprintf(stderr, "short read from log. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", sizeof(long), bytesRead);
|
||||||
fflush(stderr);
|
fflush(stderr);
|
||||||
abort(); // really abort here. This code should attempt to piece together short log reads...
|
abort(); // really abort here. This code should attempt to piece together short log reads...
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ret = malloc(size);
|
ret = malloc(size);
|
||||||
|
|
||||||
|
//printf("Log entry is %ld bytes long.\n", size);
|
||||||
|
//fflush(stdout);
|
||||||
|
|
||||||
bytesRead = read(roLogFD, ret, size);
|
bytesRead = read(roLogFD, ret, size);
|
||||||
|
|
||||||
if(bytesRead != size) {
|
if(bytesRead != size) {
|
||||||
|
@ -367,7 +374,11 @@ static LogEntry * readLogEntry() {
|
||||||
abort();
|
abort();
|
||||||
return (LogEntry*)LLADD_IO_ERROR;
|
return (LogEntry*)LLADD_IO_ERROR;
|
||||||
} else {
|
} else {
|
||||||
printf("short read from log. Expected %ld bytes, got %d.\nFIXME: This is 'normal', but currently not handled", size, bytesRead);
|
printf("short read from log w/ lsn %ld. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", debug_lsn, size, bytesRead);
|
||||||
|
fflush(stderr);
|
||||||
|
long newSize = size - bytesRead;
|
||||||
|
long newBytesRead = read (roLogFD, ((byte*)ret)+bytesRead, newSize);
|
||||||
|
printf("\nattempt to read again produced newBytesRead = %ld, newSize was %ld\n", newBytesRead, newSize);
|
||||||
fflush(stderr);
|
fflush(stderr);
|
||||||
abort();
|
abort();
|
||||||
return (LogEntry*)LLADD_IO_ERROR;
|
return (LogEntry*)LLADD_IO_ERROR;
|
||||||
|
@ -393,9 +404,14 @@ LogEntry * readLSNEntry(lsn_t LSN) {
|
||||||
|
|
||||||
pthread_mutex_lock(&log_read_mutex);
|
pthread_mutex_lock(&log_read_mutex);
|
||||||
|
|
||||||
|
assert(global_offset <= LSN);
|
||||||
|
|
||||||
|
|
||||||
|
debug_lsn = LSN;
|
||||||
off_t newPosition = lseek(roLogFD, LSN - global_offset, SEEK_SET);
|
off_t newPosition = lseek(roLogFD, LSN - global_offset, SEEK_SET);
|
||||||
if(newPosition == -1) {
|
if(newPosition == -1) {
|
||||||
perror("Could not seek for log read");
|
perror("Could not seek for log read");
|
||||||
|
abort();
|
||||||
} else {
|
} else {
|
||||||
// fprintf(stderr, "sought to %d\n", (int)newPosition);
|
// fprintf(stderr, "sought to %d\n", (int)newPosition);
|
||||||
// fflush(stderr);
|
// fflush(stderr);
|
||||||
|
@ -419,7 +435,7 @@ int truncateLog(lsn_t LSN) {
|
||||||
|
|
||||||
pthread_mutex_lock(&truncateLog_mutex);
|
pthread_mutex_lock(&truncateLog_mutex);
|
||||||
|
|
||||||
if(global_offset + 4 >= LSN) {
|
if(global_offset + sizeof(lsn_t) >= LSN) {
|
||||||
/* Another thread beat us to it...the log is already truncated
|
/* Another thread beat us to it...the log is already truncated
|
||||||
past the point requested, so just return. */
|
past the point requested, so just return. */
|
||||||
pthread_mutex_unlock(&truncateLog_mutex);
|
pthread_mutex_unlock(&truncateLog_mutex);
|
||||||
|
|
|
@ -127,7 +127,7 @@ struct Page_s {
|
||||||
/** @todo Shouldn't Page.id be a long? */
|
/** @todo Shouldn't Page.id be a long? */
|
||||||
int id;
|
int id;
|
||||||
/** @todo The Page.LSN field seems extraneous. Why do we need it? */
|
/** @todo The Page.LSN field seems extraneous. Why do we need it? */
|
||||||
long LSN;
|
lsn_t LSN;
|
||||||
byte *memAddr;
|
byte *memAddr;
|
||||||
byte dirty;
|
byte dirty;
|
||||||
/** The next item in the replacement policy's queue */
|
/** The next item in the replacement policy's queue */
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
void indirectInitialize(Page * p, int height) {
|
void indirectInitialize(Page * p, int height) {
|
||||||
*level_ptr(p) = height;
|
*level_ptr(p) = height;
|
||||||
*page_type_ptr(p) = INDIRECT_PAGE;
|
*page_type_ptr(p) = INDIRECT_PAGE;
|
||||||
memset(p->memAddr, INVALID_SLOT, ((int)level_ptr(p)) - ((int)p->memAddr));
|
memset(p->memAddr, INVALID_SLOT, ((size_t)level_ptr(p)) - ((size_t)p->memAddr));
|
||||||
}
|
}
|
||||||
/** @todo locking for dereferenceRID? */
|
/** @todo locking for dereferenceRID? */
|
||||||
compensated_function recordid dereferenceRID(int xid, recordid rid) {
|
compensated_function recordid dereferenceRID(int xid, recordid rid) {
|
||||||
|
|
|
@ -26,7 +26,7 @@ void slottedCompact(Page * page) {
|
||||||
byte buffer[PAGE_SIZE];
|
byte buffer[PAGE_SIZE];
|
||||||
|
|
||||||
int numSlots;
|
int numSlots;
|
||||||
int meta_size;
|
size_t meta_size;
|
||||||
|
|
||||||
bufPage.id = -1;
|
bufPage.id = -1;
|
||||||
bufPage.memAddr = buffer;
|
bufPage.memAddr = buffer;
|
||||||
|
@ -36,7 +36,7 @@ void slottedCompact(Page * page) {
|
||||||
|
|
||||||
memset(buffer, -1, PAGE_SIZE);
|
memset(buffer, -1, PAGE_SIZE);
|
||||||
|
|
||||||
meta_size = (((int)page->memAddr) + PAGE_SIZE ) - (int)end_of_usable_space_ptr(page);
|
meta_size = (((size_t)page->memAddr) + PAGE_SIZE ) - (size_t)end_of_usable_space_ptr(page);
|
||||||
/* *slot_length_ptr(page, (*numslots_ptr(page))-1);*/
|
/* *slot_length_ptr(page, (*numslots_ptr(page))-1);*/
|
||||||
|
|
||||||
memcpy(buffer + PAGE_SIZE - meta_size, page->memAddr + PAGE_SIZE - meta_size, meta_size);
|
memcpy(buffer + PAGE_SIZE - meta_size, page->memAddr + PAGE_SIZE - meta_size, meta_size);
|
||||||
|
@ -138,8 +138,8 @@ void slottedPageInitialize(Page * page) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int slottedFreespaceUnlocked(Page * page) {
|
size_t slottedFreespaceUnlocked(Page * page) {
|
||||||
return (int)slot_length_ptr(page, *numslots_ptr(page)) - (int)(page->memAddr + *freespace_ptr(page));
|
return ((size_t)slot_length_ptr(page, *numslots_ptr(page)) - (size_t)(page->memAddr + *freespace_ptr(page))) - SLOTTED_PAGE_OVERHEAD_PER_RECORD;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -265,8 +265,11 @@ recordid slottedRawRalloc(Page * page, int size) {
|
||||||
|
|
||||||
/* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */
|
/* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */
|
||||||
|
|
||||||
|
assert(slottedFreespaceUnlocked(page) >= 0);
|
||||||
|
|
||||||
writeunlock(page->rwlatch);
|
writeunlock(page->rwlatch);
|
||||||
|
|
||||||
|
|
||||||
return rid;
|
return rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,6 +306,7 @@ static void __really_do_ralloc(Page * page, recordid rid) {
|
||||||
*freespace_ptr(page) = freeSpace + rid.size;
|
*freespace_ptr(page) = freeSpace + rid.size;
|
||||||
|
|
||||||
*slot_ptr(page, rid.slot) = freeSpace;
|
*slot_ptr(page, rid.slot) = freeSpace;
|
||||||
|
|
||||||
/* assert(!*slot_length_ptr(page, rid.slot) || (-1 == *slot_length_ptr(page, rid.slot)));*/
|
/* assert(!*slot_length_ptr(page, rid.slot) || (-1 == *slot_length_ptr(page, rid.slot)));*/
|
||||||
if(isBlob) {
|
if(isBlob) {
|
||||||
*slot_length_ptr(page, rid.slot = BLOB_SLOT);
|
*slot_length_ptr(page, rid.slot = BLOB_SLOT);
|
||||||
|
@ -310,6 +314,8 @@ static void __really_do_ralloc(Page * page, recordid rid) {
|
||||||
*slot_length_ptr(page, rid.slot) = rid.size;
|
*slot_length_ptr(page, rid.slot) = rid.size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(slottedFreespaceUnlocked(page) >= 0);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
|
recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
|
||||||
|
@ -366,17 +372,18 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
|
void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
|
||||||
|
writelock(page->rwlatch, 443);
|
||||||
readlock(page->rwlatch, 443);
|
// readlock(page->rwlatch, 443);
|
||||||
|
int oldFreeLen = slottedFreespaceUnlocked(page);
|
||||||
*slot_ptr(page, rid.slot) = INVALID_SLOT;
|
*slot_ptr(page, rid.slot) = INVALID_SLOT;
|
||||||
*slot_length_ptr(page, rid.slot) = *freelist_ptr(page);
|
*slot_length_ptr(page, rid.slot) = *freelist_ptr(page);
|
||||||
*freelist_ptr(page) = rid.slot;
|
*freelist_ptr(page) = rid.slot;
|
||||||
/* *slot_length_ptr(page, rid.slot) = 0; */
|
/* *slot_length_ptr(page, rid.slot) = 0; */
|
||||||
|
|
||||||
pageWriteLSN(xid, page, lsn);
|
pageWriteLSN(xid, page, lsn);
|
||||||
|
int newFreeLen = slottedFreespaceUnlocked(page);
|
||||||
readunlock(page->rwlatch);
|
assert(oldFreeLen <= newFreeLen && oldFreeLen >= 0);
|
||||||
|
unlock(page->rwlatch);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,9 +414,18 @@ void slottedRead(int xid, Page * page, recordid rid, byte *buff) {
|
||||||
readlock(page->rwlatch, 519);
|
readlock(page->rwlatch, 519);
|
||||||
|
|
||||||
assert(page->id == rid.page);
|
assert(page->id == rid.page);
|
||||||
|
|
||||||
|
// DELETE THIS
|
||||||
|
|
||||||
|
int free_space = slottedFreespaceUnlocked(page);
|
||||||
|
int slot_count = *numslots_ptr(page);
|
||||||
|
|
||||||
|
// END DELETE THIS
|
||||||
|
|
||||||
slot_length = *slot_length_ptr(page, rid.slot);
|
slot_length = *slot_length_ptr(page, rid.slot);
|
||||||
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
|
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
|
||||||
|
|
||||||
|
|
||||||
if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) {
|
if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) {
|
||||||
perror("memcpy");
|
perror("memcpy");
|
||||||
abort();
|
abort();
|
||||||
|
|
|
@ -56,8 +56,8 @@ Slotted page layout:
|
||||||
|
|
||||||
************************************************************************/
|
************************************************************************/
|
||||||
|
|
||||||
#define SLOTTED_PAGE_OVERHEAD_PER_RECORD 4
|
#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short))
|
||||||
#define SLOTTED_PAGE_HEADER_OVERHEAD 6
|
#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short))
|
||||||
|
|
||||||
void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data);
|
void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data);
|
||||||
void slottedRead(int xid, Page * page, recordid rid, byte *buff);
|
void slottedRead(int xid, Page * page, recordid rid, byte *buff);
|
||||||
|
|
|
@ -155,13 +155,13 @@ static int g_is_constructed = 0;
|
||||||
int
|
int
|
||||||
pobj_start (void)
|
pobj_start (void)
|
||||||
{
|
{
|
||||||
int active_xid;
|
long active_xid;
|
||||||
int active_nested;
|
long active_nested;
|
||||||
|
|
||||||
if (! g_is_init)
|
if (! g_is_init)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
active_xid = (int) pthread_getspecific (g_active_xid_key) - 1;
|
active_xid = (long) pthread_getspecific (g_active_xid_key) - 1;
|
||||||
if (active_xid < 0) {
|
if (active_xid < 0) {
|
||||||
active_xid = Tbegin ();
|
active_xid = Tbegin ();
|
||||||
if (active_xid < 0
|
if (active_xid < 0
|
||||||
|
@ -173,7 +173,7 @@ pobj_start (void)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
active_nested = (int) pthread_getspecific (g_active_nested_key);
|
active_nested = (long) pthread_getspecific (g_active_nested_key);
|
||||||
active_nested++;
|
active_nested++;
|
||||||
if (pthread_setspecific (g_active_nested_key, (void *) active_nested))
|
if (pthread_setspecific (g_active_nested_key, (void *) active_nested))
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -185,20 +185,20 @@ pobj_start (void)
|
||||||
int
|
int
|
||||||
pobj_end (void)
|
pobj_end (void)
|
||||||
{
|
{
|
||||||
int active_xid;
|
long active_xid;
|
||||||
int active_nested;
|
long active_nested;
|
||||||
|
|
||||||
if (! g_is_init)
|
if (! g_is_init)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
active_nested = (int) pthread_getspecific (g_active_nested_key);
|
active_nested = pthread_getspecific (g_active_nested_key);
|
||||||
if (active_nested) {
|
if (active_nested) {
|
||||||
active_nested--;
|
active_nested--;
|
||||||
if (pthread_setspecific (g_active_nested_key, (void *) active_nested))
|
if (pthread_setspecific (g_active_nested_key, (void *) active_nested))
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
active_xid = (int) pthread_getspecific (g_active_xid_key) - 1;
|
active_xid = pthread_getspecific (g_active_xid_key) - 1;
|
||||||
if (active_xid >= 0) {
|
if (active_xid >= 0) {
|
||||||
if (pthread_setspecific (g_active_xid_key, NULL))
|
if (pthread_setspecific (g_active_xid_key, NULL))
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -855,7 +855,7 @@ pobj_set (void *obj, void *fld, void *data, size_t len, unsigned char flags)
|
||||||
if (CHECK_FLAG (flags, POBJ_SET_F_COPY))
|
if (CHECK_FLAG (flags, POBJ_SET_F_COPY))
|
||||||
memcpy (fld, data, len);
|
memcpy (fld, data, len);
|
||||||
else
|
else
|
||||||
memset (fld, (int) data, len);
|
memset (fld, (byte) data, len);
|
||||||
|
|
||||||
/* Update corresponding record (persistent objects only). */
|
/* Update corresponding record (persistent objects only). */
|
||||||
if (p->repo_index >= 0) {
|
if (p->repo_index >= 0) {
|
||||||
|
|
|
@ -158,7 +158,7 @@ int main (int argc, char**argv) {
|
||||||
/* int j = (i % 8) + 1;*/
|
/* int j = (i % 8) + 1;*/
|
||||||
int j = i;
|
int j = i;
|
||||||
if(!cHtLookup(xid, dfaSet, &ht, &j, sizeof(int), buf, &buflen)) { printf ("lookup failed!"); }
|
if(!cHtLookup(xid, dfaSet, &ht, &j, sizeof(int), buf, &buflen)) { printf ("lookup failed!"); }
|
||||||
printf(" looked up !! key %d -> %d: %s %d\n", i, j, buf, buflen);
|
printf(" looked up !! key %d -> %d: %s %ld\n", i, j, buf, buflen);
|
||||||
}
|
}
|
||||||
|
|
||||||
cHtDelete(xid, dfaSet, &ht);
|
cHtDelete(xid, dfaSet, &ht);
|
||||||
|
|
|
@ -127,15 +127,15 @@ int main (int argc, char ** argv) {
|
||||||
broadcast_lists_count, broadcast_list_host_count);
|
broadcast_lists_count, broadcast_list_host_count);
|
||||||
|
|
||||||
if(list_number == 0) {
|
if(list_number == 0) {
|
||||||
int ret;
|
long ret;
|
||||||
|
|
||||||
dfa_reinitialize(dfaSet, broadcast_lists[list_number][node_number], transitions_star, 2, states_star, 2);
|
dfa_reinitialize(dfaSet, broadcast_lists[list_number][node_number], transitions_star, 2, states_star, 2);
|
||||||
|
|
||||||
spawn_main_thread(dfaSet);
|
spawn_main_thread(dfaSet);
|
||||||
|
|
||||||
ret =(int) request(dfaSet, HUB_START, "bc:1", NULL_MACHINE, NULL);
|
ret =(long) request(dfaSet, HUB_START, "bc:1", NULL_MACHINE, NULL);
|
||||||
|
|
||||||
printf("run_request_returned: %x\n", ret);
|
printf("run_request_returned: %lx\n", ret);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
|
|
@ -245,8 +245,8 @@ START_TEST(logWriterTruncate) {
|
||||||
pthread_mutex_t random_mutex;
|
pthread_mutex_t random_mutex;
|
||||||
|
|
||||||
static void* worker_thread(void * arg) {
|
static void* worker_thread(void * arg) {
|
||||||
int key = *(int*)arg;
|
long key = *(int*)arg;
|
||||||
int i = 0;
|
long i = 0;
|
||||||
int truncated_to = 4;
|
int truncated_to = 4;
|
||||||
|
|
||||||
LogEntry * le = allocCommonLogEntry(-1, -1, XBEGIN);
|
LogEntry * le = allocCommonLogEntry(-1, -1, XBEGIN);
|
||||||
|
@ -258,12 +258,21 @@ static void* worker_thread(void * arg) {
|
||||||
|
|
||||||
while(i < ENTRIES_PER_THREAD) {
|
while(i < ENTRIES_PER_THREAD) {
|
||||||
int threshold;
|
int threshold;
|
||||||
int entry;
|
long entry;
|
||||||
|
int needToTruncate = 0;
|
||||||
pthread_mutex_lock(&random_mutex);
|
pthread_mutex_lock(&random_mutex);
|
||||||
|
|
||||||
threshold = (int) (2000.0*random()/(RAND_MAX+1.0));
|
threshold = (int) (2000.0*random()/(RAND_MAX+1.0));
|
||||||
entry = (int) (ENTRIES_PER_THREAD*random()/(RAND_MAX+1.0));
|
entry = (long) (ENTRIES_PER_THREAD*random()/(RAND_MAX+1.0));
|
||||||
|
|
||||||
|
if(threshold < 3) {
|
||||||
|
if(i > 10) {
|
||||||
|
needToTruncate = 1;
|
||||||
|
if(lsns[i - 10] > truncated_to) {
|
||||||
|
truncated_to = lsns[i - 10];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&random_mutex);
|
pthread_mutex_unlock(&random_mutex);
|
||||||
|
|
||||||
|
@ -274,8 +283,9 @@ static void* worker_thread(void * arg) {
|
||||||
/* Truncate the log .15% of the time; result in a bit over 100 truncates per test run.*/
|
/* Truncate the log .15% of the time; result in a bit over 100 truncates per test run.*/
|
||||||
/* fail_unless(1, NULL); */
|
/* fail_unless(1, NULL); */
|
||||||
|
|
||||||
truncateLog(lsns[i - 10]);
|
/*truncateLog(lsns[i - 10]);*/
|
||||||
truncated_to = i - 10;
|
|
||||||
|
//truncated_to = i - 10;
|
||||||
}
|
}
|
||||||
/* fail_unless(1, NULL); */
|
/* fail_unless(1, NULL); */
|
||||||
} else {
|
} else {
|
||||||
|
@ -284,14 +294,19 @@ static void* worker_thread(void * arg) {
|
||||||
/* fail_unless(1, NULL); */
|
/* fail_unless(1, NULL); */
|
||||||
le->xid = i+key;
|
le->xid = i+key;
|
||||||
writeLogEntry(le);
|
writeLogEntry(le);
|
||||||
|
//printf("reportedLSN: %ld\n", le->LSN);
|
||||||
lsns[i] = le->LSN;
|
lsns[i] = le->LSN;
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
/* fail_unless(1, NULL); */
|
/* fail_unless(1, NULL); */
|
||||||
if(entry > truncated_to && entry < i) {
|
pthread_mutex_lock(&random_mutex);
|
||||||
|
if(lsns[entry] > truncated_to && entry < i) {
|
||||||
|
pthread_mutex_unlock(&random_mutex);
|
||||||
/*printf("X %d\n", (readLSNEntry(lsns[entry])->xid == entry+key)); fflush(stdout); */
|
/*printf("X %d\n", (readLSNEntry(lsns[entry])->xid == entry+key)); fflush(stdout); */
|
||||||
assert(readLSNEntry(lsns[entry])->xid == entry+key);
|
assert(readLSNEntry(lsns[entry])->xid == entry+key);
|
||||||
/* fail_unless(readLSNEntry(lsns[entry])->xid == entry+key, NULL); */
|
/* fail_unless(readLSNEntry(lsns[entry])->xid == entry+key, NULL); */
|
||||||
|
} else {
|
||||||
|
pthread_mutex_unlock(&random_mutex);
|
||||||
}
|
}
|
||||||
/* fail_unless(1, NULL); */
|
/* fail_unless(1, NULL); */
|
||||||
|
|
||||||
|
@ -347,10 +362,10 @@ Suite * check_suite(void) {
|
||||||
tcase_set_timeout(tc, 0);
|
tcase_set_timeout(tc, 0);
|
||||||
/* Sub tests are added, one per line, here */
|
/* Sub tests are added, one per line, here */
|
||||||
|
|
||||||
tcase_add_test(tc, logWriterTest);
|
/* tcase_add_test(tc, logWriterTest);*/
|
||||||
tcase_add_test(tc, logHandleColdReverseIterator);
|
/*tcase_add_test(tc, logHandleColdReverseIterator);*/
|
||||||
tcase_add_test(tc, logWriterTruncate);
|
/*tcase_add_test(tc, logWriterTruncate);*/
|
||||||
tcase_add_test(tc, logWriterCheckWorker);
|
/*tcase_add_test(tc, logWriterCheckWorker); */
|
||||||
tcase_add_test(tc, logWriterCheckThreaded);
|
tcase_add_test(tc, logWriterCheckThreaded);
|
||||||
|
|
||||||
/* --------------------------------------------- */
|
/* --------------------------------------------- */
|
||||||
|
|
|
@ -152,8 +152,10 @@ static void* worker_thread(void * arg_ptr) {
|
||||||
recordid rid;
|
recordid rid;
|
||||||
for(i = 0; i < 10000; i++) {
|
for(i = 0; i < 10000; i++) {
|
||||||
pthread_mutex_lock(&lsn_mutex);
|
pthread_mutex_lock(&lsn_mutex);
|
||||||
|
|
||||||
this_lsn = lsn;
|
this_lsn = lsn;
|
||||||
lsn++;
|
lsn++;
|
||||||
|
|
||||||
pthread_mutex_unlock(&lsn_mutex);
|
pthread_mutex_unlock(&lsn_mutex);
|
||||||
|
|
||||||
if(! first ) {
|
if(! first ) {
|
||||||
|
@ -165,8 +167,17 @@ static void* worker_thread(void * arg_ptr) {
|
||||||
|
|
||||||
first = 0;
|
first = 0;
|
||||||
|
|
||||||
|
// TODO A condition variable would be more efficient...
|
||||||
|
|
||||||
|
pthread_mutex_lock(&lsn_mutex);
|
||||||
|
if(slottedFreespace(p) < sizeof(int)) {
|
||||||
|
first = 1;
|
||||||
|
pthread_mutex_unlock(&lsn_mutex);
|
||||||
|
} else {
|
||||||
rid = slottedRawRalloc(p, sizeof(int));
|
rid = slottedRawRalloc(p, sizeof(int));
|
||||||
|
pthread_mutex_unlock(&lsn_mutex);
|
||||||
writeRecord(-1, p, lsn, rid, (byte*)&i);
|
writeRecord(-1, p, lsn, rid, (byte*)&i);
|
||||||
|
}
|
||||||
sched_yield();
|
sched_yield();
|
||||||
|
|
||||||
assert(pageReadLSN(p) <= lsn);
|
assert(pageReadLSN(p) <= lsn);
|
||||||
|
|
|
@ -106,15 +106,15 @@ START_TEST(pageOpCheckRecovery) {
|
||||||
|
|
||||||
memset(p.memAddr, 1, PAGE_SIZE);
|
memset(p.memAddr, 1, PAGE_SIZE);
|
||||||
TpageGet(xid, pageid1, newAddr);
|
TpageGet(xid, pageid1, newAddr);
|
||||||
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-4));
|
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-sizeof(lsn_t)));
|
||||||
|
|
||||||
memset(p.memAddr, 2, PAGE_SIZE);
|
memset(p.memAddr, 2, PAGE_SIZE);
|
||||||
TpageGet(xid, pageid2, newAddr);
|
TpageGet(xid, pageid2, newAddr);
|
||||||
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-4));
|
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-sizeof(lsn_t)));
|
||||||
|
|
||||||
memset(p.memAddr, 3, PAGE_SIZE);
|
memset(p.memAddr, 3, PAGE_SIZE);
|
||||||
TpageGet(xid, pageid3, newAddr);
|
TpageGet(xid, pageid3, newAddr);
|
||||||
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-4));
|
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-sizeof(lsn_t)));
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
Tdeinit();
|
Tdeinit();
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue