add support for early tombstone dropping, extend datatuple format to support timestamps
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2451 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
9672bc4f48
commit
5be84f59e8
18 changed files with 142 additions and 96 deletions
|
@ -339,7 +339,7 @@ bool DataPage<TUPLE>::recordRead(const typename TUPLE::key_t key, size_t keySize
|
||||||
int match = -1;
|
int match = -1;
|
||||||
while((*buf=itr.getnext()) != 0)
|
while((*buf=itr.getnext()) != 0)
|
||||||
{
|
{
|
||||||
match = TUPLE::compare((*buf)->key(), (*buf)->keylen(), key, keySize);
|
match = TUPLE::compare((*buf)->strippedkey(), (*buf)->strippedkeylen(), key, keySize);
|
||||||
|
|
||||||
if(match<0) //keep searching
|
if(match<0) //keep searching
|
||||||
{
|
{
|
||||||
|
|
|
@ -22,7 +22,7 @@ public:
|
||||||
if(key) {
|
if(key) {
|
||||||
len_t old_off = read_offset_;
|
len_t old_off = read_offset_;
|
||||||
TUPLE * t = getnext();
|
TUPLE * t = getnext();
|
||||||
while(t && TUPLE::compare(key->key(), key->keylen(), t->key(), t->keylen()) > 0) {
|
while(t && TUPLE::compare(key->strippedkey(), key->strippedkeylen(), t->strippedkey(), t->strippedkeylen()) > 0) {
|
||||||
TUPLE::freetuple(t);
|
TUPLE::freetuple(t);
|
||||||
old_off = read_offset_;
|
old_off = read_offset_;
|
||||||
t = getnext();
|
t = getnext();
|
||||||
|
|
57
datatuple.h
57
datatuple.h
|
@ -20,13 +20,23 @@ private:
|
||||||
byte* data_; // aliases key(). data_ - 1 should be the \0 terminating key().
|
byte* data_; // aliases key(). data_ - 1 should be the \0 terminating key().
|
||||||
|
|
||||||
datatuple* sanity_check() {
|
datatuple* sanity_check() {
|
||||||
assert(keylen() < 3000);
|
assert(rawkeylen() < 3000);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
public:
|
public:
|
||||||
|
|
||||||
inline len_t keylen() const {
|
inline len_t rawkeylen() const {
|
||||||
return data_ - key();
|
return data_ - rawkey();
|
||||||
|
}
|
||||||
|
inline len_t strippedkeylen() const {
|
||||||
|
return rawkeylen();
|
||||||
|
const size_t ts_sz = sizeof(uint64_t)+1;
|
||||||
|
size_t al = rawkeylen();
|
||||||
|
if(al <= ts_sz || rawkey()[al-ts_sz]!=0) {
|
||||||
|
return al;
|
||||||
|
} else {
|
||||||
|
return al - ts_sz;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
inline len_t datalen() const {
|
inline len_t datalen() const {
|
||||||
return (datalen_ == DELETE) ? 0 : datalen_;
|
return (datalen_ == DELETE) ? 0 : datalen_;
|
||||||
|
@ -34,22 +44,25 @@ public:
|
||||||
|
|
||||||
//returns the length of the byte array representation
|
//returns the length of the byte array representation
|
||||||
len_t byte_length() const {
|
len_t byte_length() const {
|
||||||
return sizeof(len_t) + sizeof(len_t) + keylen() + datalen();
|
return sizeof(len_t) + sizeof(len_t) + rawkeylen() + datalen();
|
||||||
}
|
}
|
||||||
static len_t length_from_header(len_t keylen, len_t datalen) {
|
static len_t length_from_header(len_t keylen, len_t datalen) {
|
||||||
return keylen + ((datalen == DELETE) ? 0 : datalen);
|
return keylen + ((datalen == DELETE) ? 0 : datalen);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline key_t key() const {
|
inline key_t rawkey() const {
|
||||||
return (key_t)(this+1);
|
return (key_t)(this+1);
|
||||||
}
|
}
|
||||||
inline data_t data() const {
|
inline data_t data() const {
|
||||||
return data_;
|
return data_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline key_t strippedkey() const {
|
||||||
|
return (key_t)(this+1);
|
||||||
|
}
|
||||||
//this is used by the stl set
|
//this is used by the stl set
|
||||||
bool operator() (const datatuple* lhs, const datatuple* rhs) const {
|
bool operator() (const datatuple* lhs, const datatuple* rhs) const {
|
||||||
return compare(lhs->key(), lhs->keylen(), rhs->key(), rhs->keylen()) < 0; //strcmp((char*)lhs.key(),(char*)rhs.key()) < 0;
|
return compare(lhs->strippedkey(), lhs->strippedkeylen(), rhs->strippedkey(), rhs->strippedkeylen()) < 0; //strcmp((char*)lhs.key(),(char*)rhs.key()) < 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -91,15 +104,15 @@ public:
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t timestamp() {
|
int64_t timestamp() {
|
||||||
const size_t ts_sz = sizeof(uint64_t)+1;
|
const size_t ts_sz = sizeof(uint64_t)+1;
|
||||||
size_t al = keylen();
|
size_t al = rawkeylen();
|
||||||
if(al <= ts_sz || key()[al-ts_sz]!=0) { return (uint64_t)-1; }
|
if(al <= ts_sz || rawkey()[al-ts_sz]!=0) { return (uint64_t)-1; }
|
||||||
return *(uint64_t*)(key()+1+al-ts_sz);
|
return (int64_t)*(uint64_t*)(rawkey()+1+al-ts_sz);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int compare_obj(const datatuple * a, const datatuple* b) {
|
static int compare_obj(const datatuple * a, const datatuple* b) {
|
||||||
return compare(a->key(), a->keylen(), b->key(), b->keylen());
|
return compare(a->strippedkey(), a->strippedkeylen(), b->strippedkey(), b->strippedkeylen());
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void setDelete() {
|
inline void setDelete() {
|
||||||
|
@ -123,7 +136,7 @@ public:
|
||||||
|
|
||||||
//copy the tuple. does a deep copy of the contents.
|
//copy the tuple. does a deep copy of the contents.
|
||||||
datatuple* create_copy() const {
|
datatuple* create_copy() const {
|
||||||
return create(key(), keylen(), data(), datalen_)->sanity_check();
|
return create(rawkey(), rawkeylen(), data(), datalen_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -132,8 +145,8 @@ public:
|
||||||
}
|
}
|
||||||
static datatuple* create(const void* key, len_t keylen, const void* data, len_t datalen) {
|
static datatuple* create(const void* key, len_t keylen, const void* data, len_t datalen) {
|
||||||
datatuple *ret = (datatuple*)malloc(sizeof(datatuple) + length_from_header(keylen,datalen));
|
datatuple *ret = (datatuple*)malloc(sizeof(datatuple) + length_from_header(keylen,datalen));
|
||||||
memcpy(ret->key(), key, keylen);
|
memcpy(ret->rawkey(), key, keylen);
|
||||||
ret->data_ = ret->key() + keylen; // need to set this even if delete, since it encodes the key length.
|
ret->data_ = ret->rawkey() + keylen; // need to set this even if delete, since it encodes the key length.
|
||||||
if(datalen != DELETE) {
|
if(datalen != DELETE) {
|
||||||
memcpy(ret->data_, data, datalen);
|
memcpy(ret->data_, data, datalen);
|
||||||
}
|
}
|
||||||
|
@ -144,24 +157,24 @@ public:
|
||||||
//format: key length _ data length _ key _ data
|
//format: key length _ data length _ key _ data
|
||||||
byte * to_bytes() const {
|
byte * to_bytes() const {
|
||||||
byte *ret = (byte*)malloc(byte_length());
|
byte *ret = (byte*)malloc(byte_length());
|
||||||
((len_t*)ret)[0] = keylen();
|
((len_t*)ret)[0] = rawkeylen();
|
||||||
((len_t*)ret)[1] = datalen_;
|
((len_t*)ret)[1] = datalen_;
|
||||||
memcpy(((len_t*)ret)+2, key(), length_from_header(keylen(), datalen_));
|
memcpy(((len_t*)ret)+2, rawkey(), length_from_header(rawkeylen(), datalen_));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
const byte* get_bytes(len_t *keylen, len_t *datalen) const {
|
const byte* get_bytes(len_t *keylen, len_t *datalen) const {
|
||||||
*keylen = this->keylen();
|
*keylen = this->rawkeylen();
|
||||||
*datalen = datalen_;
|
*datalen = datalen_;
|
||||||
return key();
|
return rawkey();
|
||||||
}
|
}
|
||||||
|
|
||||||
//format of buf: key _ data. The caller needs to 'peel' off key length and data length for this call.
|
//format of buf: key _ data. The caller needs to 'peel' off key length and data length for this call.
|
||||||
static datatuple* from_bytes(len_t keylen, len_t datalen, byte* buf) {
|
static datatuple* from_bytes(len_t keylen, len_t datalen, byte* buf) {
|
||||||
datatuple *dt = (datatuple*) malloc(sizeof(datatuple) + length_from_header(keylen,datalen));
|
datatuple *dt = (datatuple*) malloc(sizeof(datatuple) + length_from_header(keylen,datalen));
|
||||||
dt->datalen_ = datalen;
|
dt->datalen_ = datalen;
|
||||||
memcpy(dt->key(),buf, length_from_header(keylen,datalen));
|
memcpy(dt->rawkey(),buf, length_from_header(keylen,datalen));
|
||||||
dt->data_ = dt->key() + keylen;
|
dt->data_ = dt->rawkey() + keylen;
|
||||||
return dt->sanity_check();
|
return dt->sanity_check();
|
||||||
}
|
}
|
||||||
static datatuple* from_bytes(byte* buf) {
|
static datatuple* from_bytes(byte* buf) {
|
||||||
|
@ -169,8 +182,8 @@ public:
|
||||||
len_t buflen = length_from_header(keylen, ((len_t*)buf)[1]);
|
len_t buflen = length_from_header(keylen, ((len_t*)buf)[1]);
|
||||||
datatuple *dt = (datatuple*) malloc(sizeof(datatuple) + buflen);
|
datatuple *dt = (datatuple*) malloc(sizeof(datatuple) + buflen);
|
||||||
dt->datalen_ = ((len_t*)buf)[1];
|
dt->datalen_ = ((len_t*)buf)[1];
|
||||||
memcpy(dt->key(),((len_t*)buf)+2,buflen);
|
memcpy(dt->rawkey(),((len_t*)buf)+2,buflen);
|
||||||
dt->data_ = dt->key() + keylen;
|
dt->data_ = dt->rawkey() + keylen;
|
||||||
|
|
||||||
return dt->sanity_check();
|
return dt->sanity_check();
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ void diskTreeComponent::writes_done() {
|
||||||
int diskTreeComponent::insertTuple(int xid, datatuple *t)
|
int diskTreeComponent::insertTuple(int xid, datatuple *t)
|
||||||
{
|
{
|
||||||
if(bloom_filter) {
|
if(bloom_filter) {
|
||||||
bloom_filter_insert(bloom_filter, (const char*)t->key(), t->keylen());
|
bloom_filter_insert(bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen());
|
||||||
}
|
}
|
||||||
int ret = 0; // no error.
|
int ret = 0; // no error.
|
||||||
if(dp==0) {
|
if(dp==0) {
|
||||||
|
@ -106,8 +106,8 @@ DataPage<datatuple>* diskTreeComponent::insertDataPage(int xid, datatuple *tuple
|
||||||
|
|
||||||
|
|
||||||
ltree->appendPage(xid,
|
ltree->appendPage(xid,
|
||||||
tuple->key(),
|
tuple->strippedkey(),
|
||||||
tuple->keylen(),
|
tuple->strippedkeylen(),
|
||||||
dp->get_start_pid()
|
dp->get_start_pid()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -896,7 +896,7 @@ void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * k
|
||||||
lsmIterator_ = NULL;
|
lsmIterator_ = NULL;
|
||||||
} else {
|
} else {
|
||||||
if(key1) {
|
if(key1) {
|
||||||
lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_, key1->key(), key1->keylen());
|
lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_, key1->strippedkey(), key1->strippedkeylen());
|
||||||
} else {
|
} else {
|
||||||
lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_);
|
lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_);
|
||||||
}
|
}
|
||||||
|
|
11
logstore.cpp
11
logstore.cpp
|
@ -43,6 +43,8 @@ logtable<TUPLE>::logtable(int log_mode, pageid_t max_c0_size, pageid_t internal_
|
||||||
this->shutting_down_ = false;
|
this->shutting_down_ = false;
|
||||||
c0_flushing = false;
|
c0_flushing = false;
|
||||||
c1_flushing = false;
|
c1_flushing = false;
|
||||||
|
current_timestamp = 0;
|
||||||
|
expiry = 0;
|
||||||
this->merge_mgr = 0;
|
this->merge_mgr = 0;
|
||||||
tmerger = new tuplemerger(&replace_merger);
|
tmerger = new tuplemerger(&replace_merger);
|
||||||
|
|
||||||
|
@ -110,10 +112,10 @@ recordid logtable<TUPLE>::allocTable(int xid)
|
||||||
table_rec = Talloc(xid, sizeof(tbl_header));
|
table_rec = Talloc(xid, sizeof(tbl_header));
|
||||||
mergeStats * stats = 0;
|
mergeStats * stats = 0;
|
||||||
//create the big tree
|
//create the big tree
|
||||||
tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats);
|
tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats, 10);
|
||||||
|
|
||||||
//create the small tree
|
//create the small tree
|
||||||
tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats);
|
tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats, 10);
|
||||||
|
|
||||||
merge_mgr = new mergeManager(this);
|
merge_mgr = new mergeManager(this);
|
||||||
merge_mgr->set_c0_size(max_c0_size);
|
merge_mgr->set_c0_size(max_c0_size);
|
||||||
|
@ -544,10 +546,9 @@ datatuple * logtable<TUPLE>::insertTupleHelper(datatuple *tuple)
|
||||||
datatuple *new_t = tmerger->merge(pre_t, tuple);
|
datatuple *new_t = tmerger->merge(pre_t, tuple);
|
||||||
merge_mgr->get_merge_stats(0)->merged_tuples(new_t, tuple, pre_t);
|
merge_mgr->get_merge_stats(0)->merged_tuples(new_t, tuple, pre_t);
|
||||||
t = new_t;
|
t = new_t;
|
||||||
|
|
||||||
tree_c0->erase(pre_t); //remove the previous tuple
|
tree_c0->erase(pre_t); //remove the previous tuple
|
||||||
|
|
||||||
tree_c0->insert(new_t); //insert the new tuple
|
tree_c0->insert(new_t); //insert the new tuple
|
||||||
|
|
||||||
}
|
}
|
||||||
else //no tuple with same key exists in mem-tree
|
else //no tuple with same key exists in mem-tree
|
||||||
{
|
{
|
||||||
|
@ -626,7 +627,7 @@ bool logtable<TUPLE>::testAndSetTuple(datatuple *tuple, datatuple *tuple2)
|
||||||
static pthread_mutex_t test_and_set_mut = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t test_and_set_mut = PTHREAD_MUTEX_INITIALIZER;
|
||||||
pthread_mutex_lock(&test_and_set_mut);
|
pthread_mutex_lock(&test_and_set_mut);
|
||||||
|
|
||||||
datatuple * exists = findTuple_first(-1, tuple2 ? tuple2->key() : tuple->key(), tuple2 ? tuple2->keylen() : tuple->keylen());
|
datatuple * exists = findTuple_first(-1, tuple2 ? tuple2->rawkey() : tuple->rawkey(), tuple2 ? tuple2->rawkeylen() : tuple->rawkeylen());
|
||||||
|
|
||||||
if(!tuple2 || tuple2->isDelete()) {
|
if(!tuple2 || tuple2->isDelete()) {
|
||||||
if(!exists || exists->isDelete()) {
|
if(!exists || exists->isDelete()) {
|
||||||
|
|
34
logstore.h
34
logstore.h
|
@ -160,6 +160,9 @@ public:
|
||||||
bool c0_flushing;
|
bool c0_flushing;
|
||||||
bool c1_flushing; // this needs to be set to true at shutdown, or when the c0-c1 merger is waiting for c1-c2 to finish its merge
|
bool c1_flushing; // this needs to be set to true at shutdown, or when the c0-c1 merger is waiting for c1-c2 to finish its merge
|
||||||
|
|
||||||
|
lsn_t current_timestamp;
|
||||||
|
lsn_t expiry;
|
||||||
|
|
||||||
//DATA PAGE SETTINGS
|
//DATA PAGE SETTINGS
|
||||||
pageid_t internal_region_size; // in number of pages
|
pageid_t internal_region_size; // in number of pages
|
||||||
pageid_t datapage_region_size; // "
|
pageid_t datapage_region_size; // "
|
||||||
|
@ -172,6 +175,33 @@ private:
|
||||||
public:
|
public:
|
||||||
bool shutting_down_;
|
bool shutting_down_;
|
||||||
|
|
||||||
|
bool mightBeOnDisk(datatuple * t) {
|
||||||
|
if(tree_c1) {
|
||||||
|
if(!tree_c1->bloom_filter) { printf("no c1 bloom filter\n"); return true; }
|
||||||
|
if(bloom_filter_lookup(tree_c1->bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen())) { printf("in c1\n"); return true; }
|
||||||
|
}
|
||||||
|
if(tree_c1_prime) {
|
||||||
|
if(!tree_c1_prime->bloom_filter) { printf("no c1' bloom filter\n"); return true; }
|
||||||
|
if(bloom_filter_lookup(tree_c1_prime->bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen())) { printf("in c1'\n"); return true; }
|
||||||
|
}
|
||||||
|
return mightBeAfterMemMerge(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool mightBeAfterMemMerge(datatuple * t) {
|
||||||
|
|
||||||
|
if(tree_c1_mergeable) {
|
||||||
|
if(!tree_c1_mergeable->bloom_filter) { printf("no c1m bloom filter\n"); return true; }
|
||||||
|
if(bloom_filter_lookup(tree_c1_mergeable->bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen())) { printf("in c1m'\n");return true; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if(tree_c2) {
|
||||||
|
if(!tree_c2->bloom_filter) { printf("no c2 bloom filter\n"); return true; }
|
||||||
|
if(bloom_filter_lookup(tree_c2->bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen())) { printf("in c2\n");return true; }
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
template<class ITRA, class ITRN>
|
template<class ITRA, class ITRN>
|
||||||
class mergeManyIterator {
|
class mergeManyIterator {
|
||||||
public:
|
public:
|
||||||
|
@ -325,7 +355,7 @@ public:
|
||||||
revalidate();
|
revalidate();
|
||||||
TUPLE * tmp = merge_it_->next_callerFrees();
|
TUPLE * tmp = merge_it_->next_callerFrees();
|
||||||
if(last_returned && tmp) {
|
if(last_returned && tmp) {
|
||||||
assert(TUPLE::compare(last_returned->key(), last_returned->keylen(), tmp->key(), tmp->keylen()) < 0);
|
assert(TUPLE::compare(last_returned->strippedkey(), last_returned->strippedkeylen(), tmp->strippedkey(), tmp->strippedkeylen()) < 0);
|
||||||
TUPLE::freetuple(last_returned);
|
TUPLE::freetuple(last_returned);
|
||||||
}
|
}
|
||||||
last_returned = tmp;
|
last_returned = tmp;
|
||||||
|
@ -433,7 +463,7 @@ public:
|
||||||
merge_it_ = new merge_it_t(inner_merge_it, disk_it, 4, NULL, TUPLE::compare_obj); // XXX Hardcodes comparator, and does not handle merges
|
merge_it_ = new merge_it_t(inner_merge_it, disk_it, 4, NULL, TUPLE::compare_obj); // XXX Hardcodes comparator, and does not handle merges
|
||||||
if(last_returned) {
|
if(last_returned) {
|
||||||
TUPLE * junk = merge_it_->peek();
|
TUPLE * junk = merge_it_->peek();
|
||||||
if(junk && !TUPLE::compare(junk->key(), junk->keylen(), last_returned->key(), last_returned->keylen())) {
|
if(junk && !TUPLE::compare(junk->strippedkey(), junk->strippedkeylen(), last_returned->strippedkey(), last_returned->strippedkeylen())) {
|
||||||
// we already returned junk
|
// we already returned junk
|
||||||
TUPLE::freetuple(merge_it_->next_callerFrees());
|
TUPLE::freetuple(merge_it_->next_callerFrees());
|
||||||
}
|
}
|
||||||
|
|
25
merger.cpp
25
merger.cpp
|
@ -26,6 +26,17 @@ void merge_scheduler::start() {
|
||||||
pthread_create(&disk_merge_thread_, 0, diskMerge_thr, this);
|
pthread_create(&disk_merge_thread_, 0, diskMerge_thr, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool insert_filter(logtable<datatuple> * ltable, datatuple * t, bool dropDeletes) {
|
||||||
|
if(t->isDelete()) {
|
||||||
|
if(dropDeletes || ! ltable->mightBeAfterMemMerge(t)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(!ltable->expiry) { return true; }
|
||||||
|
if(t->timestamp() > ltable->current_timestamp + ltable->expiry) { return false; }
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
template <class ITA, class ITB>
|
template <class ITA, class ITB>
|
||||||
void merge_iterators(int xid, diskTreeComponent * forceMe,
|
void merge_iterators(int xid, diskTreeComponent * forceMe,
|
||||||
ITA *itrA,
|
ITA *itrA,
|
||||||
|
@ -189,7 +200,7 @@ void * merge_scheduler::memMergeThread() {
|
||||||
stats->handed_off_tree();
|
stats->handed_off_tree();
|
||||||
|
|
||||||
// 8: c1 = new empty.
|
// 8: c1 = new empty.
|
||||||
ltable_->set_tree_c1(new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats));
|
ltable_->set_tree_c1(new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, 10));
|
||||||
|
|
||||||
pthread_cond_signal(<able_->c1_ready);
|
pthread_cond_signal(<able_->c1_ready);
|
||||||
ltable_->update_persistent_header(xid);
|
ltable_->update_persistent_header(xid);
|
||||||
|
@ -388,10 +399,10 @@ void merge_iterators(int xid,
|
||||||
DEBUG("tuple\t%lld: keylen %d datalen %d\n",
|
DEBUG("tuple\t%lld: keylen %d datalen %d\n",
|
||||||
ntuples, *(t2->keylen),*(t2->datalen) );
|
ntuples, *(t2->keylen),*(t2->datalen) );
|
||||||
|
|
||||||
while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2
|
while(t1 != 0 && datatuple::compare(t1->rawkey(), t1->rawkeylen(), t2->rawkey(), t2->rawkeylen()) < 0) // t1 is less than t2
|
||||||
{
|
{
|
||||||
//insert t1
|
//insert t1
|
||||||
if((!t1->isDelete()) || !dropDeletes) {
|
if(insert_filter(ltable, t1, dropDeletes)) {
|
||||||
scratch_tree->insertTuple(xid, t1);
|
scratch_tree->insertTuple(xid, t1);
|
||||||
i+=t1->byte_length();
|
i+=t1->byte_length();
|
||||||
ltable->merge_mgr->wrote_tuple(stats->merge_level, t1);
|
ltable->merge_mgr->wrote_tuple(stats->merge_level, t1);
|
||||||
|
@ -405,13 +416,13 @@ void merge_iterators(int xid,
|
||||||
periodically_force(xid, &i, forceMe, log);
|
periodically_force(xid, &i, forceMe, log);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0)
|
if(t1 != 0 && datatuple::compare(t1->strippedkey(), t1->strippedkeylen(), t2->strippedkey(), t2->strippedkeylen()) == 0)
|
||||||
{
|
{
|
||||||
datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2);
|
datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2);
|
||||||
stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right.
|
stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right.
|
||||||
|
|
||||||
//insert merged tuple, drop deletes
|
//insert merged tuple, drop deletes
|
||||||
if((!mtuple->isDelete()) || !dropDeletes) {
|
if(insert_filter(ltable, mtuple, dropDeletes)) {
|
||||||
scratch_tree->insertTuple(xid, mtuple);
|
scratch_tree->insertTuple(xid, mtuple);
|
||||||
i+=mtuple->byte_length();
|
i+=mtuple->byte_length();
|
||||||
ltable->merge_mgr->wrote_tuple(stats->merge_level, mtuple);
|
ltable->merge_mgr->wrote_tuple(stats->merge_level, mtuple);
|
||||||
|
@ -425,7 +436,7 @@ void merge_iterators(int xid,
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
//insert t2
|
//insert t2
|
||||||
if((!t2->isDelete()) || !dropDeletes) {
|
if(insert_filter(ltable, t2, dropDeletes)) {
|
||||||
scratch_tree->insertTuple(xid, t2);
|
scratch_tree->insertTuple(xid, t2);
|
||||||
i+=t2->byte_length();
|
i+=t2->byte_length();
|
||||||
ltable->merge_mgr->wrote_tuple(stats->merge_level, t2);
|
ltable->merge_mgr->wrote_tuple(stats->merge_level, t2);
|
||||||
|
@ -448,7 +459,7 @@ void merge_iterators(int xid,
|
||||||
}
|
}
|
||||||
|
|
||||||
while(t1 != 0) {// t2 is empty, but t1 still has stuff in it.
|
while(t1 != 0) {// t2 is empty, but t1 still has stuff in it.
|
||||||
if((!t1->isDelete()) || !dropDeletes) {
|
if(insert_filter(ltable, t1, dropDeletes)) {
|
||||||
scratch_tree->insertTuple(xid, t1);
|
scratch_tree->insertTuple(xid, t1);
|
||||||
ltable->merge_mgr->wrote_tuple(stats->merge_level, t1);
|
ltable->merge_mgr->wrote_tuple(stats->merge_level, t1);
|
||||||
i += t1->byte_length();
|
i += t1->byte_length();
|
||||||
|
|
|
@ -48,7 +48,7 @@ inline int requestDispatch<HANDLE>::op_bulk_insert(logtable<datatuple> *ltable,
|
||||||
template<class HANDLE>
|
template<class HANDLE>
|
||||||
inline int requestDispatch<HANDLE>::op_find(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple) {
|
inline int requestDispatch<HANDLE>::op_find(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple) {
|
||||||
//find the tuple
|
//find the tuple
|
||||||
datatuple *dt = ltable->findTuple_first(-1, tuple->key(), tuple->keylen());
|
datatuple *dt = ltable->findTuple_first(-1, tuple->strippedkey(), tuple->strippedkeylen());
|
||||||
|
|
||||||
#ifdef STATS_ENABLED
|
#ifdef STATS_ENABLED
|
||||||
|
|
||||||
|
@ -403,12 +403,12 @@ inline int requestDispatch<HANDLE>::op_dbg_drop_database(logtable<datatuple> * l
|
||||||
ltable->insertTuple(del);
|
ltable->insertTuple(del);
|
||||||
n++;
|
n++;
|
||||||
if(!(n % 1000)) {
|
if(!(n % 1000)) {
|
||||||
printf("X %lld %s\n", n, (char*)del->key()); fflush(stdout);
|
printf("X %lld %s\n", n, (char*)del->rawkey()); fflush(stdout);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
n++;
|
n++;
|
||||||
if(!(n % 1000)) {
|
if(!(n % 1000)) {
|
||||||
printf("? %lld %s\n", n, (char*)del->key()); fflush(stdout);
|
printf("? %lld %s\n", n, (char*)del->rawkey()); fflush(stdout);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
datatuple::freetuple(del);
|
datatuple::freetuple(del);
|
||||||
|
@ -423,12 +423,12 @@ inline int requestDispatch<HANDLE>::op_dbg_noop(logtable<datatuple> * ltable, HA
|
||||||
}
|
}
|
||||||
template<class HANDLE>
|
template<class HANDLE>
|
||||||
inline int requestDispatch<HANDLE>::op_dbg_set_log_mode(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple) {
|
inline int requestDispatch<HANDLE>::op_dbg_set_log_mode(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple) {
|
||||||
if(tuple->keylen() != sizeof(int)) {
|
if(tuple->rawkeylen() != sizeof(int)) {
|
||||||
abort();
|
abort();
|
||||||
return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR);
|
return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR);
|
||||||
} else {
|
} else {
|
||||||
int old_mode = ltable->log_mode;
|
int old_mode = ltable->log_mode;
|
||||||
ltable->log_mode = *(int*)tuple->key();
|
ltable->log_mode = *(int*)tuple->rawkey();
|
||||||
fprintf(stderr, "\n\nChanged log mode from %d to %d\n\n", old_mode, ltable->log_mode);
|
fprintf(stderr, "\n\nChanged log mode from %d to %d\n\n", old_mode, ltable->log_mode);
|
||||||
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
|
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
@ -475,10 +475,10 @@ int requestDispatch<HANDLE>::dispatch_request(network_op_t opcode, datatuple * t
|
||||||
int err = 0;
|
int err = 0;
|
||||||
#if 0
|
#if 0
|
||||||
if(tuple) {
|
if(tuple) {
|
||||||
char * printme = (char*)malloc(tuple->keylen()+1);
|
char * printme = (char*)malloc(tuple->rawkeylen()+1);
|
||||||
memcpy(printme, tuple->key(), tuple->keylen());
|
memcpy(printme, tuple->rawkey(), tuple->rawkeylen());
|
||||||
printme[tuple->keylen()] = 0;
|
printme[tuple->rawkeylen()] = 0;
|
||||||
printf("\nop = %d, key = %s, isdelete = %d\n", opcode, printme, tuple->isDelete());
|
printf("\nop = %d, key = ->%s<-, isdelete = %d\n", opcode, printme, tuple->isDelete());
|
||||||
free(printme);
|
free(printme);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -98,7 +98,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) {
|
||||||
DataPage<datatuple>::iterator it = dp->begin();
|
DataPage<datatuple>::iterator it = dp->begin();
|
||||||
datatuple * dt;
|
datatuple * dt;
|
||||||
while((dt = it.getnext()) != NULL) {
|
while((dt = it.getnext()) != NULL) {
|
||||||
if(!strcmp((char*)dt->key(), key_arr[j].c_str())) {
|
if(!strcmp((char*)dt->rawkey(), key_arr[j].c_str())) {
|
||||||
found = true;
|
found = true;
|
||||||
}
|
}
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
|
@ -209,7 +209,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
datatuple *dt=0;
|
datatuple *dt=0;
|
||||||
while( (dt=itr.getnext()) != NULL)
|
while( (dt=itr.getnext()) != NULL)
|
||||||
{
|
{
|
||||||
assert(dt->keylen() == key_arr[tuplenum].length()+1);
|
assert(dt->rawkeylen() == key_arr[tuplenum].length()+1);
|
||||||
assert(dt->datalen() == data_arr[tuplenum].length()+1);
|
assert(dt->datalen() == data_arr[tuplenum].length()+1);
|
||||||
tuplenum++;
|
tuplenum++;
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
|
|
|
@ -86,7 +86,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
datatuple *dt=0;
|
datatuple *dt=0;
|
||||||
while( (dt=tree_itr->next_callerFrees()) != NULL)
|
while( (dt=tree_itr->next_callerFrees()) != NULL)
|
||||||
{
|
{
|
||||||
assert(dt->keylen() == key_arr[tuplenum].length()+1);
|
assert(dt->rawkeylen() == key_arr[tuplenum].length()+1);
|
||||||
assert(dt->datalen() == data_arr[tuplenum].length()+1);
|
assert(dt->datalen() == data_arr[tuplenum].length()+1);
|
||||||
tuplenum++;
|
tuplenum++;
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
|
@ -108,7 +108,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
datatuple *dt = ltable_c1->findTuple(xid, (const datatuple::key_t) key_arr[ri].c_str(), (size_t)key_arr[ri].length()+1);
|
datatuple *dt = ltable_c1->findTuple(xid, (const datatuple::key_t) key_arr[ri].c_str(), (size_t)key_arr[ri].length()+1);
|
||||||
|
|
||||||
assert(dt!=0);
|
assert(dt!=0);
|
||||||
assert(dt->keylen() == key_arr[ri].length()+1);
|
assert(dt->rawkeylen() == key_arr[ri].length()+1);
|
||||||
assert(dt->datalen() == data_arr[ri].length()+1);
|
assert(dt->datalen() == data_arr[ri].length()+1);
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
dt = 0;
|
dt = 0;
|
||||||
|
|
|
@ -126,7 +126,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
//if(dt!=0)
|
//if(dt!=0)
|
||||||
{
|
{
|
||||||
found_tuples++;
|
found_tuples++;
|
||||||
assert(dt->keylen() == (*key_arr)[ri].length()+1);
|
assert(dt->rawkeylen() == (*key_arr)[ri].length()+1);
|
||||||
assert(dt->datalen() == (*data_arr)[ri].length()+1);
|
assert(dt->datalen() == (*data_arr)[ri].length()+1);
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
}
|
}
|
||||||
|
|
|
@ -213,14 +213,14 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
assert(dt!=0);
|
assert(dt!=0);
|
||||||
assert(!dt->isDelete());
|
assert(!dt->isDelete());
|
||||||
found_tuples++;
|
found_tuples++;
|
||||||
assert(dt->keylen() == (*key_arr)[ri].length()+1);
|
assert(dt->rawkeylen() == (*key_arr)[ri].length()+1);
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if(dt!=0)
|
if(dt!=0)
|
||||||
{
|
{
|
||||||
assert(dt->keylen() == (*key_arr)[ri].length()+1);
|
assert(dt->rawkeylen() == (*key_arr)[ri].length()+1);
|
||||||
assert(dt->isDelete());
|
assert(dt->isDelete());
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
datatuple *tuple = *rbitr;
|
datatuple *tuple = *rbitr;
|
||||||
|
|
||||||
found_tuples++;
|
found_tuples++;
|
||||||
assert(tuple->keylen() == key_arr[ri].length()+1);
|
assert(tuple->rawkeylen() == key_arr[ri].length()+1);
|
||||||
assert(tuple->datalen() == data_arr[ri].length()+1);
|
assert(tuple->datalen() == data_arr[ri].length()+1);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -181,7 +181,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
assert(dt!=0);
|
assert(dt!=0);
|
||||||
assert(!dt->isDelete());
|
assert(!dt->isDelete());
|
||||||
found_tuples++;
|
found_tuples++;
|
||||||
assert(dt->keylen() == (*key_arr)[ri].length()+1);
|
assert(dt->rawkeylen() == (*key_arr)[ri].length()+1);
|
||||||
|
|
||||||
//free dt
|
//free dt
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
|
@ -199,8 +199,8 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
while((tup = logstore_client_next_tuple(l))) {
|
while((tup = logstore_client_next_tuple(l))) {
|
||||||
assert(!tup->isDelete());
|
assert(!tup->isDelete());
|
||||||
assert(tup->keylen() == (*key_arr)[i].length()+1);
|
assert(tup->rawkeylen() == (*key_arr)[i].length()+1);
|
||||||
assert(!memcmp(tup->key(), (*key_arr)[i].c_str(), (*key_arr)[i].length()));
|
assert(!memcmp(tup->rawkey(), (*key_arr)[i].c_str(), (*key_arr)[i].length()));
|
||||||
datatuple::freetuple(tup);
|
datatuple::freetuple(tup);
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,7 +166,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
assert(dt!=0);
|
assert(dt!=0);
|
||||||
assert(!dt->isDelete());
|
assert(!dt->isDelete());
|
||||||
found_tuples++;
|
found_tuples++;
|
||||||
assert(dt->keylen() == (*key_arr)[ri].length()+1);
|
assert(dt->rawkeylen() == (*key_arr)[ri].length()+1);
|
||||||
|
|
||||||
//free dt
|
//free dt
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
|
@ -184,8 +184,8 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
while((tup = logstore_client_next_tuple(l))) {
|
while((tup = logstore_client_next_tuple(l))) {
|
||||||
assert(!tup->isDelete());
|
assert(!tup->isDelete());
|
||||||
assert(tup->keylen() == (*key_arr)[i].length()+1);
|
assert(tup->rawkeylen() == (*key_arr)[i].length()+1);
|
||||||
assert(!memcmp(tup->key(), (*key_arr)[i].c_str(), (*key_arr)[i].length()));
|
assert(!memcmp(tup->rawkey(), (*key_arr)[i].c_str(), (*key_arr)[i].length()));
|
||||||
datatuple::freetuple(tup);
|
datatuple::freetuple(tup);
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,23 +5,14 @@
|
||||||
// we return deletes here. our caller decides what to do with them.
|
// we return deletes here. our caller decides what to do with them.
|
||||||
datatuple* tuplemerger::merge(const datatuple *t1, const datatuple *t2)
|
datatuple* tuplemerger::merge(const datatuple *t1, const datatuple *t2)
|
||||||
{
|
{
|
||||||
datatuple *t;
|
if(!(t1->isDelete() || t2->isDelete())) {
|
||||||
if(t1->isDelete() && t2->isDelete()) {
|
return (*merge_fp)(t1,t2);
|
||||||
t = t2->create_copy();
|
} else {
|
||||||
} else if(t1->isDelete()) //delete -> t2
|
// if there is at least one tombstone, we return t2 intact.
|
||||||
{
|
// t1 tombstone -> ignore it, and return t2.
|
||||||
t = t2->create_copy();
|
// t2 tombstone -> return a tombstone (like t2).
|
||||||
|
return t2->create_copy();
|
||||||
}
|
}
|
||||||
else if(t2->isDelete())
|
|
||||||
{
|
|
||||||
t = t2->create_copy();
|
|
||||||
}
|
|
||||||
else //neither is a delete
|
|
||||||
{
|
|
||||||
t = (*merge_fp)(t1,t2);
|
|
||||||
}
|
|
||||||
|
|
||||||
return t;
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* appends the data in t2 to data from t1
|
* appends the data in t2 to data from t1
|
||||||
|
@ -32,13 +23,13 @@ datatuple* tuplemerger::merge(const datatuple *t1, const datatuple *t2)
|
||||||
datatuple* append_merger(const datatuple *t1, const datatuple *t2)
|
datatuple* append_merger(const datatuple *t1, const datatuple *t2)
|
||||||
{
|
{
|
||||||
assert(!(t1->isDelete() || t2->isDelete()));
|
assert(!(t1->isDelete() || t2->isDelete()));
|
||||||
len_t keylen = t1->keylen();
|
len_t rawkeylen = t1->rawkeylen();
|
||||||
len_t datalen = t1->datalen() + t2->datalen();
|
len_t datalen = t1->datalen() + t2->datalen();
|
||||||
byte * data = (byte*)malloc(datalen);
|
byte * data = (byte*)malloc(datalen);
|
||||||
memcpy(data, t1->data(), t1->datalen());
|
memcpy(data, t1->data(), t1->datalen());
|
||||||
memcpy(data + t1->datalen(), t2->data(), t2->datalen());
|
memcpy(data + t1->datalen(), t2->data(), t2->datalen());
|
||||||
|
|
||||||
return datatuple::create(t1->key(), keylen, data, datalen);
|
return datatuple::create(t1->rawkey(), rawkeylen, data, datalen);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -37,13 +37,13 @@ int main(int argc, char * argv[]) {
|
||||||
bool first = true;
|
bool first = true;
|
||||||
while(( ret = logstore_client_next_tuple(l) )) {
|
while(( ret = logstore_client_next_tuple(l) )) {
|
||||||
if(first) {
|
if(first) {
|
||||||
assert(ret->keylen() == sizeof(uint64_t));
|
assert(ret->rawkeylen() == sizeof(uint64_t));
|
||||||
uint64_t stride = *(uint64_t*)ret->key();
|
uint64_t stride = *(uint64_t*)ret->rawkey();
|
||||||
printf("Stride: %lld\n", (long long)stride);
|
printf("Stride: %lld\n", (long long)stride);
|
||||||
first = false;
|
first = false;
|
||||||
} else {
|
} else {
|
||||||
assert(ret->key()[ret->keylen()-1] == 0); // check for null terminator.
|
assert(ret->strippedkey()[ret->strippedkeylen()-1] == 0); // check for null terminator.
|
||||||
printf("\t%s\n", (char*)ret->key());
|
printf("\t%s\n", (char*)ret->strippedkey());
|
||||||
}
|
}
|
||||||
datatuple::freetuple(ret);
|
datatuple::freetuple(ret);
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,9 +23,9 @@ int main(int argc, char * argv[]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
logstore_client_close(l);
|
logstore_client_close(l);
|
||||||
assert(ret->keylen() == sizeof(uint64_t));
|
assert(ret->rawkeylen() == sizeof(uint64_t));
|
||||||
assert(ret->datalen() == sizeof(uint64_t));
|
assert(ret->datalen() == sizeof(uint64_t));
|
||||||
printf("Tree is %llu MB Store file is %llu MB\n", (unsigned long long)(*(uint64_t*)ret->key()) / (1024*1024), (unsigned long long)(*(uint64_t*)ret->data()) / (1024*1024));
|
printf("Tree is %llu MB Store file is %llu MB\n", (unsigned long long)(*(uint64_t*)ret->rawkey()) / (1024*1024), (unsigned long long)(*(uint64_t*)ret->data()) / (1024*1024));
|
||||||
datatuple::freetuple(ret);
|
datatuple::freetuple(ret);
|
||||||
;
|
;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in a new issue