Added new dynamic version of multicolumn, and pagelayouts, etc to support it.

This commit is contained in:
Sears Russell 2008-03-08 03:16:08 +00:00
parent 922669bde4
commit 50c5123548
8 changed files with 141 additions and 60 deletions

View file

@ -125,7 +125,7 @@ class For {
};
For(void * mem): mem_(mem) { }
inline slot_index_t recordCount(int xid) {
inline slot_index_t recordCount() {
return *numdeltas_ptr();
}

View file

@ -44,6 +44,7 @@ Multicolumn<TUPLE>::Multicolumn(Page * p) :
exceptions_(p_->memAddr + *exceptions_offset_ptr()),
dispatcher_(*column_count_ptr()),
unpacked_(0) {
byte_off_t first_free = 0;
for(int i = 0; i < *column_count_ptr(); i++) {
@ -53,7 +54,8 @@ Multicolumn<TUPLE>::Multicolumn(Page * p) :
byte_off_t column_length = dispatcher_.bytes_used(i);
columns_[i] = p_->memAddr + *column_offset_ptr(i);
dispatcher_.set_plugin(columns_[i],i, *column_plugin_id_ptr(i));
// dispatcher_.set_plugin(columns_[i],i, *column_plugin_id_ptr(i));
// assert(columns_[i] == page_column_ptr);
first_free = *column_offset_ptr(i) + column_length;
}
@ -99,26 +101,7 @@ void Multicolumn<TUPLE>::pack() {
template <class TUPLE>
Multicolumn<TUPLE>::~Multicolumn() {
// XXX this is doing the wrong thing; it should be freeing memory, and
// doing nothing else; instead, it's pack()ing the page and leaking
// space.
//byte_off_t first_free = 0;
//byte_off_t last_free = (intptr_t)(first_header_byte_ptr() - p_->memAddr);
// if(unpacked_) {
//*exceptions_len_ptr() = USABLE_SIZE_OF_PAGE - first_exception_byte_;
//last_free -= *exceptions_len_ptr();
//*exceptions_offset_ptr() = last_free;
//memcpy(&(p_->memAddr[*exceptions_offset_ptr()]),
//exceptions_ + first_exception_byte_, *exceptions_len_ptr());
for(int i = 0; i < *column_count_ptr(); i++) {
//*column_offset_ptr(i) = first_free;
//byte_off_t bytes_used = dispatcher_.bytes_used(i);
//memcpy(column_base_ptr(i), columns_[i], bytes_used);
//first_free += bytes_used;
//assert(first_free <= last_free);
if(unpacked_) delete [] columns_[i];
}
@ -187,6 +170,39 @@ inline TUPLE* Multicolumn<TUPLE>::recordRead(int xid, slot_index_t slot,
return buf;
}
template <class TUPLE>
slot_index_t inline Multicolumn<TUPLE>::recordCount(int xid) {
slot_index_t rec = dispatcher_.recordCount(0);
// printf("column count ptr %d\n", *column_count_ptr());
for(slot_index_t i = 1; i < *column_count_ptr(); i++){
slot_index_t c = dispatcher_.recordCount(i);
rec = rec > c ? c : rec;
}
return rec;
}
template <class TUPLE>
inline TUPLE*
Multicolumn<TUPLE>::recordFind(int xid, TUPLE& val, TUPLE& scratch) {
column_number_t cols = scratch.column_count();
std::pair<slot_index_t,slot_index_t> pair_scratch;
std::pair<slot_index_t,slot_index_t> * ret = &pair_scratch;
pair_scratch.first = 0;
pair_scratch.second = recordCount(xid);
for(column_number_t i = 0; ret && i < cols; i++) {
ret = dispatcher_.recordFind(xid, i, ret->first, ret->second,
exceptions_, val.get(i), pair_scratch);
}
if(ret) {
recordRead(xid,ret->first, &scratch);
return &scratch;
} else {
return 0;
}
}
/// End performance-critical code ---------------------------------------------
/// Stuff below this line interfaces with Stasis' buffer manager --------------

View file

@ -92,7 +92,10 @@ template <class TUPLE> class Multicolumn {
}
inline slot_index_t append(int xid, TUPLE const & dat);
inline TUPLE * recordRead(int xid, slot_index_t slot, TUPLE * buf);
inline TUPLE * recordFind(int xid, TUPLE& val, TUPLE& scratch);
inline slot_index_t recordCount(int xid);
inline void pack();
private:
typedef struct column_header {
@ -157,7 +160,10 @@ template <class TUPLE> class Multicolumn {
byte ** columns_;
byte_off_t first_exception_byte_;
byte * exceptions_;
public:
PluginDispatcher dispatcher_;
private:
int bytes_left_;
int unpacked_;
friend void multicolumnLoaded<TUPLE>(Page *p);

View file

@ -62,7 +62,7 @@ namespace rose {
//// --- multicolumn static page layout
template <int N, class FORMAT>
class MultiColumnTypePageLayout {
class StaticMultiColumnTypePageLayout {
public:
typedef FORMAT FMT;
static inline void initPageLayout() {
@ -81,18 +81,6 @@ namespace rose {
}
static inline FORMAT * initPage(Page *p, const typename FORMAT::TUP * t) {
plugin_id_t plugins[N];
if(0 < N) plugins[0] = plugin_id<FORMAT, typename FORMAT::CMP0, typename FORMAT::CMP0::TYP>();
if(1 < N) plugins[1] = plugin_id<FORMAT, typename FORMAT::CMP1, typename FORMAT::CMP1::TYP>();
if(2 < N) plugins[2] = plugin_id<FORMAT, typename FORMAT::CMP2, typename FORMAT::CMP2::TYP>();
if(3 < N) plugins[3] = plugin_id<FORMAT, typename FORMAT::CMP3, typename FORMAT::CMP3::TYP>();
if(4 < N) plugins[4] = plugin_id<FORMAT, typename FORMAT::CMP4, typename FORMAT::CMP4::TYP>();
if(5 < N) plugins[5] = plugin_id<FORMAT, typename FORMAT::CMP5, typename FORMAT::CMP5::TYP>();
if(6 < N) plugins[6] = plugin_id<FORMAT, typename FORMAT::CMP6, typename FORMAT::CMP6::TYP>();
if(7 < N) plugins[7] = plugin_id<FORMAT, typename FORMAT::CMP7, typename FORMAT::CMP7::TYP>();
if(8 < N) plugins[8] = plugin_id<FORMAT, typename FORMAT::CMP8, typename FORMAT::CMP8::TYP>();
if(9 < N) plugins[9] = plugin_id<FORMAT, typename FORMAT::CMP9, typename FORMAT::CMP9::TYP>();
FORMAT * f = new FORMAT(-1,p);
if(0 < N) f->compressor0()->offset(*t->get0());
@ -119,14 +107,56 @@ namespace rose {
static int my_init_num;
};
template <int N, class FORMAT>
int MultiColumnTypePageLayout<N,FORMAT>::my_cmp_num = -1;
int StaticMultiColumnTypePageLayout<N,FORMAT>::my_cmp_num = -1;
template <int N, class FORMAT>
int MultiColumnTypePageLayout<N,FORMAT>::my_init_num = -1;
int StaticMultiColumnTypePageLayout<N,FORMAT>::my_init_num = -1;
template <class PAGELAYOUT>
recordid TlsmTableAlloc();
template <class FORMAT>
class DynamicMultiColumnTypePageLayout {
public:
typedef FORMAT FMT;
static inline void initPageLayout(plugin_id_t * plugins) {
stasis_page_impl_register(FMT::impl());
lsmTreeRegisterComparator(cmp_num, FMT::TUP::cmp);
lsmTreeRegisterPageInitializer
(init_num, (lsm_page_initializer_t)initPage);
my_cmp_num = cmp_num;
cmp_num++;
my_init_num = init_num;
init_num++;
my_plugins = plugins;
}
static inline FORMAT * initPage(Page *p, const typename FORMAT::TUP * t) {
const column_number_t column_count = t->column_count();
FORMAT * f = new FORMAT(-1, p, column_count, my_plugins);
for(column_number_t i = 0; i < column_count; i++) {
f->dispatcher_.offset(i, t->get(i));
}
return f;
}
static inline int cmp_id() {
return my_cmp_num;
}
static inline int init_id() {
return my_init_num;
}
private:
static int my_cmp_num;
static plugin_id_t* my_plugins;
static int my_init_num;
};
template <class FORMAT>
int DynamicMultiColumnTypePageLayout<FORMAT>::my_cmp_num = -1;
template <class FORMAT>
plugin_id_t* DynamicMultiColumnTypePageLayout<FORMAT>::my_plugins = 0;
template <class FORMAT>
int DynamicMultiColumnTypePageLayout<FORMAT>::my_init_num = -1;
}
#endif // _ROSE_COMPRESSION_PAGELAYOUT_H__

View file

@ -79,6 +79,9 @@ class PluginDispatcher{
#define caseRead(off,plug_type,col,type,m,ret,fcn,xid,slot,except,scratch) \
case off: { ret = ((plug_type*)plugins_[col])->fcn(xid,slot,except,(type*)scratch); } break
#define caseFind(off,plug_type,col,type,ret,fcn,xid,first,last,except,key,scratch) \
case off: { ret = ((plug_type*)plugins_[col])->fcn(xid,first,last,except,*(type*)key,scratch); } break
#define caseNoArg(off,plug_type,col,type,m,ret,fcn) \
case off: { ret = ((plug_type*)plugins_[col])->fcn(); } break
@ -91,14 +94,17 @@ class PluginDispatcher{
#define caseCompressor(off,plug_type,col,type,nil) \
case off: { ret = (plug_type*)plugins_[col]; } break
inline slot_index_t recordAppend(int xid, column_number_t col,
const void *dat, byte_off_t* except,
byte *exceptions, int *free_bytes) {
slot_index_t ret;
dispatchSwitch(col,caseAppend,append,ret,xid,dat,except,exceptions,
free_bytes);
return ret;
}
#define caseOffset(off,plug_type,col,type,val) \
case off: { ((plug_type*)plugins_[col])->offset(*(type*)val); } break
inline slot_index_t recordAppend(int xid, column_number_t col,
const void *dat, byte_off_t* except,
byte *exceptions, int *free_bytes) {
slot_index_t ret;
dispatchSwitch(col,caseAppend,append,ret,xid,dat,except,exceptions,
free_bytes);
return ret;
}
inline void *recordRead(int xid, byte *mem, column_number_t col,
slot_index_t slot, byte* exceptions, void *scratch) {
@ -106,6 +112,13 @@ class PluginDispatcher{
dispatchSwitch(col,caseRead,mem,ret,recordRead,xid,slot,exceptions,scratch);
return ret;
}
inline std::pair<slot_index_t,slot_index_t> *recordFind(int xid, column_number_t col,
slot_index_t first, slot_index_t last, byte* exceptions,
const void * key, std::pair<slot_index_t,slot_index_t>& pair_scratch) {
std::pair<slot_index_t,slot_index_t> * ret;
dispatchSwitch(col,caseFind,ret,recordFind,xid,first,last,exceptions,key,pair_scratch);
return ret;
}
inline byte_off_t bytes_used(column_number_t col) {
byte_off_t ret;
@ -113,6 +126,12 @@ class PluginDispatcher{
return ret;
}
inline slot_index_t recordCount(column_number_t col) {
byte_off_t ret;
dispatchSwitch(col,caseNoArg,mem,ret,recordCount);
return ret;
}
inline void init_mem(byte * mem, column_number_t col) {
dispatchSwitch(col,caseInitMem,mem);
}
@ -125,25 +144,29 @@ class PluginDispatcher{
dispatchSwitch(col,caseCompressor,0);
return ret;
}
inline void offset(column_number_t col, void * val) {
dispatchSwitch(col,caseOffset,val);
}
PluginDispatcher(column_number_t column_count) :
column_count_(column_count), plugin_ids_(new plugin_id_t[column_count]), plugins_(new void*[column_count]) {
for(column_number_t i = 0; i < column_count; i++) {
plugin_ids_[i] = 0;
plugins_[i] = 0;
}
}
PluginDispatcher(int xid, byte *mem,column_number_t column_count, plugin_id_t * plugins) :
/* PluginDispatcher(int xid, byte *mem,column_number_t column_count, plugin_id_t * plugins) :
column_count_(column_count), plugin_ids_(new plugin_id_t[column_count]), plugins_(new void*[column_count]) {
for(column_number_t i = 0; i < column_count; i++) {
plugin_ids_[i] = 0;
set_plugin(mem,i,plugins[i]);
}
}
} */
inline void set_plugin(byte *mem,column_number_t c, plugin_id_t p) {
if(plugin_ids_[c]) {
/* if(plugin_ids_[c]) {
dispatchSwitch(c,caseDelPlugin,0);
}
} */
plugin_ids_[c] = p;
dispatchSwitch(c,caseSetPlugin,mem);
}
@ -160,9 +183,11 @@ class PluginDispatcher{
#undef caseSetPlugin
#undef caseDelPlugin
#undef caseRead
#undef caseFind
#undef caseNoArg
#undef caseInitMem
#undef caseCompressor
#undef caseOffset
private:

View file

@ -56,7 +56,7 @@ class Rle {
n->copies = 0;
n->data = 0;
}
inline slot_index_t recordCount(int xid) {
inline slot_index_t recordCount() {
triple_t *n = last_block_ptr();
return (n->index) + (n->copies);
}

View file

@ -232,16 +232,16 @@ class StaticMulticolumn {
slot_index_t recordCount;
slot_index_t c;
// XXX memoize this function
if(0 < N) recordCount = plugin0->recordCount(xid);
if(1 < N) { c = plugin1->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(2 < N) { c = plugin2->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(3 < N) { c = plugin3->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(4 < N) { c = plugin4->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(5 < N) { c = plugin5->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(6 < N) { c = plugin6->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(7 < N) { c = plugin7->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(8 < N) { c = plugin8->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(9 < N) { c = plugin9->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(0 < N) recordCount = plugin0->recordCount();
if(1 < N) { c = plugin1->recordCount(); recordCount = recordCount > c ? c :recordCount; }
if(2 < N) { c = plugin2->recordCount(); recordCount = recordCount > c ? c :recordCount; }
if(3 < N) { c = plugin3->recordCount(); recordCount = recordCount > c ? c :recordCount; }
if(4 < N) { c = plugin4->recordCount(); recordCount = recordCount > c ? c :recordCount; }
if(5 < N) { c = plugin5->recordCount(); recordCount = recordCount > c ? c :recordCount; }
if(6 < N) { c = plugin6->recordCount(); recordCount = recordCount > c ? c :recordCount; }
if(7 < N) { c = plugin7->recordCount(); recordCount = recordCount > c ? c :recordCount; }
if(8 < N) { c = plugin8->recordCount(); recordCount = recordCount > c ? c :recordCount; }
if(9 < N) { c = plugin9->recordCount(); recordCount = recordCount > c ? c :recordCount; }
return recordCount;
}
/* inline slot_index_t recordCount(int xid) {

View file

@ -12,6 +12,10 @@ namespace rose {
static const char TOMBSTONE = 1;
static const int TUPLE_ID = 1;
static const int NN = N;
/** Compatibility for dynamic dispatch stuff */
inline int column_count() const { return NN; }
inline void* get(int i) const { return ((byte*)&s) + cols_[i]; }
typedef TYPE0 TYP0;
typedef TYPE1 TYP1;
typedef TYPE2 TYP2;