add support for lsn-free set (but not alloc, etc...)

This commit is contained in:
Sears Russell 2009-03-15 07:21:37 +00:00
parent bd2015443f
commit 45a2410a25
15 changed files with 208 additions and 28 deletions

View file

@ -13,13 +13,14 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c
logger/filePool.c
logger/inMemoryLog.c
logger/logHandle.c logger/logger2.c
logger/logMemory.c page/raw.c page/slotted.c
logger/logMemory.c page/raw.c page/slotted.c page/lsnFree.c
page/fixed.c compensations.c
operations/pageOperations.c page/indirect.c
operations/decrement.c operations/increment.c
operations/prepare.c operations/set.c
operations/alloc.c operations/noop.c
operations/arrayList.c
operations/lsnFreeSet.c
hash.c
operations/naiveLinearHash.c
operations/linearHashNTA.c

View file

@ -10,7 +10,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common
logger/filePool.c \
logger/inMemoryLog.c logger/logHandle.c logger/logger2.c \
logger/logMemory.c \
page/raw.c page/slotted.c page/fixed.c compensations.c \
page/raw.c page/slotted.c page/lsnFree.c page/fixed.c compensations.c \
operations/pageOperations.c page/indirect.c operations/decrement.c \
operations/increment.c operations/prepare.c operations/set.c \
operations/alloc.c operations/noop.c \
@ -19,6 +19,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common
operations/linearHashNTA.c operations/linkedListNTA.c \
operations/pageOrientedListNTA.c \
operations/regions.c operations/lsmTree.c \
operations/lsnFreeSet.c \
io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c \
io/debug.c \
bufferManager.c \

View file

@ -78,7 +78,9 @@ void redoUpdate(const LogEntry * e) {
Page * p = loadPage(e->xid, e->update.page);
writelock(p->rwlatch,0);
if(stasis_page_lsn_read(p) < e->LSN) {
if(stasis_page_lsn_read(p) < e->LSN ||
e->update.funcID == OPERATION_SET_LSN_FREE ||
e->update.funcID == OPERATION_SET_LSN_FREE_INVERSE) {
DEBUG("OPERATION xid %d Redo, %lld {%lld:%lld}\n", e->xid,
e->LSN, e->update.page, stasis_page_lsn_read(p));
// Need to check the id field to find out what the REDO_action

View file

@ -0,0 +1,77 @@
#include <stasis/operations.h>
#include <stasis/page.h>
#include <string.h>
static int op_lsn_free_set(const LogEntry *e, Page *p) {
if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { abort() ; }
assert(e->update.arg_size >= (sizeof(pageoff_t) * 2));
int size = e->update.arg_size;
size -= (2*sizeof(pageoff_t));
const pageoff_t * a = (const pageoff_t*)getUpdateArgs(e);
const byte* b = (const byte*)&(a[2]);
assertlocked(p->rwlatch);
memcpy(p->memAddr + a[0], b, a[1]);
return 0;
}
static int op_lsn_free_unset(const LogEntry *e, Page *p) {
if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { return 0; }
assert(e->update.arg_size >= (sizeof(pageoff_t) * 2));
int size = e->update.arg_size;
size -= (2*sizeof(pageoff_t));
const pageoff_t * a = (const pageoff_t*)getUpdateArgs(e);
const byte* b = (const byte*)&(a[2]);
assertlocked(p->rwlatch);
memcpy(p->memAddr + a[0], b+a[1], a[1]);
return 0;
}
int TsetLSNFree(int xid, recordid rid, const void * dat) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
rid = stasis_record_dereference(xid,p,rid);
short type = stasis_record_type_read(xid,p,rid);
if(type == BLOB_SLOT) {
fprintf(stderr, "LSN-Free blobs are not implemented!\n");
fflush(stderr);
abort();
unlock(p->rwlatch);
} else {
rid.size = stasis_record_type_to_size(rid.size);
intptr_t sz = 2 * (sizeof(pageoff_t) + rid.size);
byte * buf = calloc(sz, 1);
pageoff_t * a = (pageoff_t*)buf;
// XXX hack!
byte * writeBuf = stasis_record_write_begin(xid, p, rid);
a[0] = writeBuf - p->memAddr;
stasis_record_write_done(xid, p, rid, writeBuf);
a[1] = rid.size;
byte * b = (byte*)&(a[2]);
// postimage
memcpy(b,dat,rid.size);
// preimage
stasis_record_read(xid, p, rid, b+rid.size);
unlock(p->rwlatch);
Tupdate(xid,rid.page,buf,sz,OPERATION_SET_LSN_FREE);
free(buf);
}
return 0;
}
Operation getSetLsnFree() {
Operation o = {
OPERATION_SET_LSN_FREE,
OPERATION_SET_LSN_FREE_INVERSE,
op_lsn_free_set
};
return o;
}
Operation getSetLsnFreeInverse() {
Operation o = {
OPERATION_SET_LSN_FREE_INVERSE,
OPERATION_SET_LSN_FREE,
op_lsn_free_unset
};
return o;
}

View file

@ -94,9 +94,6 @@ terms specified in this license.
static page_impl page_impls[MAX_PAGE_TYPE];
/**
XXX latching for pageWriteLSN...
*/
void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) {
assertlocked(page->rwlatch);
@ -106,9 +103,7 @@ void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) {
dirtyPages_add(page);
return;
}
/**
XXX latching for pageReadLSN...
*/
lsn_t stasis_page_lsn_read(const Page * page) {
assertlocked(page->rwlatch);
return page->LSN;
@ -131,6 +126,7 @@ void stasis_page_init() {
stasis_page_impl_register(blobImpl());
stasis_page_impl_register(indirectImpl());
stasis_page_impl_register(lsmRootImpl());
stasis_page_impl_register(slottedLsnFreeImpl());
}
void stasis_page_deinit() {

22
src/stasis/page/lsnFree.c Normal file
View file

@ -0,0 +1,22 @@
#include <stasis/page.h>
#include <stasis/page/indirect.h>
#include <stasis/page/slotted.h>
#include <stasis/logger/logger2.h>
void stasis_slotted_lsn_free_initialize_page(Page * p) {
stasis_slotted_initialize_page(p);
*stasis_page_type_ptr(p) = SLOTTED_LSN_FREE_PAGE;
*stasis_page_lsn_ptr(p) = -1;
}
static void lsnFreeLoaded(Page * p) {
p->LSN = stasis_log_file->next_available_lsn(stasis_log_file);
}
static void lsnFreeFlushed(Page * p) { }
page_impl slottedLsnFreeImpl() {
page_impl pi = slottedImpl();
pi.page_type = SLOTTED_LSN_FREE_PAGE;
pi.pageLoaded = lsnFreeLoaded;
pi.pageLoaded = lsnFreeFlushed;
return pi;
}

View file

@ -29,7 +29,8 @@ static inline void slottedFsck(const Page const * page) {
const long slotListStart = (long)slot_length_ptr(&dummy, numslots-1);
assert(slotListStart < PAGE_SIZE && slotListStart >= 0);
assert(page_type == SLOTTED_PAGE ||
page_type == BOUNDARY_TAG_PAGE);
page_type == BOUNDARY_TAG_PAGE ||
page_type == SLOTTED_LSN_FREE_PAGE);
assert(numslots >= 0);
assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE);
assert(freespace >= 0);

View file

@ -58,7 +58,10 @@ void setupOperationsTable() {
operationsTable[OPERATION_INCREMENT] = getIncrement();
operationsTable[OPERATION_DECREMENT] = getDecrement();
operationsTable[OPERATION_ALLOC] = getAlloc();
// operationsTable[OPERATION_PREPARE] = getPrepare();
operationsTable[OPERATION_PREPARE] = getPrepare();
operationsTable[OPERATION_SET_LSN_FREE] = getSetLsnFree();
operationsTable[OPERATION_SET_LSN_FREE_INVERSE]
= getSetLsnFreeInverse();
/* operationsTable[OPERATION_LHINSERT] = getLHInsert();
operationsTable[OPERATION_LHREMOVE] = getLHRemove(); */
operationsTable[OPERATION_DEALLOC] = getDealloc();

View file

@ -107,7 +107,8 @@ typedef long long pageid_t;
#define PAGEID_T_MAX INT64_MAX
typedef int32_t slotid_t;
#define SLOTID_T_MAX INT32_MAX
typedef uint16_t pageoff_t;
#define PAGEOFF_T_MAX UINT16_MAX;
/*#define DEBUGGING */
/*#define PROFILE_LATCHES*/

View file

@ -115,20 +115,21 @@ terms specified in this license.
#define OPERATION_DECREMENT 3
#define OPERATION_ALLOC 4
#define OPERATION_PREPARE 5
//#define OPERATION_LHINSERT 6
//#define OPERATION_LHREMOVE 7
#define OPERATION_SET_LSN_FREE 6
#define OPERATION_SET_LSN_FREE_INVERSE 7
#define OPERATION_DEALLOC 8
#define OPERATION_REALLOC 9
#define OPERATION_PAGE_SET_RANGE 10
#define OPERATION_PAGE_SET_RANGE_INVERSE 11
/*#define OPERATION_UPDATE_FREESPACE 12
#define OPERATION_UPDATE_FREESPACE_INVERSE 13
#define OPERATION_UPDATE_FREELIST 14
#define OPERATION_UPDATE_FREELIST_INVERSE 15
#define OPERATION_FREE_PAGE 16
#define OPERATION_ALLOC_FREED 17
#define OPERATION_UNALLOC_FREED 18 */
// 12
// 13
//#define OPERATION_UPDATE_FREELIST 14
//#define OPERATION_UPDATE_FREELIST_INVERSE 15
//#define OPERATION_FREE_PAGE 16
//#define OPERATION_ALLOC_FREED 17
//#define OPERATION_UNALLOC_FREED 18
#define OPERATION_NOOP 19
#define OPERATION_ARRAY_LIST_ALLOC 21
@ -230,6 +231,7 @@ extern const short SLOT_TYPE_LENGTHS[];
#define BOUNDARY_TAG_PAGE 7
#define BLOB_PAGE 8
#define LSM_ROOT_PAGE 9
#define SLOTTED_LSN_FREE_PAGE 10
#define USER_DEFINED_PAGE(n) (100+n) // 0 <= n < 155
#define MAX_PAGE_TYPE 255

View file

@ -143,7 +143,7 @@ typedef struct {
#include "operations/linearHashNTA.h"
#include "operations/regions.h"
#include "operations/lsmTree.h"
#include "operations/lsnFreeSet.h"
extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */
/**

View file

@ -0,0 +1,6 @@
#ifndef __LSN_FREE_SET_H
#define __LSN_FREE_SET_H
Operation getSetLsnFree();
Operation getSetLsnFreeInverse();
int TsetLSNFree(int xid, recordid rid, const void *dat);
#endif //__LSN_FREE_SET_H

View file

@ -875,10 +875,13 @@ int stasis_page_impl_register(page_impl impl);
returned by loadPage()
*/
void stasis_slotted_initialize_page(Page * p);
void stasis_slotted_lsn_free_initialize_page(Page * p);
void stasis_fixed_initialize_page(Page * page, size_t size, int count);
void stasis_indirect_initialize_page(Page * p, int height);
int stasis_fixed_records_per_page(size_t size);
void stasis_blob_initialize_page(Page * p);
page_impl slottedLsnFreeImpl();
END_C_DECLS
#endif

View file

@ -550,6 +550,70 @@ START_TEST(operation_array_list) {
} END_TEST
START_TEST(operation_lsn_free) {
Tinit();
recordid rid[100];
{
int xid = Tbegin();
pageid_t pid = TpageAlloc(xid);
Page * p = loadPage(xid,pid);
writelock(p->rwlatch,0);
stasis_slotted_lsn_free_initialize_page(p);
// XXX hack!
byte * old = malloc(PAGE_SIZE);
memcpy(old, p->memAddr, PAGE_SIZE);
int fortyTwo = 42;
for(int i = 0; i < 100; i++) {
rid[i] = stasis_record_alloc_begin(xid, p, sizeof(int));
stasis_record_alloc_done(xid, p, rid[i]);
stasis_record_write(xid, p, -1, rid[i], (const byte*)&fortyTwo);
}
byte * new = malloc(PAGE_SIZE);
memcpy(new, p->memAddr, PAGE_SIZE);
memcpy(p->memAddr, old, PAGE_SIZE);
unlock(p->rwlatch);
releasePage(p);
TpageSet(xid, pid, new);
free(old);
free(new);
Tcommit(xid);
}
{
int xid[2];
xid[0] = Tbegin();
xid[1] = Tbegin();
for(int i = 0; i < 100; i++) {
int foo;
Tread(xid[i%2],rid[i], &foo);
assert(foo == 42);
TsetLSNFree(xid[i%2], rid[i], &i);
Tread(xid[i%2],rid[i], &foo);
assert(foo == i);
}
Tcommit(xid[0]);
Tabort(xid[1]);
}
Tdeinit();
Tinit();
{
int xid = Tbegin();
for(int i = 0; i < 100; i++) {
int foo;
Tread(xid, rid[i], &foo);
if(i%2) {
assert(foo == 42);
} else {
assert(foo == i);
}
}
Tcommit(xid);
}
Tdeinit();
} END_TEST
/**
Add suite declarations here
*/
@ -569,6 +633,7 @@ Suite * check_suite(void) {
}
tcase_add_test(tc, operation_alloc_test);
tcase_add_test(tc, operation_array_list);
tcase_add_test(tc, operation_lsn_free);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc);