Added support for segment-based recovery. For now, Stasis doesn't support hybrid recovery, so mixing page deallocation and segments will cause recovery to corrupt data or segfault.

Sears Russell 2009-07-13 17:18:01 +00:00
parent d6c91241d6
commit 4493dbb88b
16 changed files with 281 additions and 55 deletions
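The commit itself ships no standalone usage example, so here is a minimal sketch (not part of the diff) of how the new segment-file calls are intended to be used, modeled on the operation_segment test added at the end of this commit. The region size (100 pages) and the allocation type tag (42) are arbitrary placeholder values, and error handling is omitted.

#include <stasis/transactional.h>
#include <assert.h>

int main(void) {
  Tinit();

  // Allocate a region of pages to use as a segment-addressed byte range.
  int xid = Tbegin();
  pageid_t region_start = TregionAlloc(xid, 100, 42);
  Tcommit(xid);

  // Write an int at a byte offset inside the region, then read it back.
  xid = Tbegin();
  int value = 12345;
  off_t off = region_start * PAGE_SIZE;   // first byte of the region
  ssize_t ret = Tpwrite(xid, (const byte*)&value, sizeof(value), off);
  assert(ret == sizeof(value));

  int copy = 0;
  ret = Tpread(xid, (byte*)&copy, sizeof(copy), off);
  assert(ret == sizeof(copy) && copy == value);
  Tcommit(xid);   // the logged write is redone at recovery via OPERATION_SEGMENT_FILE_PWRITE

  Tdeinit();
  return 0;
}

As the commit message warns, recovery for such segment writes only works as long as the store does not also rely on page deallocation, since hybrid recovery is not supported yet.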

View file

@ -29,6 +29,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c
operations/alloc.c operations/noop.c
operations/arrayList.c
operations/lsnFreeSet.c
operations/segmentFile.c
operations/group/logStructured.c
hash.c
operations/naiveLinearHash.c

View file

@ -24,6 +24,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common
operations/regions.c operations/lsmTree.c \
operations/lsnFreeSet.c \
operations/group/logStructured.c \
operations/segmentFile.c \
io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c \
io/debug.c io/handle.c \
bufferManager.c \

View file

@ -103,13 +103,12 @@ static lsn_t stasis_log_write_prepare(stasis_log_t* log, TransactionLog * l) {
}
LogEntry * stasis_log_write_update(stasis_log_t* log, TransactionLog * l,
Page * p, unsigned int op,
pageid_t page, unsigned int op,
const byte * arg, size_t arg_size) {
LogEntry * e = allocUpdateLogEntry(l->prevLSN, l->xid, op,
p ? p->id : INVALID_PAGE,
page,
arg, arg_size);
log->write_entry(log, e);
DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size);

View file

@ -13,7 +13,7 @@ static void* stasis_log_reordering_handle_worker(void * a) {
while(chunk_len < h->chunk_len && h->cur_len) {
LogEntry * e = stasis_log_write_update(h->log,
h->l,
h->queue[h->cur_off].p,
h->queue[h->cur_off].p->id,
h->queue[h->cur_off].op,
h->queue[h->cur_off].arg,
h->queue[h->cur_off].arg_size);

View file

@ -113,23 +113,23 @@ void stasis_operation_table_init() {
stasis_operation_impl_register(stasis_op_impl_region_dealloc());
stasis_operation_impl_register(stasis_op_impl_region_dealloc_inverse());
stasis_operation_impl_register(stasis_op_impl_segment_file_pwrite());
stasis_operation_impl_register(stasis_op_impl_segment_file_pwrite_inverse());
}
void stasis_operation_do(const LogEntry * e, Page * p) {
if(p) {
assertlocked(p->rwlatch);
assert(e->update.funcID != OPERATION_INVALID);
stasis_operation_table[e->update.funcID].run(e, p);
DEBUG("OPERATION xid %d Do, %lld {%lld:%lld}\n", e->xid,
e->LSN, e->update.page, stasis_page_lsn_read(p));
if(p) assertlocked(p->rwlatch);
assert(e->update.funcID != OPERATION_INVALID);
stasis_operation_table[e->update.funcID].run(e, p);
stasis_page_lsn_write(e->xid, p, e->LSN);
} else {
stasis_operation_table[e->update.funcID].run(e,NULL);
}
DEBUG("OPERATION xid %d Do, %lld {%lld:%lld}\n", e->xid,
e->LSN, e->update.page, p ? stasis_page_lsn_read(p) : -1);
if(p) stasis_page_lsn_write(e->xid, p, e->LSN);
}
@ -144,7 +144,7 @@ void stasis_operation_redo(const LogEntry * e, Page * p) {
if(stasis_operation_table[e->update.funcID].redo == OPERATION_NOOP) {
return;
}
if(stasis_page_lsn_read(p) < e->LSN ||
if((!p) || stasis_page_lsn_read(p) < e->LSN ||
e->update.funcID == OPERATION_SET_LSN_FREE ||
e->update.funcID == OPERATION_SET_LSN_FREE_INVERSE) {
DEBUG("OPERATION xid %d Redo, %lld {%lld:%lld}\n", e->xid,
@ -157,7 +157,7 @@ void stasis_operation_redo(const LogEntry * e, Page * p) {
assert(stasis_operation_table[e->update.funcID].redo != OPERATION_INVALID);
stasis_operation_table[stasis_operation_table[e->update.funcID].redo]
.run(e,p);
stasis_page_lsn_write(e->xid, p, e->LSN);
if(p) stasis_page_lsn_write(e->xid, p, e->LSN);
} else {
DEBUG("OPERATION xid %d skip redo, %lld {%lld:%lld}\n", e->xid,
e->LSN, e->update.page, stasis_page_lsn_read(p));
@ -172,8 +172,8 @@ void stasis_operation_undo(const LogEntry * e, lsn_t effective_lsn, Page * p) {
int undo = stasis_operation_table[e->update.funcID].undo;
assert(undo != OPERATION_INVALID);
if(e->update.page == INVALID_PAGE) {
// logical undos are excuted unconditionally.
if(e->update.page == INVALID_PAGE || e->update.page == SEGMENT_PAGEID) {
// logical undos are executed unconditionally, as are segment-based undos
DEBUG("OPERATION xid %d FuncID %d Undo, %d LSN %lld {logical}\n", e->xid,
e->update.funcID, undo, e->LSN);

View file

@ -0,0 +1,147 @@
/*
* segmentFile.c
*
* Created on: Jul 7, 2009
* Author: sears
*/
#include <stasis/transactional.h>
#include <string.h>
static inline off_t stasis_offset_from_page(pageid_t pid, int pageoffset) {
return (pid * PAGE_SIZE) + pageoffset;
}
static inline pageid_t stasis_page_from_offset(off_t off) {
return off / PAGE_SIZE;
}
static inline int stasis_page_offset_from_offset(off_t off) {
return off % PAGE_SIZE;
}
static inline off_t stasis_next_page_boundary(off_t off) {
return 1 + (off/PAGE_SIZE);
}
static inline off_t stasis_min_offset(off_t a, off_t b) {
return a < b ? a : b;
}
static ssize_t read_write_helper(int read, int xid, lsn_t lsn, byte* buf, size_t count, off_t offset) {
// the first page that has at least one byte for us on it
pageid_t start = stasis_page_from_offset(offset);
// the last page that has such a byte
pageid_t stop = stasis_page_from_offset(offset + count - 1);
// copy the first page
Page * p = loadPageOfType(xid, start, SEGMENT_PAGE);
if(read) { readlock(p->rwlatch, 0); } else { writelock(p->rwlatch,0); }
off_t start_offset = stasis_page_offset_from_offset(offset);
byte * page_buf = p->memAddr + start_offset;
byte * user_buf = buf;
size_t sz = stasis_min_offset(start_offset + count, PAGE_SIZE) - start_offset;
if(read) {
memcpy(user_buf, page_buf, sz);
} else {
memcpy(page_buf, user_buf, sz);
stasis_page_lsn_write(xid, p, lsn);
}
unlock(p->rwlatch);
releasePage(p);
pageid_t n = 0;
// calculate the number of bytes copied above
// (assuming there is more data to copy; otherwise, this value will not be used)
off_t buf_phase = PAGE_SIZE - start_offset;
// copy all pages except for the first and last
for(pageid_t i = start+1; i < stop; i++) {
p = loadPageOfType(xid, i, SEGMENT_PAGE);
if(read) { readlock(p->rwlatch, 0); } else { writelock(p->rwlatch,0); }
page_buf = p->memAddr;
user_buf = buf + buf_phase + (n * PAGE_SIZE);
if(read) {
memcpy(user_buf, page_buf, PAGE_SIZE);
} else {
memcpy(page_buf, user_buf, PAGE_SIZE);
stasis_page_lsn_write(xid, p, lsn);
}
unlock(p->rwlatch);
releasePage(p);
n++;
}
// copy the last page (if necessary)
if(start != stop) {
p = loadPageOfType(xid, stop, SEGMENT_PAGE);
if(read) { readlock(p->rwlatch, 0); } else { writelock(p->rwlatch,0); }
user_buf = buf + buf_phase + (n * PAGE_SIZE);
page_buf = p->memAddr;
sz = (offset + count) - stop * PAGE_SIZE; // bytes that land on the final page
if(read) {
memcpy(user_buf, page_buf, sz);
} else {
memcpy(page_buf, user_buf, sz);
stasis_page_lsn_write(xid, p, lsn);
}
unlock(p->rwlatch);
releasePage(p);
}
return count;
}
typedef struct {
off_t offset;
} segment_file_arg_t;
ssize_t Tpread(int xid, byte* buf, size_t count, off_t offset) {
return read_write_helper(1, xid, -1, buf, count, offset);
}
ssize_t Tpwrite(int xid, const byte * buf, size_t count, off_t offset) {
byte * buf2 = malloc(count);
read_write_helper(1, xid, -1, buf2, count, offset);
size_t entrylen = sizeof(segment_file_arg_t) + 2*count;
segment_file_arg_t * entry = malloc(entrylen);
entry->offset = offset;
memcpy((entry+1), buf, count);
memcpy(((byte*)(entry+1))+count, buf2, count);
free(buf2);
Tupdate(xid, SEGMENT_PAGEID, entry, entrylen, OPERATION_SEGMENT_FILE_PWRITE);
return count;
}
static int op_segment_file_pwrite(const LogEntry* e, Page* p) {
assert(p == 0);
size_t count = (e->update.arg_size - sizeof(segment_file_arg_t)) / 2;
segment_file_arg_t * arg = (segment_file_arg_t*)getUpdateArgs(e);
off_t offset = arg->offset;
read_write_helper(0, e->xid, e->LSN, (byte*)(arg+1), count, offset);
return 0;
}
static int op_segment_file_pwrite_inverse(const LogEntry* e, Page* p) {
assert(p == 0);
size_t count = (e->update.arg_size - sizeof(segment_file_arg_t)) / 2;
segment_file_arg_t * arg = (segment_file_arg_t*)getUpdateArgs(e);
off_t offset = arg->offset;
read_write_helper(0, e->xid, e->LSN, ((byte*)(arg+1))+count, count, offset);
return 0;
}
stasis_operation_impl stasis_op_impl_segment_file_pwrite() {
static stasis_operation_impl o = {
OPERATION_SEGMENT_FILE_PWRITE,
OPERATION_SEGMENT_FILE_PWRITE,
OPERATION_SEGMENT_FILE_PWRITE_INVERSE,
op_segment_file_pwrite
};
return o;
}
stasis_operation_impl stasis_op_impl_segment_file_pwrite_inverse() {
static stasis_operation_impl o = {
OPERATION_SEGMENT_FILE_PWRITE_INVERSE,
OPERATION_SEGMENT_FILE_PWRITE_INVERSE,
OPERATION_SEGMENT_FILE_PWRITE,
op_segment_file_pwrite_inverse
};
return o;
}

View file

@ -284,7 +284,7 @@ void stasis_page_loaded(Page * p, pagetype_t type){
p->pageType = (type == UNKNOWN_TYPE_PAGE) ? *stasis_page_type_ptr(p) : type;
if(p->pageType) {
assert(page_impls[p->pageType].page_type == p->pageType);
page_impls[p->pageType].pageLoaded(p);
if (page_impls[p->pageType].pageLoaded) page_impls[p->pageType].pageLoaded(p);
} else {
p->LSN = *stasis_page_lsn_ptr(p); // XXX kludge - shouldn't special-case UNINITIALIZED_PAGE
}
@ -298,7 +298,7 @@ void stasis_page_flushed(Page * p){
*stasis_page_type_ptr(p)= type;
*stasis_page_lsn_ptr(p) = p->LSN;
}
page_impls[type].pageFlushed(p);
if(page_impls[type].pageFlushed) page_impls[type].pageFlushed(p);
} else {
*stasis_page_type_ptr(p)= type;
*stasis_page_lsn_ptr(p) = p->LSN;
@ -308,7 +308,7 @@ void stasis_page_cleanup(Page * p) {
short type = p->pageType;
if(type) {
assert(page_impls[type].page_type == type);
page_impls[type].pageCleanup(p);
if(page_impls[type].pageCleanup) page_impls[type].pageCleanup(p);
}
}

View file

@ -187,19 +187,21 @@ static void stasis_recovery_redo(stasis_log_t* log) {
// Check to see if this entry's action needs to be redone
switch(e->type) {
case UPDATELOG:
{
if(e->update.page == INVALID_PAGE) {
// logical redo; ignore
} else {
Page * p = loadPage(e->xid, e->update.page);
writelock(p->rwlatch,0);
stasis_operation_redo(e,p);
unlock(p->rwlatch);
releasePage(p);
}
} break;
{
if(e->update.page == INVALID_PAGE) {
// this entry specifies a logical undo operation; ignore it.
} else if(e->update.page == SEGMENT_PAGEID) {
stasis_operation_redo(e,0);
} else {
Page * p = loadPage(e->xid, e->update.page);
writelock(p->rwlatch,0);
stasis_operation_redo(e,p);
unlock(p->rwlatch);
releasePage(p);
}
} break;
case CLRLOG:
{
{
// if compensated_lsn == -1, then this clr is closing a nested top
// action that was performed during undo. Therefore, we do not
// want to undo it again.

View file

@ -119,7 +119,7 @@ int Tbegin() {
int i, index = 0;
int xidCount_tmp;
assert(stasis_initted);
assert(stasis_initted);
pthread_mutex_lock(&stasis_transaction_table_mutex);
@ -152,27 +152,33 @@ int Tbegin() {
return stasis_transaction_table[index].xid;
}
static compensated_function void TactionHelper(int xid,
const void * dat, size_t datlen, int op,
Page * p) {
compensated_function void Tupdate(int xid, pageid_t page,
const void * dat, size_t datlen, int op) {
assert(stasis_initted);
assert(page != INVALID_PAGE);
LogEntry * e;
assert(xid >= 0 && stasis_transaction_table[xid % MAX_TRANSACTIONS].xid == xid);
Page * p = (page == SEGMENT_PAGEID) ? 0 : loadPage(xid, page);
try {
if(globalLockManager.writeLockPage) {
globalLockManager.writeLockPage(xid, p->id);
if(globalLockManager.writeLockPage && p) {
globalLockManager.writeLockPage(xid, page);
}
} end;
writelock(p->rwlatch,0);
if(p) writelock(p->rwlatch,0);
e = stasis_log_write_update(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS],
p, op, dat, datlen);
page, op, dat, datlen);
assert(stasis_transaction_table[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
stasis_operation_do(e, p);
freeLogEntry(e);
unlock(p->rwlatch);
if(p) unlock(p->rwlatch);
if(p) releasePage(p);
}
void TreorderableUpdate(int xid, void * hp, pageid_t page,
@ -234,14 +240,6 @@ compensated_function void TupdateStr(int xid, pageid_t page,
Tupdate(xid, page, dat, datlen, op);
}
compensated_function void Tupdate(int xid, pageid_t page,
const void *dat, size_t datlen, int op) {
Page * p = loadPage(xid, page);
assert(stasis_initted);
TactionHelper(xid, dat, datlen, op, p);
releasePage(p);
}
compensated_function void TreadStr(int xid, recordid rid, char * dat) {
Tread(xid, rid, dat);
}

View file

@ -93,8 +93,11 @@ BEGIN_C_DECLS
*/
Page * loadPage(int xid, pageid_t pageid);
Page * loadPageOfType(int xid, pageid_t pageid, pagetype_t type);
Page * loadUninitializedPage(int xid, pageid_t pageid);
/**
This is the function pointer that stasis_buffer_manager_open sets in order to
override loadPage.

View file

@ -130,8 +130,10 @@ terms specified in this license.
#define OPERATION_PAGE_SET_RANGE 10
#define OPERATION_PAGE_SET_RANGE_INVERSE 11
// 12
// 13
#define OPERATION_SEGMENT_FILE_PWRITE 12
#define OPERATION_SEGMENT_FILE_PWRITE_INVERSE 13
// 14
// 15
// 16
@ -189,6 +191,7 @@ terms specified in this license.
#define SLOT_TYPE_END (-4)
#define INVALID_PAGE (-1)
#define SEGMENT_PAGEID (-2)
// TODO unify naming convention for ROOT_RECORD, NULLRID
static const recordid ROOT_RECORD = {1, 0, -1};

View file

@ -309,7 +309,7 @@ lsn_t stasis_log_end_aborted_transaction (stasis_log_t* log, TransactionLog * l)
state of the parameter l.
*/
LogEntry * stasis_log_write_update(stasis_log_t* log,
TransactionLog * l, Page * p, unsigned int operation,
TransactionLog * l, pageid_t page, unsigned int operation,
const byte * arg, size_t arg_size);
/**

View file

@ -150,6 +150,8 @@ typedef struct {
#include "operations/lsmTree.h"
#include "operations/lsnFreeSet.h"
#include "operations/group.h"
#include "operations/segmentFile.h"
/**
Initialize stasis' operation table.
*/

View file

@ -0,0 +1,17 @@
/*
* segmentFile.h
*
* Created on: Jul 9, 2009
* Author: sears
*/
#ifndef SEGMENTFILE_H_
#define SEGMENTFILE_H_
ssize_t Tpread(int xid, byte* buf, size_t count, off_t offset);
ssize_t Tpwrite(int xid, const byte * buf, size_t count, off_t offset);
stasis_operation_impl stasis_op_impl_segment_file_pwrite();
stasis_operation_impl stasis_op_impl_segment_file_pwrite_inverse();
#endif /* SEGMENTFILE_H_ */

View file

@ -450,7 +450,7 @@ void reopenLogWorkload(int truncating) {
for(int i = 0; i < ENTRY_COUNT; i++) {
entries[i] = stasis_log_write_update(stasis_log_file,
&l, NULL, OPERATION_NOOP, NULL, 0);
&l, 0, OPERATION_NOOP, NULL, 0);
if(i == SYNC_POINT) {
if(truncating) {
@ -497,7 +497,7 @@ void reopenLogWorkload(int truncating) {
LogEntry * entries2[ENTRY_COUNT];
for(int i = 0; i < ENTRY_COUNT; i++) {
entries2[i] = stasis_log_write_update(stasis_log_file, &l, NULL, OPERATION_NOOP,
entries2[i] = stasis_log_write_update(stasis_log_file, &l, 0, OPERATION_NOOP,
NULL, 0);
if(i == SYNC_POINT) {
stasis_log_file->force_tail(stasis_log_file, LOG_FORCE_COMMIT);

View file

@ -685,6 +685,58 @@ START_TEST(operation_reorderable) {
} END_TEST
START_TEST(operation_segment) {
const int SEGMENT_TEST = 42;
Tinit();
int xid = Tbegin();
recordid regionrid = Talloc(xid, sizeof(pageid_t));
pageid_t region_start = TregionAlloc(xid, 10000, SEGMENT_TEST);
Tset(xid, regionrid, &region_start);
Tcommit(xid);
xid = Tbegin();
for(int i = 0; i < 10000 / sizeof(i); i++) {
int ret = Tpwrite(xid, (byte*)&i, sizeof(i), region_start * PAGE_SIZE + i * sizeof(i));
assert(ret == sizeof(i));
}
Tcommit(xid);
TuncleanShutdown();
Tinit();
xid = Tbegin();
for(int i = 0; i < 10000 / sizeof(i); i++) {
int buf;
int ret = Tpread(xid, (byte*)&buf, sizeof(buf), region_start * PAGE_SIZE + i * sizeof(i));
assert(buf == i);
assert(ret == sizeof(buf));
if(i % 2) {
buf = 0;
Tpwrite(xid, (byte*)&buf, sizeof(buf), region_start * PAGE_SIZE + i * sizeof(i));
}
}
for(int i = 0; i < 10000 / sizeof(i); i++) {
int buf;
int ret = Tpread(xid, (byte*)&buf, sizeof(buf), region_start * PAGE_SIZE + i * sizeof(i));
assert(buf == (i % 2 ? 0 : i));
assert(ret == sizeof(buf));
}
Tabort(xid);
xid = Tbegin();
for(int i = 0; i < 10000 / sizeof(i); i++) {
int buf;
int ret = Tpread(xid, (byte*)&buf, sizeof(buf), region_start * PAGE_SIZE + i * sizeof(i));
assert(buf == i);
assert(ret == sizeof(buf));
}
Tcommit(xid);
Tdeinit();
} END_TEST
/**
Add suite declarations here
*/
@ -706,6 +758,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, operation_array_list);
tcase_add_test(tc, operation_lsn_free);
tcase_add_test(tc, operation_reorderable);
tcase_add_test(tc, operation_segment);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc);