Added support for binary search to page lookups

This commit is contained in:
Sears Russell 2008-06-08 20:28:53 +00:00
parent 651299716e
commit 75f857bc26
4 changed files with 223 additions and 9 deletions

View file

@ -10,6 +10,8 @@
namespace rose {
// When defined, the recordFind() implementations guarded by
// "#else // COMPRESSION_BINARY_FIND" (binary search) are compiled instead of
// the linear-scan versions guarded by "#ifndef COMPRESSION_BINARY_FIND".
#define COMPRESSION_BINARY_FIND
// NOTE(review): presumably the size of a record in bytes -- confirm against users.
typedef int8_t record_size_t;
// NOTE(review): presumably a byte offset within a page -- confirm against users.
typedef uint16_t byte_off_t;
// Index of a slot; this is the type of the range bounds passed to, and the
// pair returned by, recordFind().
typedef uint16_t slot_index_t;

View file

@ -1,6 +1,8 @@
#ifndef _ROSE_COMPRESSION_FOR_IMPL_H__
#define _ROSE_COMPRESSION_FOR_IMPL_H__
#include <stasis/page/compression/binary_search.h>
// Copyright 2007 Google Inc. All Rights Reserved.
// Author: sears@google.com (Rusty Sears)
@ -82,6 +84,8 @@ For<TYPE>::recordRead(int xid, slot_index_t slot, byte *exceptions,
return scratch;
}
}
#ifndef COMPRESSION_BINARY_FIND
template <class TYPE>
inline std::pair<slot_index_t,slot_index_t>*
For<TYPE>::recordFind(int xid, slot_index_t start, slot_index_t stop,
@ -126,5 +130,87 @@ For<TYPE>::recordFind(int xid, slot_index_t start, slot_index_t stop,
}
return ret;
}
#else // COMPRESSION_BINARY_FIND
/**
 * Locate the run of slots in [low, high) whose value equals `value`.
 *
 * On success, scratch.first is the first matching slot and scratch.second is
 * one past the last matching slot (the same half-open convention in both
 * branches below), and &scratch is returned.  Returns 0 if no slot matches.
 *
 * @param xid         not used by this implementation.
 * @param low         first slot to consider.
 * @param high        one past the last slot to consider.
 * @param exceptions  base pointer used to locate values of exceptional
 *                    (negative-delta) slots.
 * @param value       the value to search for.
 * @param scratch     caller-provided storage for the result.
 *
 * NOTE(review): the binary-search branch assumes that the non-exceptional
 * deltas in [low, high) are sorted, and that rose_binary_search(accessor)
 * searches the half-open range [bs_low, bs_high) for bs_value by comparing
 * *accessor(i), leaving the matching index (or -1) in bs_ret.  The macro is
 * defined in binary_search.h, which is not visible here -- confirm.
 */
template <class TYPE>
inline std::pair<slot_index_t,slot_index_t>*
For<TYPE>::recordFind(int xid, slot_index_t low, slot_index_t high,
                      byte *exceptions, TYPE value,
                      std::pair<slot_index_t,slot_index_t>& scratch) {
  delta_t delta = value - *base_ptr();
  //printf("delta = %d\n", delta);
  if(delta >= 0) {
    // Initialised so that an empty search range reads as "not found" even if
    // the macro never assigns it.
    int64_t bs_ret = -1;
    {
      int64_t bs_low = low;
      int64_t bs_high = high;
      // Trim exceptional (negative) deltas off both ends of the range.  The
      // delta itself must be inspected (not the pointer returned by
      // nth_delta_ptr), and the bound is tested first so that no slot outside
      // [low, high) is read.
      while(bs_low < bs_high && *nth_delta_ptr(bs_low) < 0) { bs_low++; }
      while(bs_low < bs_high && *nth_delta_ptr(bs_high-1) < 0) { bs_high--; }
      DEBUG("low: %d->%ld, high %d->%ld\n", low, bs_low, high, bs_high);
      delta_t bs_value = delta;
      rose_binary_search(nth_delta_ptr);
    }
    if(bs_ret == -1) { printf("not found by for\n"); return 0; }

    // Seed the result with the single slot the search landed on, then widen
    // it in both directions to cover the whole run of equal deltas.
    scratch.first  = static_cast<slot_index_t>(bs_ret);
    scratch.second = static_cast<slot_index_t>(bs_ret + 1);
    while(scratch.first > low &&
          *nth_delta_ptr(scratch.first-1) == delta) {
      scratch.first--;
    }
    // scratch.second is exclusive, so the slot to test is scratch.second
    // itself; this never touches slot `high`.
    while(scratch.second < high &&
          *nth_delta_ptr(scratch.second) == delta) {
      scratch.second++;
    }
    DEBUG("front: %ld->%d, back: %ld\n",bs_ret, scratch.first, scratch.second);
    return &scratch;
  } else { // @todo Optimize lookup of exceptional data. (It can be binary searched too...)
    std::pair<slot_index_t,slot_index_t>* ret = 0;
    slot_index_t i;
    // Pass 1: linear scan for the first slot that matches.
    for(i = low; i < high; i++) {
      delta_t d = *nth_delta_ptr(i);
      if(d >= 0) {
        if(d == delta) {
          scratch.first = i;
          scratch.second = high;  // provisional; narrowed by pass 2
          ret = &scratch;
          i++;
          break;
        }
      } else {
        // A negative delta is an offset used to locate the slot's real value
        // relative to `exceptions`.
        if(value == *(TYPE*)(exceptions + d + PAGE_SIZE - sizeof(TYPE))) {
          scratch.first = i;
          scratch.second = high;  // provisional; narrowed by pass 2
          ret = &scratch;
          i++;
          break;
        }
      }
    }
    // Pass 2: continue to the first slot that no longer matches; that slot is
    // the exclusive end of the run.
    for(;i < high; i++) {
      delta_t d = *nth_delta_ptr(i);
      if(d >= 0) {
        if(d != delta) {
          scratch.second = i;
          break;
        }
      } else {
        if(value != *(TYPE*)(exceptions +d + PAGE_SIZE - sizeof(TYPE))) {
          scratch.second = i;
          break;
        }
      }
    }
    return ret;
  }
}
#endif // COMPRESSION_BINARY_FIND
} // namespace rose
#endif // _ROSE_COMPRESSION_FOR_IMPL_H__

View file

@ -25,6 +25,8 @@ Rle<TYPE>::append(int xid, const TYPE dat,
int *free_bytes) {
int64_t ret;
DEBUG("\trle got %lld\n", (long long)dat);
ret = last_block_ptr()->index + last_block_ptr()->copies;
if(ret == MAX_INDEX) {
@ -58,7 +60,6 @@ inline TYPE *
Rle<TYPE>::recordRead(int xid, slot_index_t slot, byte* exceptions,
TYPE * scratch) {
block_index_t n = nth_block_ptr(last_)->index <= slot ? last_ : 0;
// while (n < *block_count_ptr()) {
do {
triple_t * t = nth_block_ptr(n);
if (t->index <= slot && t->index + t->copies > slot) {
@ -70,6 +71,7 @@ Rle<TYPE>::recordRead(int xid, slot_index_t slot, byte* exceptions,
} while (n < *block_count_ptr());
return 0;
}
#ifndef COMPRESSION_BINARY_FIND
template <class TYPE>
inline std::pair<slot_index_t,slot_index_t>*
Rle<TYPE>::recordFind(int xid, slot_index_t start, slot_index_t stop,
@ -80,13 +82,15 @@ Rle<TYPE>::recordFind(int xid, slot_index_t start, slot_index_t stop,
do {
triple_t * t = nth_block_ptr(n);
if(t->data >= value) {
scratch.first = t->index;
do {
scratch.second = t->index + t->copies;
n++;
t = nth_block_ptr(n);
} while(n < *block_count_ptr() && t->data == value);
ret = &scratch;
if(t->data == value) {
scratch.first = t->index;
do {
scratch.second = t->index + t->copies;
n++;
t = nth_block_ptr(n);
} while(n < *block_count_ptr() && t->data == value);
ret = &scratch;
}
break;
}
n++;
@ -98,6 +102,111 @@ Rle<TYPE>::recordFind(int xid, slot_index_t start, slot_index_t stop,
}
return ret;
}
} // namespace rose
#else // COMPRESSION_BINARY_FIND
/**
 * Locate the slots in [start, stop) whose value equals `value`.
 *
 * Three binary searches are performed over the page's blocks: one to find the
 * first block that can contain `start`, one to find the last block that can
 * contain `stop`, and one over the data values of the blocks in between.
 *
 * On success, scratch.first is the first matching slot and scratch.second is
 * one past the last matching slot, both clamped to [start, stop], and
 * &scratch is returned.  If no block in range holds `value`, 0 is returned
 * (the same miss result as For::recordFind) rather than aborting.
 *
 * @param xid         not used by this implementation.
 * @param start       first slot to consider.
 * @param stop        one past the last slot to consider.
 * @param exceptions  not used by this implementation.
 * @param value       the value to search for.
 * @param scratch     caller-provided storage for the result.
 *
 * NOTE(review): relies on the rose_binary_search* macros from binary_search.h
 * (not visible here).  They are assumed to read bs_low, bs_high (exclusive)
 * and bs_value, and to leave the resulting block index, or -1, in bs_ret --
 * confirm.  Block data values are assumed to be sorted within the page.
 */
template <class TYPE>
inline std::pair<slot_index_t,slot_index_t>*
Rle<TYPE>::recordFind(int xid, slot_index_t start, slot_index_t stop,
                      byte *exceptions, TYPE value,
                      std::pair<slot_index_t,slot_index_t>& scratch) {
  DEBUG("\n\ncalled with start = %lld stop = %lld, val = %lld\n", (long long)start, (long long)stop, (long long)value);

  block_index_t low = 0;
  block_index_t high = *block_count_ptr();

  int64_t bs_ret = -1;
  int64_t bs_low = low;
  int64_t bs_high = high;

  DEBUG("low: %d->%ld, high %d->%ld\n", low, bs_low, high, bs_high);

  // Search 1: find the block whose slot range contains `start`.
  TYPE bs_value = start;
  rose_binary_search_greatest_lower(nth_high_index);
  if(bs_ret != -1) {
    assert(nth_low_index(bs_ret) <= start);
    assert(nth_high_bound(bs_ret) > start);
    low = bs_ret;
  } // else start not found because page starts after it (leave at block zero)

  DEBUG("real low is %d\n", low);

  // Search 2: find the block whose slot range contains the end of the range.
  bs_low = low;
  bs_high = (*block_count_ptr());
  bs_value = stop;
  DEBUG("bs_low = %lld bs_high = %lld bs_val = %lld\n", (long long)bs_low, (long long)bs_high, (long long)bs_value);
  rose_binary_search_greatest_lower(nth_high_bound);
  if(bs_ret == -1) {
    high = (*block_count_ptr());
  } else {
    DEBUG("bs_ret = %lld\n", (long long)bs_ret);
    high = bs_ret;
    // high contains the top of the range
    assert(nth_high_bound(high) >= stop);
    assert(nth_low_index(high) < stop);
    // set high to the first block that we don't need to consider.
    high++;
  }

  DEBUG("real high is %lld (last block slots: %lld - %lld) val = %lld\n", (long long int)high, (long long int)nth_low_index(high-1), (long long int)nth_high_index(high-1), (long long int)bs_value);

  // Search 3: within blocks [low, high), find one whose data equals `value`.
  bs_low = low;
  bs_high = high;
  bs_value = value;
  DEBUG("bs val =%lld bs low = %lld bs high = %lld\n", (long long) bs_value, (long long) bs_low, (long long) bs_high);
  rose_binary_search(nth_data_ptr);
  DEBUG("bs ret = %lld\n", (long long int)bs_ret);

  if(bs_ret == -1) {
    // A miss is an ordinary outcome: report it to the caller instead of
    // terminating the process.
    DEBUG("not found by rle: start = %d stop = %d val = %lld count = %d zero copies %d low = %lld high = %lld\n", start, stop, (long long)value, *block_count_ptr(), nth_block_ptr(0)->copies, (long long)low, (long long)high);
    return 0;
  }

  block_index_t firstBlock = bs_ret;
  block_index_t lastBlock = bs_ret;

  // Walk backwards over neighbouring blocks that hold the same value.
  while(firstBlock > low && *nth_data_ptr(firstBlock-1) == value) { firstBlock--; }
  // firstBlock is >= low and it contains items of the desired value

  // Walk forwards until lastBlock is `high` or a block holding another value,
  // then step back onto the last matching block.
  while(lastBlock < high && *nth_data_ptr(lastBlock) == value) { lastBlock++; }
  lastBlock--;

  DEBUG("looking at blocks %d - %d\n", firstBlock, lastBlock);

  scratch.first = nth_block_ptr(firstBlock)->index;
  // set second to one past last valid index.
  scratch.second = nth_block_ptr(lastBlock)->index + nth_block_ptr(lastBlock)->copies;

  // The matching blocks may extend beyond the requested slot range.
  if(scratch.first < start) { scratch.first = start; }
  if(scratch.second > stop) { scratch.second = stop; }
  DEBUG("startstop = %d,%d scratch = %d,%d\n",start, stop, scratch.first, scratch.second);
  return &scratch;
}
#endif // COMPRESSION_BINARY_FIND
} // namespace rose
#endif // _ROSE_COMPRESSION_RLE_IMPL_H__

View file

@ -101,6 +101,23 @@ class Rle {
// Pointer to the n-th triple_t of the array that begins immediately after
// the location returned by block_count_ptr().
inline triple_t* nth_block_ptr(block_index_t n) {
  triple_t* const first_block =
      reinterpret_cast<triple_t*>(block_count_ptr() + 1);
  return &first_block[n];
}
// The `index` field of block n, i.e. the first slot that block covers.
inline slot_index_t nth_low_index(block_index_t n) {
  triple_t* const blk = nth_block_ptr(n);
  return blk->index;
}
// Last slot covered by block n (inclusive): index + copies - 1.
inline slot_index_t nth_high_index(block_index_t n) {
  triple_t* const blk = nth_block_ptr(n);
  return blk->index + blk->copies - 1;
}
// One past the last slot covered by block n (exclusive): index + copies.
inline slot_index_t nth_high_bound(block_index_t n) {
  triple_t* const blk = nth_block_ptr(n);
  return blk->index + blk->copies;
}
// Address of the `data` field of block n.
inline TYPE* nth_data_ptr(block_index_t n) {
  triple_t* const blk = nth_block_ptr(n);
  return &blk->data;
}
// Pointer to the final block, i.e. block number (*block_count_ptr() - 1).
inline triple_t* last_block_ptr() {
  return nth_block_ptr((*block_count_ptr()) - 1);
}