fixed unit test bug; logWriters may now return NULL when readers attempt to read before the beginning of the log
This commit is contained in:
parent
4b2cfb2b23
commit
5df5512d91
3 changed files with 27 additions and 12 deletions
|
@ -145,9 +145,15 @@ static const LogEntry * stasis_log_impl_in_memory_read_entry(stasis_log_t* log,
|
||||||
lsn_t lsn) {
|
lsn_t lsn) {
|
||||||
stasis_log_impl_in_memory * impl = log->impl;
|
stasis_log_impl_in_memory * impl = log->impl;
|
||||||
DEBUG("lsn: %ld\n", lsn);
|
DEBUG("lsn: %ld\n", lsn);
|
||||||
if(lsn >= impl->nextAvailableLSN) { return 0; }
|
|
||||||
assert(lsn-impl->globalOffset >= 0 && lsn-impl->globalOffset< impl->bufferLen);
|
|
||||||
readlock(impl->globalOffset_lock, 0);
|
readlock(impl->globalOffset_lock, 0);
|
||||||
|
if(lsn >= impl->nextAvailableLSN) {
|
||||||
|
unlock(impl->globalOffset_lock);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if(!(lsn-impl->globalOffset >= 0 && lsn-impl->globalOffset< impl->bufferLen)) {
|
||||||
|
unlock(impl->globalOffset_lock);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
LogEntry * ptr = impl->buffer[lsn - impl->globalOffset];
|
LogEntry * ptr = impl->buffer[lsn - impl->globalOffset];
|
||||||
unlock(impl->globalOffset_lock);
|
unlock(impl->globalOffset_lock);
|
||||||
assert(ptr);
|
assert(ptr);
|
||||||
|
|
|
@ -501,7 +501,7 @@ static const LogEntry * readLSNEntry_LogWriter(stasis_log_t * log, const lsn_t L
|
||||||
|
|
||||||
if(LSN >= sw->nextAvailableLSN) {
|
if(LSN >= sw->nextAvailableLSN) {
|
||||||
pthread_mutex_unlock(&sw->nextAvailableLSN_mutex);
|
pthread_mutex_unlock(&sw->nextAvailableLSN_mutex);
|
||||||
return 0;
|
return NULL;
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&sw->nextAvailableLSN_mutex);
|
pthread_mutex_unlock(&sw->nextAvailableLSN_mutex);
|
||||||
|
|
||||||
|
@ -514,7 +514,11 @@ static const LogEntry * readLSNEntry_LogWriter(stasis_log_t * log, const lsn_t L
|
||||||
assert(flushedLSNInternal(sw) > LSN);
|
assert(flushedLSNInternal(sw) > LSN);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(sw->global_offset <= LSN);
|
if(sw->global_offset > LSN) {
|
||||||
|
// Return NULL; the caller read before the beginning of the log.
|
||||||
|
pthread_mutex_unlock(&sw->read_mutex);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
off_t newPosition = LSN - sw->global_offset;
|
off_t newPosition = LSN - sw->global_offset;
|
||||||
newPosition = lseek(sw->ro_fd, newPosition, SEEK_SET);
|
newPosition = lseek(sw->ro_fd, newPosition, SEEK_SET);
|
||||||
|
|
|
@ -312,11 +312,11 @@ static void* worker_thread(void * arg) {
|
||||||
|
|
||||||
if(threshold < 3) {
|
if(threshold < 3) {
|
||||||
if(i > 10) {
|
if(i > 10) {
|
||||||
needToTruncate = 1;
|
needToTruncate = 1;
|
||||||
if(lsns[i - 10] > truncated_to) {
|
if(lsns[i - 10] > truncated_to) {
|
||||||
truncated_to = lsns[i - 10];
|
truncated_to = lsns[i - 10];
|
||||||
myTruncVal = truncated_to;
|
myTruncVal = truncated_to;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -355,9 +355,14 @@ static void* worker_thread(void * arg) {
|
||||||
pthread_mutex_unlock(&random_mutex);
|
pthread_mutex_unlock(&random_mutex);
|
||||||
|
|
||||||
const LogEntry * e = stasis_log_file->read_entry(stasis_log_file, lsn);
|
const LogEntry * e = stasis_log_file->read_entry(stasis_log_file, lsn);
|
||||||
|
if(e == NULL) {
|
||||||
assert(e->xid == entry+key);
|
pthread_mutex_lock(&random_mutex);
|
||||||
freeLogEntry(e);
|
assert(lsn < truncated_to);
|
||||||
|
pthread_mutex_unlock(&random_mutex);
|
||||||
|
} else {
|
||||||
|
assert(e->xid == entry+key);
|
||||||
|
freeLogEntry(e);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pthread_mutex_unlock(&random_mutex);
|
pthread_mutex_unlock(&random_mutex);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue