more log refactoring. safeWrites now holds a latch across reserve_entry, write_entry, and write_entry_done

This commit is contained in:
Sears Russell 2010-01-19 02:14:09 +00:00
parent 94b356f837
commit b1f7d1947a
8 changed files with 150 additions and 68 deletions

View file

@ -46,6 +46,14 @@ terms specified in this license.
#include <assert.h>
LogEntry * mallocScratchCommonLogEntry(lsn_t LSN, lsn_t prevLSN, int xid, unsigned int type) {
LogEntry * ret = calloc(1, sizeof(struct __raw_log_entry));
ret->LSN = LSN;
ret->prevLSN = prevLSN;
ret->xid = xid;
ret->type = type;
return ret;
}
LogEntry * allocCommonLogEntry(stasis_log_t* log, lsn_t prevLSN, int xid, unsigned int type) {
LogEntry * ret = log->reserve_entry(log,sizeof(struct __raw_log_entry));
ret->prevLSN = prevLSN;
@ -84,6 +92,25 @@ lsn_t getPrepareRecLSN(const LogEntry *e) {
return ret;
}
// XXX get rid of the mallocScratch* functions.
LogEntry * mallocScratchUpdateLogEntry(lsn_t LSN, lsn_t prevLSN, int xid,
unsigned int op, pageid_t page,
unsigned int arg_size) {
size_t logentrysize =
sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + arg_size;
LogEntry * ret = calloc(1, logentrysize);
ret->LSN = LSN;
ret->prevLSN = prevLSN;
ret->xid = xid;
ret->type = UPDATELOG;
ret->update.funcID = op;
ret->update.page = page;
ret->update.arg_size = arg_size;
return ret;
}
LogEntry * allocUpdateLogEntry(stasis_log_t* log, lsn_t prevLSN, int xid,
unsigned int op, pageid_t page,
unsigned int arg_size) {

View file

@ -112,20 +112,26 @@ LogEntry * stasis_log_write_update(stasis_log_t* log, stasis_transaction_table_e
// pthread_mutex_unlock(&l->mut);
return e;
}
// XXX change nta interface so that arg gets passed into end_nta, not begin_nta.
LogEntry * stasis_log_begin_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, unsigned int op,
const byte * arg, size_t arg_size) {
LogEntry * e = allocUpdateLogEntry(log, l->prevLSN, l->xid, op, INVALID_PAGE, arg_size);
LogEntry * e = mallocScratchUpdateLogEntry(INVALID_LSN, l->prevLSN, l->xid, op, INVALID_PAGE, arg_size);
memcpy(stasis_log_entry_update_args_ptr(e), arg, arg_size);
return e;
}
lsn_t stasis_log_end_nta(stasis_log_t* log, stasis_transaction_table_entry_t * l, LogEntry * e) {
log->write_entry(log, e);
LogEntry * realEntry = allocUpdateLogEntry(log, e->prevLSN, e->xid, e->update.funcID, e->update.page, e->update.arg_size);
memcpy(stasis_log_entry_update_args_ptr(realEntry), stasis_log_entry_update_args_cptr(e), e->update.arg_size);
log->write_entry(log, realEntry);
// pthread_mutex_lock(&l->mut);
if(l->prevLSN == INVALID_LSN) { l->recLSN = e->LSN; }
lsn_t ret = l->prevLSN = e->LSN;
if(l->prevLSN == INVALID_LSN) { l->recLSN = realEntry->LSN; }
lsn_t ret = l->prevLSN = realEntry->LSN;
// pthread_mutex_unlock(&l->mut);
log->write_entry_done(log, e);
log->write_entry_done(log, realEntry);
free(e);
return ret;
}
@ -143,10 +149,10 @@ lsn_t stasis_log_write_clr(stasis_log_t* log, const LogEntry * old_e) {
lsn_t stasis_log_write_dummy_clr(stasis_log_t* log, int xid, lsn_t prevLSN) {
// XXX waste of log bandwidth.
LogEntry * e = allocUpdateLogEntry(log, prevLSN, xid, OPERATION_NOOP,
LogEntry * e = mallocScratchUpdateLogEntry(INVALID_LSN, prevLSN, xid, OPERATION_NOOP,
INVALID_PAGE, 0);
lsn_t ret = stasis_log_write_clr(log, e);
log->write_entry_done(log, e);
free(e);
return ret;
}

View file

@ -236,21 +236,20 @@ static inline lsn_t nextEntry_LogWriter(stasis_log_t* log,
return e->LSN + sizeofLogEntry(log, e) + sizeof(lsn_t);
}
// crc handling
static inline void log_crc_reset(stasis_log_safe_writes_state* sw) {
sw->crc = 0;
}
static inline void log_crc_update(stasis_log_t* log, const LogEntry * e, unsigned int * crc) {
static inline void log_crc_update(stasis_log_t* log, const LogEntry * e, uint32_t * crc) {
*crc = stasis_crc32(e, sizeofLogEntry(log, e), *crc);
}
static LogEntry* log_crc_dummy_entry(stasis_log_t *log) {
LogEntry* ret = allocCommonLogEntry(log, 0, -1, INTERNALLOG);
assert(ret->prevLSN == 0);
static LogEntry* log_crc_dummy_entry(lsn_t lsn) {
LogEntry* ret = mallocScratchCommonLogEntry(lsn, 0, -1, INTERNALLOG);
return ret;
}
static LogEntry* log_crc_entry(stasis_log_t *log, unsigned int crc) {
LogEntry* ret = allocCommonLogEntry(log, crc, -1, INTERNALLOG);
static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e, int clearcrc);
static lsn_t log_crc_entry(stasis_log_t *log) {
LogEntry* e= allocCommonLogEntry(log, -1, -1, INTERNALLOG);
writeLogEntryUnlocked(log, e, 1); // 1-> reset crc.
lsn_t ret = e->LSN;
log->write_entry_done(log, e); // XXX depends on implementation of write_entry_done, which will change.
return ret;
}
@ -267,11 +266,12 @@ static inline lsn_t log_crc_next_lsn(stasis_log_t* log, lsn_t ret) {
// Using readLogEntry() bypasses checks to see if we're past the end
// of the log.
LogEntry * le;
unsigned int crc = 0;
uint32_t crc = 0;
while((le = readLogEntry(sw))) {
if(le->type == INTERNALLOG) {
if (!(le->prevLSN) || (crc == (unsigned int) le->prevLSN)) {
if ((!le->prevLSN) || (crc == (uint32_t) le->prevLSN)) {
DEBUG("read matching crc entry %x\n", crc);
ret = nextEntry_LogWriter(log, le);
crc = 0;
} else {
@ -307,7 +307,7 @@ static inline lsn_t log_crc_next_lsn(stasis_log_t* log, lsn_t ret) {
LSN encountered so far to the end of the log.
*/
static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e) {
static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e, int clearcrc) {
stasis_log_safe_writes_state* sw = log->impl;
@ -317,8 +317,16 @@ static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e) {
pthread_mutex_lock(&sw->nextAvailableLSN_mutex);
e->LSN = sw->nextAvailableLSN;
pthread_mutex_unlock(&sw->nextAvailableLSN_mutex);
assert(clearcrc == (e->type == INTERNALLOG));
log_crc_update(log, e, &sw->crc);
if(clearcrc) {
// Reset log_crc to zero each time a crc entry is written.
e->prevLSN = sw->crc;
DEBUG("wrote crc entry %x\n", sw->crc);
sw->crc = 0;
} else {
log_crc_update(log, e, &sw->crc);
}
DEBUG("Writing Log entry type = %d lsn = %ld, size = %ld\n",
e->type, e->LSN, size);
@ -330,6 +338,8 @@ static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e) {
if(ferror(sw->fp)) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n",
ferror(sw->fp));
errno = ferror(sw->fp);
perror("error was:");
abort();
}
abort();
@ -344,6 +354,8 @@ static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e) {
if(ferror(sw->fp)) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n",
ferror(sw->fp));
errno = ferror(sw->fp);
perror("error was:");
abort();
}
abort();
@ -361,18 +373,22 @@ static int writeLogEntryUnlocked(stasis_log_t* log, LogEntry * e) {
static int writeLogEntry_LogWriter(stasis_log_t* log, LogEntry * e) {
stasis_log_safe_writes_state* sw = log->impl;
pthread_mutex_lock(&sw->write_mutex);
int ret = writeLogEntryUnlocked(log, e);
pthread_mutex_unlock(&sw->write_mutex);
// Make sure that our caller holds write_mutex.
assert(pthread_mutex_trylock(&sw->write_mutex));
int ret = writeLogEntryUnlocked(log, e, 0);
return ret;
}
LogEntry* reserveEntry_LogWriter(struct stasis_log_t* log, size_t sz) {
stasis_log_safe_writes_state* sw = log->impl;
pthread_mutex_lock(&sw->write_mutex);
// XXX need to assign LSN here
return malloc(sz);
return calloc(1, sz);
}
int entryDone_LogWriter(struct stasis_log_t* log, LogEntry* e) {
stasis_log_safe_writes_state* sw = log->impl;
pthread_mutex_unlock(&sw->write_mutex);
// int ret = writeLogEntry_LogWriter(log, e);
free(e);
return 0;
@ -405,19 +421,7 @@ static void syncLog_LogWriter(stasis_log_t * log,
stasis_log_safe_writes_state* sw = log->impl;
lsn_t newFlushedLSN;
pthread_mutex_lock(&sw->write_mutex);
pthread_mutex_lock(&sw->nextAvailableLSN_mutex);
newFlushedLSN = sw->nextAvailableLSN;
pthread_mutex_unlock(&sw->nextAvailableLSN_mutex);
LogEntry* crc_entry = log_crc_entry(log, sw->crc);
writeLogEntryUnlocked(log, crc_entry);
free(crc_entry);
// Reset log_crc to zero each time a crc entry is written.
log_crc_reset(sw);
pthread_mutex_unlock(&sw->write_mutex);
newFlushedLSN = log_crc_entry(log);
fflush(sw->fp);
@ -596,7 +600,7 @@ static int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
pthread_mutex_unlock(&sw->truncate_mutex);
return 0;
}
DEBUG("truncating\n"); fflush(stdout);
/* w+ = truncate, and open for writing. */
tmpLog = fopen(sw->scratch_filename, "w+");
@ -652,10 +656,9 @@ static int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
if(firstCRC) { free(firstCRC); }
}
freeLogHandle(lh);
LogEntry * crc_entry = log_crc_dummy_entry(log);
pthread_mutex_lock(&sw->nextAvailableLSN_mutex);
crc_entry->LSN = sw->nextAvailableLSN;
LogEntry * crc_entry = log_crc_dummy_entry(sw->nextAvailableLSN);
DEBUG("Crc entry: lsn = %ld, crc = %x\n", crc_entry->LSN,
crc_entry->prevLSN);
@ -665,7 +668,7 @@ static int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
sw->nextAvailableLSN = nextEntry_LogWriter(log, crc_entry);
log_crc_reset(sw);
sw->crc = 0;
pthread_mutex_unlock(&sw->nextAvailableLSN_mutex);
@ -729,15 +732,14 @@ static int truncateLog_LogWriter(stasis_log_t* log, lsn_t LSN) {
sw->global_offset = LSN - sizeof(lsn_t);
lsn_t logPos;
if((logPos = myFseek(sw->fp, 0, SEEK_END))
!= sw->nextAvailableLSN - sw->global_offset) {
lsn_t logPos = myFseek(sw->fp, 0, SEEK_END);
if(logPos != sw->nextAvailableLSN - sw->global_offset) {
if(logPos == -1) {
perror("Truncation couldn't seek");
} else {
printf("logfile was wrong length after truncation. "
"Expected %lld, found %lld\n",
sw->nextAvailableLSN - sw->global_offset, logPos);
"Expected %lld, copied %lld, found %lld\n",
sw->nextAvailableLSN - sw->global_offset, lengthOfCopiedLog, logPos);
fflush(stdout);
abort();
}
@ -915,7 +917,7 @@ stasis_log_t* stasis_log_safe_writes_open(const char * filename,
}
// Reset log_crc to zero (nextAvailableLSN immediately follows a crc entry).
log_crc_reset(sw);
sw->crc = 0;
sw->flushedLSN_wal = sw->nextAvailableLSN;
sw->flushedLSN_commit = sw->nextAvailableLSN;

View file

@ -180,7 +180,7 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page,
pthread_mutex_lock(&h->mut);
LogEntry * e = allocUpdateLogEntry(h->log, -1, h->l->xid, op,
LogEntry * e = mallocScratchUpdateLogEntry(INVALID_LSN, INVALID_LSN, h->l->xid, op,
p->id, datlen);
memcpy(stasis_log_entry_update_args_ptr(e), dat, datlen);
@ -193,11 +193,12 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page,
unlock(p->rwlatch);
pthread_mutex_unlock(&h->mut);
// page will be released by the log handle...
stasis_log_file->write_entry_done(stasis_log_file, e);
//stasis_log_file->write_entry_done(stasis_log_file, e);
free(e);
}
lsn_t TwritebackUpdate(int xid, pageid_t page,
const void *dat, size_t datlen, int op) {
LogEntry * e = allocUpdateLogEntry(stasis_log_file, -1, xid, op, page, datlen);
LogEntry * e = allocUpdateLogEntry(stasis_log_file, INVALID_LSN, xid, op, page, datlen);
memcpy(stasis_log_entry_update_args_ptr(e), dat, datlen);
stasis_transaction_table_entry_t* l = stasis_transaction_table_get(stasis_transaction_table, xid);
@ -218,10 +219,11 @@ void TreorderableWritebackUpdate(int xid, void* hp,
stasis_log_reordering_handle_t* h = hp;
assert(stasis_transaction_table_is_active(stasis_transaction_table, xid));
pthread_mutex_lock(&h->mut);
LogEntry * e = allocUpdateLogEntry(stasis_log_file, -1, xid, op, page, datlen);
LogEntry * e = mallocScratchUpdateLogEntry(INVALID_LSN, INVALID_LSN, xid, op, page, datlen);
memcpy(stasis_log_entry_update_args_ptr(e), dat, datlen);
stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(0, e));
pthread_mutex_unlock(&h->mut);
free(e);
}
compensated_function void TupdateStr(int xid, pageid_t page,
const char *dat, size_t datlen, int op) {
@ -416,7 +418,10 @@ int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) {
void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) {
assert(xid >= 0);
void * ret = stasis_log_begin_nta(stasis_log_file, stasis_transaction_table_get(stasis_transaction_table, xid), op, dat, datSize);
LogEntry * e = stasis_log_begin_nta(stasis_log_file, stasis_transaction_table_get(stasis_transaction_table, xid), op, dat, datSize);
LogEntry * ret = malloc(sizeofLogEntry(stasis_log_file, e));
memcpy(ret, e, sizeofLogEntry(stasis_log_file, e));
stasis_log_file->write_entry_done(stasis_log_file, e);
DEBUG("Begin Nested Top Action e->LSN: %ld\n", e->LSN);
return ret;
}

View file

@ -88,6 +88,7 @@ struct LogEntry {
UpdateLogEntry update;
};
LogEntry * mallocScratchCommonLogEntry(lsn_t lsn, lsn_t prevLSN, int xid, unsigned int type);
/**
Allocate a log entry that does not contain any extra payload
information. (Eg: Tbegin, Tcommit, etc.)
@ -106,6 +107,8 @@ LogEntry * allocPrepareLogEntry(stasis_log_t *log, lsn_t prevLSN, int xid, lsn_t
*/
LogEntry * allocUpdateLogEntry(stasis_log_t *log, lsn_t prevLSN, int xid,
unsigned int op, pageid_t page, unsigned int arg_size);
LogEntry * mallocScratchUpdateLogEntry(lsn_t LSN, lsn_t prevLSN, int xid,
unsigned int op, pageid_t page, unsigned int arg_size);
/**
Allocate a CLR entry. These are written during recovery as log

View file

@ -56,6 +56,8 @@ START_TEST(rawLogEntryAlloc)
assert(log->xid == 1);
assert(log->type == XABORT);
assert(sizeofLogEntry(0, log) == sizeof(struct __raw_log_entry));
l->write_entry(l, log);
l->write_entry_done(l, log);
Tdeinit();
}
@ -97,7 +99,9 @@ START_TEST(updateLogEntryAlloc)
// printf("sizes %d %d\n",sizeofLogEntry(log),(sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + (sizeof(char))));
assert(sizeofLogEntry(0, log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 3 * (sizeof(char))));
free(log);
l->write_entry(l, log);
l->write_entry_done(l, log);
Tdeinit();
}
END_TEST
@ -123,7 +127,9 @@ START_TEST(updateLogEntryAllocNoExtras)
assert(stasis_log_entry_update_args_ptr(log) == NULL);
assert(sizeofLogEntry(0, log) == (sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + 0 * (sizeof(int)+sizeof(char))));
free(log);
l->write_entry(l, log);
l->write_entry_done(l, log);
Tdeinit();
}

View file

@ -62,6 +62,12 @@ terms specified in this license.
#define LOG_NAME "check_logWriter.log"
LogEntry * dupLogEntry(stasis_log_t * log, const LogEntry *e) {
LogEntry * ret = malloc(sizeofLogEntry(log, e));
memcpy(ret,e,sizeofLogEntry(log, e));
return ret;
}
static stasis_log_t * setup_log() {
int i;
lsn_t prevLSN = -1;
@ -88,6 +94,10 @@ static stasis_log_t * setup_log() {
stasis_log_file->write_entry(stasis_log_file,e);
prevLSN = e->LSN;
LogEntry * tmp = dupLogEntry(stasis_log_file,e);
stasis_log_file->write_entry_done(stasis_log_file, e);
e = tmp;
assert(test <= e->LSN);
if(first) {
@ -99,8 +109,8 @@ static stasis_log_t * setup_log() {
fail_unless(sizeofLogEntry(0, e) == sizeofLogEntry(0, f), "Log entry changed size!!");
fail_unless(0 == memcmp(e,f,sizeofLogEntry(0, e)), "Log entries did not agree!!");
stasis_log_file->write_entry_done(stasis_log_file, e);
free(e);
// stasis_log_file->write_entry_done(stasis_log_file, e);
stasis_log_file->read_entry_done(stasis_log_file, f);
e = allocUpdateLogEntry(stasis_log_file, prevLSN, xid, 1, rid.page, args_size);
@ -108,6 +118,10 @@ static stasis_log_t * setup_log() {
stasis_log_file->write_entry(stasis_log_file,e);
prevLSN = e->prevLSN;
tmp = dupLogEntry(stasis_log_file,e);
stasis_log_file->write_entry_done(stasis_log_file, e);
e = tmp;
// LogEntry * g = allocCLRLogEntry(100, 1, 200, rid, 0); //prevLSN);
LogEntry * g = allocCLRLogEntry(stasis_log_file, e); // XXX will probably break
g->prevLSN = firstLSN;
@ -115,7 +129,7 @@ static stasis_log_t * setup_log() {
assert (g->type == CLRLOG);
prevLSN = g->LSN;
stasis_log_file->write_entry_done(stasis_log_file, e);
free(e);
stasis_log_file->write_entry_done(stasis_log_file, g);
}
return stasis_log_file;
@ -313,7 +327,6 @@ static void* worker_thread(void * arg) {
stasis_log_t * stasis_log_file = stasis_log();
while(i < ENTRIES_PER_THREAD) {
LogEntry * le = allocCommonLogEntry(stasis_log_file, -1, -1, XBEGIN);
int threshold;
long entry;
int needToTruncate = 0;
@ -348,6 +361,7 @@ static void* worker_thread(void * arg) {
if(threshold < 3) {
} else {
LogEntry * le = allocCommonLogEntry(stasis_log_file, -1, -1, XBEGIN);
le->xid = i+key;
#ifdef NO_CONCURRENCY
pthread_mutex_lock(&big);
@ -358,6 +372,7 @@ static void* worker_thread(void * arg) {
#endif
lsns[i] = le->LSN;
i++;
stasis_log_file->write_entry_done(stasis_log_file,le);
}
pthread_mutex_lock(&random_mutex);
#ifdef NO_CONCURRENCY
@ -385,7 +400,7 @@ static void* worker_thread(void * arg) {
/* Try to interleave requests as much as possible */
sched_yield();
stasis_log_file->write_entry_done(stasis_log_file, le);
// stasis_log_file->write_entry_done(stasis_log_file, le);
}
@ -469,6 +484,10 @@ void reopenLogWorkload(int truncating) {
entries[i] = stasis_log_write_update(stasis_log_file,
&l, 0, OPERATION_NOOP, NULL, 0);
LogEntry * e = dupLogEntry(stasis_log_file, entries[i]);
stasis_log_file->write_entry_done(stasis_log_file, entries[i]);
entries[i] = e;
if(i == SYNC_POINT) {
if(truncating) {
stasis_log_file->truncate(stasis_log_file,entries[i]->LSN);
@ -517,6 +536,10 @@ void reopenLogWorkload(int truncating) {
for(int i = 0; i < ENTRY_COUNT; i++) {
entries2[i] = stasis_log_write_update(stasis_log_file, &l, 0, OPERATION_NOOP,
NULL, 0);
LogEntry * e = dupLogEntry(stasis_log_file, entries2[i]);
stasis_log_file->write_entry_done(stasis_log_file, entries2[i]);
entries2[i] = e;
if(i == SYNC_POINT) {
stasis_log_file->force_tail(stasis_log_file, LOG_FORCE_COMMIT);
}
@ -550,8 +573,8 @@ void reopenLogWorkload(int truncating) {
assert(i == (ENTRY_COUNT * 2));
for(int i = 0; i < ENTRY_COUNT; i++) {
stasis_log_file->write_entry_done(stasis_log_file, entries[i]);
stasis_log_file->write_entry_done(stasis_log_file, entries2[i]);
free(entries[i]);
free(entries2[i]);
}
stasis_truncation_automatic = 1;

View file

@ -90,6 +90,8 @@ START_TEST(operation_physical_do_undo) {
// XXX fails; set log format has changed
setToTwo = allocUpdateLogEntry(stasis_log(), -1, xid, OPERATION_SET, rid.page,
sizeof(slotid_t) + sizeof(int64_t) + 2 * sizeof(int));
lsn_t setToTwo_lsn = setToTwo->LSN;
memcpy(stasis_log_entry_update_args_ptr(setToTwo), arg, sizeof(slotid_t) + sizeof(int64_t) + 2 * sizeof(int));
/* Do, undo and redo operation without updating the LSN field of the page. */
@ -179,10 +181,18 @@ START_TEST(operation_physical_do_undo) {
*/
// XXX This is a hack to put some stuff in the log. Otherwise, Tdeinit() fails.
for(int i = 0; i < 10; i++)
((stasis_log_t*)stasis_log())->write_entry(stasis_log(),
allocCommonLogEntry(stasis_log(), -1, -1, -1));
stasis_log_t * log = stasis_log();
setToTwo->LSN = setToTwo_lsn; // XXX hack...
log->write_entry(log, setToTwo);
log->write_entry_done(log, setToTwo);
for(int i = 0; i < 10; i++) {
LogEntry *e = allocCommonLogEntry(log, -1, -1, -1);
log->write_entry(log, e);
log->write_entry_done(log, e);
}
/** @todo need to re-think check_operations. The test is pretty broken. */
Tcommit(xid);
Tdeinit();