From 75f857bc262530e761c3d43fe2715a8b5747b80b Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Sun, 8 Jun 2008 20:28:53 +0000 Subject: [PATCH] Added support for binary search to page lookups --- stasis/page/compression/compression.h | 2 + stasis/page/compression/for-impl.h | 86 +++++++++++++++++ stasis/page/compression/rle-impl.h | 127 ++++++++++++++++++++++++-- stasis/page/compression/rle.h | 17 ++++ 4 files changed, 223 insertions(+), 9 deletions(-) diff --git a/stasis/page/compression/compression.h b/stasis/page/compression/compression.h index f485748..ac0eced 100644 --- a/stasis/page/compression/compression.h +++ b/stasis/page/compression/compression.h @@ -10,6 +10,8 @@ namespace rose { +#define COMPRESSION_BINARY_FIND + typedef int8_t record_size_t; typedef uint16_t byte_off_t; typedef uint16_t slot_index_t; diff --git a/stasis/page/compression/for-impl.h b/stasis/page/compression/for-impl.h index 2cd9df5..88e3a3f 100644 --- a/stasis/page/compression/for-impl.h +++ b/stasis/page/compression/for-impl.h @@ -1,6 +1,8 @@ #ifndef _ROSE_COMPRESSION_FOR_IMPL_H__ #define _ROSE_COMPRESSION_FOR_IMPL_H__ +#include + // Copyright 2007 Google Inc. All Rights Reserved. 
// Author: sears@google.com (Rusty Sears) @@ -82,6 +84,8 @@ For::recordRead(int xid, slot_index_t slot, byte *exceptions, return scratch; } } + +#ifndef COMPRESSION_BINARY_FIND template inline std::pair* For::recordFind(int xid, slot_index_t start, slot_index_t stop, @@ -126,5 +130,87 @@ For::recordFind(int xid, slot_index_t start, slot_index_t stop, } return ret; } +#else // COMPRESSION_BINARY_FIND +template +inline std::pair* +For::recordFind(int xid, slot_index_t low, slot_index_t high, + byte *exceptions, TYPE value, + std::pair& scratch) { + delta_t delta = value - *base_ptr(); + int64_t bs_ret; + + //printf("delta = %d\n", delta); + if(delta >= 0) { + + { + int64_t bs_low = low; + int64_t bs_high = high; + while(nth_delta_ptr(bs_low) < 0 && bs_low < bs_high) { bs_low++; } + while(nth_delta_ptr(bs_high) < 0 && bs_low < bs_high) { bs_high--; } + + DEBUG("low: %d->%ld, high %d->%ld\n", low, bs_low, high, bs_high); + + delta_t bs_value = delta; + rose_binary_search(nth_delta_ptr); + } + if(bs_ret == -1) { printf("not found by for\n"); return 0; } + + while(scratch.first != low) { + if(*nth_delta_ptr(scratch.first-1) == delta) { + scratch.first --; + } else { + break; + } + } + while(scratch.second != high) { + if(*nth_delta_ptr(scratch.second+1) == delta) { + scratch.second++; + } else { + break; + } + } + DEBUG("front: %ld->%d, back: %ld\n",bs_ret, scratch.first, scratch.second); + return &scratch; + } else { // @todo Optimize lookup of exceptional data. (It can be binary searched too...) 
+ std::pair* ret = 0; + slot_index_t i; + for(i = low; i < high; i++) { + delta_t d = *nth_delta_ptr(i); + if(d >= 0) { + if(d == delta) { + scratch.first = i; + scratch.second = high; + ret = &scratch; + i++; + break; + } + } else { + if(value == *(TYPE*)(exceptions + d + PAGE_SIZE - sizeof(TYPE))) { + scratch.first = i; + scratch.second = high; + ret = &scratch; + i++; + break; + } + } + } + for(;i < high; i++) { + delta_t d = *nth_delta_ptr(i); + if(d >= 0) { + if(d != delta) { + scratch.second = i; + break; + } + } else { + if(value != *(TYPE*)(exceptions +d + PAGE_SIZE - sizeof(TYPE))) { + scratch.second = i; + break; + } + } + } + return ret; + } + } +#endif // COMPRESSION_BINARY_FIND } // namespace rose #endif // _ROSE_COMPRESSION_FOR_IMPL_H__ diff --git a/stasis/page/compression/rle-impl.h b/stasis/page/compression/rle-impl.h index 1c050bc..fa0ebc7 100644 --- a/stasis/page/compression/rle-impl.h +++ b/stasis/page/compression/rle-impl.h @@ -25,6 +25,8 @@ Rle::append(int xid, const TYPE dat, int *free_bytes) { int64_t ret; + DEBUG("\trle got %lld\n", (long long)dat); + ret = last_block_ptr()->index + last_block_ptr()->copies; if(ret == MAX_INDEX) { @@ -58,7 +60,6 @@ inline TYPE * Rle::recordRead(int xid, slot_index_t slot, byte* exceptions, TYPE * scratch) { block_index_t n = nth_block_ptr(last_)->index <= slot ? 
last_ : 0; - // while (n < *block_count_ptr()) { do { triple_t * t = nth_block_ptr(n); if (t->index <= slot && t->index + t->copies > slot) { @@ -70,6 +71,7 @@ Rle::recordRead(int xid, slot_index_t slot, byte* exceptions, } while (n < *block_count_ptr()); return 0; } +#ifndef COMPRESSION_BINARY_FIND template inline std::pair* Rle::recordFind(int xid, slot_index_t start, slot_index_t stop, @@ -80,13 +82,15 @@ Rle::recordFind(int xid, slot_index_t start, slot_index_t stop, do { triple_t * t = nth_block_ptr(n); if(t->data >= value) { - scratch.first = t->index; - do { - scratch.second = t->index + t->copies; - n++; - t = nth_block_ptr(n); - } while(n < *block_count_ptr() && t->data == value); - ret = &scratch; + if(t->data == value) { + scratch.first = t->index; + do { + scratch.second = t->index + t->copies; + n++; + t = nth_block_ptr(n); + } while(n < *block_count_ptr() && t->data == value); + ret = &scratch; + } break; } n++; @@ -98,6 +102,111 @@ Rle::recordFind(int xid, slot_index_t start, slot_index_t stop, } return ret; } -} // namespace rose +#else // COMPRESSION_BINARY_FIND +template +inline std::pair* +Rle::recordFind(int xid, slot_index_t start, slot_index_t stop, + byte *exceptions, TYPE value, + std::pair& scratch) { + DEBUG("\n\ncalled with start = %lld stop = %lld, val = %lld\n", (long long)start, (long long)stop, (long long)value); + DEBUG("1th data: %lld\n", (long long)nth_block_ptr(1)->data); + + + block_index_t low = 0; + block_index_t high = *block_count_ptr(); + int64_t bs_ret; + + int64_t bs_low = low; + int64_t bs_high = high; + + DEBUG("low: %d->%ld, high %d->%ld\n", low, bs_low, high, bs_high); + + TYPE bs_value = start; + rose_binary_search_greatest_lower(nth_high_index); + if(bs_ret != -1) { + assert(nth_low_index(bs_ret) <= start); + assert(nth_high_bound(bs_ret) > start); + // if(bs_ret < (*block_count_ptr()-1)) { + // assert(nth_low_index(bs_ret+1) > start); + // } + low = bs_ret; + } // else start not found because page starts after it 
(leave at block zero) + + DEBUG("real low is %d\n", low); + bs_low = low; + bs_high = (*block_count_ptr()); + bs_value = stop; + + DEBUG("bs_low = %lld bs_high = %lld bs_val = %lld\n", (long long)bs_low, (long long)bs_high, (long long)bs_value); + rose_binary_search_greatest_lower(nth_high_bound); + + if(bs_ret == -1) { + high = (*block_count_ptr()); //-1; + } else { + + DEBUG("bs_ret = %lld\n", (long long)bs_ret); + // bs_high might not contain the index we're looking for + // high = bs_ret + XXX; /// where XXX is 1 sometimes, and zero others. + // if(nth_high_index(bs_ret+1) >= stop) { high = bs_ret+1; } else { high = bs_ret + 2; } + + high = bs_ret; + // high contains the top of the range + assert(nth_high_bound(high) >= stop); + assert(nth_low_index(high) < stop); + // set high to the first block that we don't need to consider. + high++; + // if(high > 0) { + // assert(nth_low_index(high-1) <= stop); + // } + + } + + DEBUG("real high is %lld (last block slots: %lld - %lld) val = %lld\n", (long long int)high, (long long int)nth_low_index(high-1), (long long int)nth_high_index(high-1), (long long int)bs_value); + + // now low is the smallest block we need consider; high the highest. 
+ + bs_low = low; + bs_high = high; + bs_value = value; + + DEBUG("B low: %lld->%lld, high %lld->%lld val = %lld low_idx = %lld low_end = %lld low val = %lld high_idx = %lld high_end = %lld high_val = %lld\n", (long long int)low, (long long int)bs_low, (long long int)high,(long long int) bs_high,(long long int) value,(long long int) nth_low_index(bs_low),(long long int) nth_high_index(bs_low), (long long int)*nth_data_ptr(bs_low),(long long int) nth_low_index(bs_high),(long long int) nth_high_index(bs_high), (long long int)*nth_data_ptr(bs_high)); + DEBUG("bs val =%lld bs low = %lld bs high = %lld\n", (long long) bs_value, (long long) bs_low, (long long) bs_high); + + rose_binary_search(nth_data_ptr); + + DEBUG("bs ret = %lld\n", (long long int)bs_ret); + + + if(bs_ret == -1) { + printf("[ %lld %lld %lld ]\n", (long long int)*(TYPE*)nth_data_ptr(0), (long long int)*(TYPE*)nth_data_ptr(1), (long long int)*(TYPE*)nth_data_ptr(2)); + printf("not found by rle: start = %d stop = %d val = %lld count = %d zero copies %d low = %lld high = %lld\n", start, stop, (long long)value, *block_count_ptr(), nth_block_ptr(0)->copies, (long long)low, (long long)high); + abort(); + return 0; + } else { + block_index_t firstBlock = bs_ret; + block_index_t lastBlock = bs_ret; // lastBlock is the offset of the last block in range + while(firstBlock > low && *nth_data_ptr(firstBlock-1) == value) { firstBlock--; } + // firstblock is >= 0 and it contains items of the desired value + + while(lastBlock < high && *nth_data_ptr(lastBlock) == value) { lastBlock++; } + // lastblock == block_count_ptr or it contains items of the wrong value + lastBlock--; + // lastblock valid, and it contains items of the correct value + + DEBUG("looking at blocks %d - %d\n", firstBlock, lastBlock); + + scratch.first = nth_block_ptr(firstBlock)->index; + // set second to one past last valid index. 
+ scratch.second = nth_block_ptr(lastBlock)->index + nth_block_ptr(lastBlock)->copies; + if(scratch.first < start) { scratch.first = start; } + if(scratch.second > stop) { scratch.second = stop; } + + DEBUG("startstop = %d,%d scratch = %d,%d\n",start, stop, scratch.first, scratch.second); + + return &scratch; + } + } +#endif // COMPRESSION_BINARY_FIND +} // namespace rose #endif // _ROSE_COMPRESSION_RLE_IMPL_H__ diff --git a/stasis/page/compression/rle.h b/stasis/page/compression/rle.h index e0e94f1..9276f42 100644 --- a/stasis/page/compression/rle.h +++ b/stasis/page/compression/rle.h @@ -101,6 +101,23 @@ class Rle { inline triple_t* nth_block_ptr(block_index_t n) { return reinterpret_cast(block_count_ptr()+1) + n; } + inline slot_index_t nth_low_index(block_index_t n) { + // return (reinterpret_cast(block_count_ptr()+1) + n)->index; + return nth_block_ptr(n)->index; + } + inline slot_index_t nth_high_index(block_index_t n) { + triple_t* r = nth_block_ptr(n); // reinterpret_cast(block_count_ptr()+1) + n; + return r->index + r->copies - 1; + } + inline slot_index_t nth_high_bound(block_index_t n) { + triple_t* r = nth_block_ptr(n); // reinterpret_cast(block_count_ptr()+1) + n; + return r->index + r->copies; + } + inline TYPE* nth_data_ptr(block_index_t n) { + // printf("data %d: %lld\n", n, (long long) ((reinterpret_cast(block_count_ptr()+1) + n)->data)); + // return &((reinterpret_cast(block_count_ptr()+1) + n)->data); + return &(nth_block_ptr(n)->data); + } inline triple_t* last_block_ptr() { return nth_block_ptr(*block_count_ptr()-1); }