diff --git a/datapage.cpp b/datapage.cpp index 07a0bd5..53b8488 100644 --- a/datapage.cpp +++ b/datapage.cpp @@ -339,7 +339,7 @@ bool DataPage::recordRead(const typename TUPLE::key_t key, size_t keySize int match = -1; while((*buf=itr.getnext()) != 0) { - match = TUPLE::compare((*buf)->key(), (*buf)->keylen(), key, keySize); + match = TUPLE::compare((*buf)->strippedkey(), (*buf)->strippedkeylen(), key, keySize); if(match<0) //keep searching { diff --git a/datapage.h b/datapage.h index 3aa4099..b1fcbe6 100644 --- a/datapage.h +++ b/datapage.h @@ -22,7 +22,7 @@ public: if(key) { len_t old_off = read_offset_; TUPLE * t = getnext(); - while(t && TUPLE::compare(key->key(), key->keylen(), t->key(), t->keylen()) > 0) { + while(t && TUPLE::compare(key->strippedkey(), key->strippedkeylen(), t->strippedkey(), t->strippedkeylen()) > 0) { TUPLE::freetuple(t); old_off = read_offset_; t = getnext(); @@ -39,7 +39,7 @@ public: } public: iterator(DataPage *dp, TUPLE * key=NULL) : read_offset_(0), dp(dp) { - scan_to_key(key); + scan_to_key(key); } void operator=(const iterator &rhs) diff --git a/datatuple.h b/datatuple.h index 17acb30..67e94c2 100644 --- a/datatuple.h +++ b/datatuple.h @@ -20,13 +20,23 @@ private: byte* data_; // aliases key(). data_ - 1 should be the \0 terminating key(). datatuple* sanity_check() { - assert(keylen() < 3000); + assert(rawkeylen() < 3000); return this; } public: - inline len_t keylen() const { - return data_ - key(); + inline len_t rawkeylen() const { + return data_ - rawkey(); + } + inline len_t strippedkeylen() const { + return rawkeylen(); + const size_t ts_sz = sizeof(uint64_t)+1; + size_t al = rawkeylen(); + if(al <= ts_sz || rawkey()[al-ts_sz]!=0) { + return al; + } else { + return al - ts_sz; + } } inline len_t datalen() const { return (datalen_ == DELETE) ? 0 : datalen_; @@ -34,22 +44,25 @@ public: //returns the length of the byte array representation len_t byte_length() const { - return sizeof(len_t) + sizeof(len_t) + keylen() + datalen(); + return sizeof(len_t) + sizeof(len_t) + rawkeylen() + datalen(); } static len_t length_from_header(len_t keylen, len_t datalen) { return keylen + ((datalen == DELETE) ? 0 : datalen); } - inline key_t key() const { + inline key_t rawkey() const { return (key_t)(this+1); } inline data_t data() const { return data_; } + inline key_t strippedkey() const { + return (key_t)(this+1); + } //this is used by the stl set bool operator() (const datatuple* lhs, const datatuple* rhs) const { - return compare(lhs->key(), lhs->keylen(), rhs->key(), rhs->keylen()) < 0; //strcmp((char*)lhs.key(),(char*)rhs.key()) < 0; + return compare(lhs->strippedkey(), lhs->strippedkeylen(), rhs->strippedkey(), rhs->strippedkeylen()) < 0; //strcmp((char*)lhs.key(),(char*)rhs.key()) < 0; } /** @@ -91,15 +104,15 @@ public: return 1; } - uint64_t timestamp() { + int64_t timestamp() { const size_t ts_sz = sizeof(uint64_t)+1; - size_t al = keylen(); - if(al <= ts_sz || key()[al-ts_sz]!=0) { return (uint64_t)-1; } - return *(uint64_t*)(key()+1+al-ts_sz); + size_t al = rawkeylen(); + if(al <= ts_sz || rawkey()[al-ts_sz]!=0) { return (uint64_t)-1; } + return (int64_t)*(uint64_t*)(rawkey()+1+al-ts_sz); } static int compare_obj(const datatuple * a, const datatuple* b) { - return compare(a->key(), a->keylen(), b->key(), b->keylen()); + return compare(a->strippedkey(), a->strippedkeylen(), b->strippedkey(), b->strippedkeylen()); } inline void setDelete() { @@ -123,7 +136,7 @@ public: //copy the tuple. does a deep copy of the contents. datatuple* create_copy() const { - return create(key(), keylen(), data(), datalen_)->sanity_check(); + return create(rawkey(), rawkeylen(), data(), datalen_); } @@ -132,8 +145,8 @@ public: } static datatuple* create(const void* key, len_t keylen, const void* data, len_t datalen) { datatuple *ret = (datatuple*)malloc(sizeof(datatuple) + length_from_header(keylen,datalen)); - memcpy(ret->key(), key, keylen); - ret->data_ = ret->key() + keylen; // need to set this even if delete, since it encodes the key length. + memcpy(ret->rawkey(), key, keylen); + ret->data_ = ret->rawkey() + keylen; // need to set this even if delete, since it encodes the key length. if(datalen != DELETE) { memcpy(ret->data_, data, datalen); } @@ -144,24 +157,24 @@ public: //format: key length _ data length _ key _ data byte * to_bytes() const { byte *ret = (byte*)malloc(byte_length()); - ((len_t*)ret)[0] = keylen(); + ((len_t*)ret)[0] = rawkeylen(); ((len_t*)ret)[1] = datalen_; - memcpy(((len_t*)ret)+2, key(), length_from_header(keylen(), datalen_)); + memcpy(((len_t*)ret)+2, rawkey(), length_from_header(rawkeylen(), datalen_)); return ret; } const byte* get_bytes(len_t *keylen, len_t *datalen) const { - *keylen = this->keylen(); - *datalen = datalen_; - return key(); + *keylen = this->rawkeylen(); + *datalen = datalen_; + return rawkey(); } //format of buf: key _ data. The caller needs to 'peel' off key length and data length for this call. static datatuple* from_bytes(len_t keylen, len_t datalen, byte* buf) { datatuple *dt = (datatuple*) malloc(sizeof(datatuple) + length_from_header(keylen,datalen)); dt->datalen_ = datalen; - memcpy(dt->key(),buf, length_from_header(keylen,datalen)); - dt->data_ = dt->key() + keylen; + memcpy(dt->rawkey(),buf, length_from_header(keylen,datalen)); + dt->data_ = dt->rawkey() + keylen; return dt->sanity_check(); } static datatuple* from_bytes(byte* buf) { @@ -169,8 +182,8 @@ public: len_t buflen = length_from_header(keylen, ((len_t*)buf)[1]); datatuple *dt = (datatuple*) malloc(sizeof(datatuple) + buflen); dt->datalen_ = ((len_t*)buf)[1]; - memcpy(dt->key(),((len_t*)buf)+2,buflen); - dt->data_ = dt->key() + keylen; + memcpy(dt->rawkey(),((len_t*)buf)+2,buflen); + dt->data_ = dt->rawkey() + keylen; return dt->sanity_check(); } diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index 6334920..bf59348 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -65,7 +65,7 @@ void diskTreeComponent::writes_done() { int diskTreeComponent::insertTuple(int xid, datatuple *t) { if(bloom_filter) { - bloom_filter_insert(bloom_filter, (const char*)t->key(), t->keylen()); + bloom_filter_insert(bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen()); } int ret = 0; // no error. if(dp==0) { @@ -106,8 +106,8 @@ DataPage* diskTreeComponent::insertDataPage(int xid, datatuple *tuple ltree->appendPage(xid, - tuple->key(), - tuple->keylen(), + tuple->strippedkey(), + tuple->strippedkeylen(), dp->get_start_pid() ); @@ -896,7 +896,7 @@ void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * k lsmIterator_ = NULL; } else { if(key1) { - lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_, key1->key(), key1->keylen()); + lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_, key1->strippedkey(), key1->strippedkeylen()); } else { lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_); } diff --git a/logstore.cpp b/logstore.cpp index 4ceffa1..128967b 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -43,6 +43,8 @@ logtable::logtable(int log_mode, pageid_t max_c0_size, pageid_t internal_ this->shutting_down_ = false; c0_flushing = false; c1_flushing = false; + current_timestamp = 0; + expiry = 0; this->merge_mgr = 0; tmerger = new tuplemerger(&replace_merger); @@ -110,10 +112,10 @@ recordid logtable::allocTable(int xid) table_rec = Talloc(xid, sizeof(tbl_header)); mergeStats * stats = 0; //create the big tree - tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats); + tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats, 10); //create the small tree - tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats); + tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats, 10); merge_mgr = new mergeManager(this); merge_mgr->set_c0_size(max_c0_size); @@ -544,18 +546,17 @@ datatuple * logtable::insertTupleHelper(datatuple *tuple) datatuple *new_t = tmerger->merge(pre_t, tuple); merge_mgr->get_merge_stats(0)->merged_tuples(new_t, tuple, pre_t); t = new_t; + tree_c0->erase(pre_t); //remove the previous tuple - tree_c0->insert(new_t); //insert the new tuple - } else //no tuple with same key exists in mem-tree { - t = tuple->create_copy(); + t = tuple->create_copy(); - //insert tuple into the rbtree - tree_c0->insert(t); + //insert tuple into the rbtree + tree_c0->insert(t); } return pre_t; @@ -626,7 +627,7 @@ bool logtable::testAndSetTuple(datatuple *tuple, datatuple *tuple2) static pthread_mutex_t test_and_set_mut = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_lock(&test_and_set_mut); - datatuple * exists = findTuple_first(-1, tuple2 ? tuple2->key() : tuple->key(), tuple2 ? tuple2->keylen() : tuple->keylen()); + datatuple * exists = findTuple_first(-1, tuple2 ? tuple2->rawkey() : tuple->rawkey(), tuple2 ? tuple2->rawkeylen() : tuple->rawkeylen()); if(!tuple2 || tuple2->isDelete()) { if(!exists || exists->isDelete()) { diff --git a/logstore.h b/logstore.h index 9378152..ff21928 100644 --- a/logstore.h +++ b/logstore.h @@ -160,6 +160,9 @@ public: bool c0_flushing; bool c1_flushing; // this needs to be set to true at shutdown, or when the c0-c1 merger is waiting for c1-c2 to finish its merge + lsn_t current_timestamp; + lsn_t expiry; + //DATA PAGE SETTINGS pageid_t internal_region_size; // in number of pages pageid_t datapage_region_size; // " @@ -172,6 +175,33 @@ private: public: bool shutting_down_; + bool mightBeOnDisk(datatuple * t) { + if(tree_c1) { + if(!tree_c1->bloom_filter) { printf("no c1 bloom filter\n"); return true; } + if(bloom_filter_lookup(tree_c1->bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen())) { printf("in c1\n"); return true; } + } + if(tree_c1_prime) { + if(!tree_c1_prime->bloom_filter) { printf("no c1' bloom filter\n"); return true; } + if(bloom_filter_lookup(tree_c1_prime->bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen())) { printf("in c1'\n"); return true; } + } + return mightBeAfterMemMerge(t); + } + + bool mightBeAfterMemMerge(datatuple * t) { + + if(tree_c1_mergeable) { + if(!tree_c1_mergeable->bloom_filter) { printf("no c1m bloom filter\n"); return true; } + if(bloom_filter_lookup(tree_c1_mergeable->bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen())) { printf("in c1m'\n");return true; } + } + + + if(tree_c2) { + if(!tree_c2->bloom_filter) { printf("no c2 bloom filter\n"); return true; } + if(bloom_filter_lookup(tree_c2->bloom_filter, (const char*)t->strippedkey(), t->strippedkeylen())) { printf("in c2\n");return true; } + } + return false; + } + template class mergeManyIterator { public: @@ -325,7 +355,7 @@ public: revalidate(); TUPLE * tmp = merge_it_->next_callerFrees(); if(last_returned && tmp) { - assert(TUPLE::compare(last_returned->key(), last_returned->keylen(), tmp->key(), tmp->keylen()) < 0); + assert(TUPLE::compare(last_returned->strippedkey(), last_returned->strippedkeylen(), tmp->strippedkey(), tmp->strippedkeylen()) < 0); TUPLE::freetuple(last_returned); } last_returned = tmp; @@ -433,7 +463,7 @@ public: merge_it_ = new merge_it_t(inner_merge_it, disk_it, 4, NULL, TUPLE::compare_obj); // XXX Hardcodes comparator, and does not handle merges if(last_returned) { TUPLE * junk = merge_it_->peek(); - if(junk && !TUPLE::compare(junk->key(), junk->keylen(), last_returned->key(), last_returned->keylen())) { + if(junk && !TUPLE::compare(junk->strippedkey(), junk->strippedkeylen(), last_returned->strippedkey(), last_returned->strippedkeylen())) { // we already returned junk TUPLE::freetuple(merge_it_->next_callerFrees()); } diff --git a/merger.cpp b/merger.cpp index 527377f..90a0289 100644 --- a/merger.cpp +++ b/merger.cpp @@ -26,6 +26,17 @@ void merge_scheduler::start() { pthread_create(&disk_merge_thread_, 0, diskMerge_thr, this); } +bool insert_filter(logtable * ltable, datatuple * t, bool dropDeletes) { + if(t->isDelete()) { + if(dropDeletes || ! ltable->mightBeAfterMemMerge(t)) { + return false; + } + } + if(!ltable->expiry) { return true; } + if(t->timestamp() > ltable->current_timestamp + ltable->expiry) { return false; } + return true; +} + template void merge_iterators(int xid, diskTreeComponent * forceMe, ITA *itrA, @@ -189,7 +200,7 @@ void * merge_scheduler::memMergeThread() { stats->handed_off_tree(); // 8: c1 = new empty. - ltable_->set_tree_c1(new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats)); + ltable_->set_tree_c1(new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, 10)); pthread_cond_signal(<able_->c1_ready); ltable_->update_persistent_header(xid); @@ -388,10 +399,10 @@ void merge_iterators(int xid, DEBUG("tuple\t%lld: keylen %d datalen %d\n", ntuples, *(t2->keylen),*(t2->datalen) ); - while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2 + while(t1 != 0 && datatuple::compare(t1->rawkey(), t1->rawkeylen(), t2->rawkey(), t2->rawkeylen()) < 0) // t1 is less than t2 { //insert t1 - if((!t1->isDelete()) || !dropDeletes) { + if(insert_filter(ltable, t1, dropDeletes)) { scratch_tree->insertTuple(xid, t1); i+=t1->byte_length(); ltable->merge_mgr->wrote_tuple(stats->merge_level, t1); @@ -405,13 +416,13 @@ void merge_iterators(int xid, periodically_force(xid, &i, forceMe, log); } - if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0) + if(t1 != 0 && datatuple::compare(t1->strippedkey(), t1->strippedkeylen(), t2->strippedkey(), t2->strippedkeylen()) == 0) { datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2); stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right. //insert merged tuple, drop deletes - if((!mtuple->isDelete()) || !dropDeletes) { + if(insert_filter(ltable, mtuple, dropDeletes)) { scratch_tree->insertTuple(xid, mtuple); i+=mtuple->byte_length(); ltable->merge_mgr->wrote_tuple(stats->merge_level, mtuple); @@ -425,7 +436,7 @@ void merge_iterators(int xid, else { //insert t2 - if((!t2->isDelete()) || !dropDeletes) { + if(insert_filter(ltable, t2, dropDeletes)) { scratch_tree->insertTuple(xid, t2); i+=t2->byte_length(); ltable->merge_mgr->wrote_tuple(stats->merge_level, t2); @@ -448,7 +459,7 @@ void merge_iterators(int xid, } while(t1 != 0) {// t2 is empty, but t1 still has stuff in it. - if((!t1->isDelete()) || !dropDeletes) { + if(insert_filter(ltable, t1, dropDeletes)) { scratch_tree->insertTuple(xid, t1); ltable->merge_mgr->wrote_tuple(stats->merge_level, t1); i += t1->byte_length(); diff --git a/requestDispatch.cpp b/requestDispatch.cpp index 44d6980..87bdb2c 100644 --- a/requestDispatch.cpp +++ b/requestDispatch.cpp @@ -48,7 +48,7 @@ inline int requestDispatch::op_bulk_insert(logtable *ltable, template inline int requestDispatch::op_find(logtable * ltable, HANDLE fd, datatuple * tuple) { //find the tuple - datatuple *dt = ltable->findTuple_first(-1, tuple->key(), tuple->keylen()); + datatuple *dt = ltable->findTuple_first(-1, tuple->strippedkey(), tuple->strippedkeylen()); #ifdef STATS_ENABLED @@ -403,12 +403,12 @@ inline int requestDispatch::op_dbg_drop_database(logtable * l ltable->insertTuple(del); n++; if(!(n % 1000)) { - printf("X %lld %s\n", n, (char*)del->key()); fflush(stdout); + printf("X %lld %s\n", n, (char*)del->rawkey()); fflush(stdout); } } else { n++; if(!(n % 1000)) { - printf("? %lld %s\n", n, (char*)del->key()); fflush(stdout); + printf("? %lld %s\n", n, (char*)del->rawkey()); fflush(stdout); } } datatuple::freetuple(del); @@ -423,12 +423,12 @@ inline int requestDispatch::op_dbg_noop(logtable * ltable, HA } template inline int requestDispatch::op_dbg_set_log_mode(logtable * ltable, HANDLE fd, datatuple * tuple) { - if(tuple->keylen() != sizeof(int)) { + if(tuple->rawkeylen() != sizeof(int)) { abort(); return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR); } else { int old_mode = ltable->log_mode; - ltable->log_mode = *(int*)tuple->key(); + ltable->log_mode = *(int*)tuple->rawkey(); fprintf(stderr, "\n\nChanged log mode from %d to %d\n\n", old_mode, ltable->log_mode); return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } @@ -475,12 +475,12 @@ int requestDispatch::dispatch_request(network_op_t opcode, datatuple * t int err = 0; #if 0 if(tuple) { - char * printme = (char*)malloc(tuple->keylen()+1); - memcpy(printme, tuple->key(), tuple->keylen()); - printme[tuple->keylen()] = 0; - printf("\nop = %d, key = %s, isdelete = %d\n", opcode, printme, tuple->isDelete()); + char * printme = (char*)malloc(tuple->rawkeylen()+1); + memcpy(printme, tuple->rawkey(), tuple->rawkeylen()); + printme[tuple->rawkeylen()] = 0; + printf("\nop = %d, key = ->%s<-, isdelete = %d\n", opcode, printme, tuple->isDelete()); free(printme); - } + } #endif if(opcode == OP_INSERT) { diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index 8ea1ad3..cec840c 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -98,7 +98,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) { DataPage::iterator it = dp->begin(); datatuple * dt; while((dt = it.getnext()) != NULL) { - if(!strcmp((char*)dt->key(), key_arr[j].c_str())) { + if(!strcmp((char*)dt->rawkey(), key_arr[j].c_str())) { found = true; } datatuple::freetuple(dt); @@ -209,7 +209,7 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *dt=0; while( (dt=itr.getnext()) != NULL) { - assert(dt->keylen() == key_arr[tuplenum].length()+1); + assert(dt->rawkeylen() == key_arr[tuplenum].length()+1); assert(dt->datalen() == data_arr[tuplenum].length()+1); tuplenum++; datatuple::freetuple(dt); diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index dfe5c22..7fd0dfd 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -86,7 +86,7 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *dt=0; while( (dt=tree_itr->next_callerFrees()) != NULL) { - assert(dt->keylen() == key_arr[tuplenum].length()+1); + assert(dt->rawkeylen() == key_arr[tuplenum].length()+1); assert(dt->datalen() == data_arr[tuplenum].length()+1); tuplenum++; datatuple::freetuple(dt); @@ -108,7 +108,7 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *dt = ltable_c1->findTuple(xid, (const datatuple::key_t) key_arr[ri].c_str(), (size_t)key_arr[ri].length()+1); assert(dt!=0); - assert(dt->keylen() == key_arr[ri].length()+1); + assert(dt->rawkeylen() == key_arr[ri].length()+1); assert(dt->datalen() == data_arr[ri].length()+1); datatuple::freetuple(dt); dt = 0; diff --git a/test/check_merge.cpp b/test/check_merge.cpp index 3d312f2..dab96ee 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -126,7 +126,7 @@ void insertProbeIter(size_t NUM_ENTRIES) //if(dt!=0) { found_tuples++; - assert(dt->keylen() == (*key_arr)[ri].length()+1); + assert(dt->rawkeylen() == (*key_arr)[ri].length()+1); assert(dt->datalen() == (*data_arr)[ri].length()+1); datatuple::freetuple(dt); } diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index fd24e68..df37595 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -213,14 +213,14 @@ void insertProbeIter(size_t NUM_ENTRIES) assert(dt!=0); assert(!dt->isDelete()); found_tuples++; - assert(dt->keylen() == (*key_arr)[ri].length()+1); + assert(dt->rawkeylen() == (*key_arr)[ri].length()+1); datatuple::freetuple(dt); } else { if(dt!=0) { - assert(dt->keylen() == (*key_arr)[ri].length()+1); + assert(dt->rawkeylen() == (*key_arr)[ri].length()+1); assert(dt->isDelete()); datatuple::freetuple(dt); } diff --git a/test/check_rbtree.cpp b/test/check_rbtree.cpp index 55d63c8..e731647 100644 --- a/test/check_rbtree.cpp +++ b/test/check_rbtree.cpp @@ -77,7 +77,7 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *tuple = *rbitr; found_tuples++; - assert(tuple->keylen() == key_arr[ri].length()+1); + assert(tuple->rawkeylen() == key_arr[ri].length()+1); assert(tuple->datalen() == data_arr[ri].length()+1); } else diff --git a/test/check_tcpbulkinsert.cpp b/test/check_tcpbulkinsert.cpp index 8d4b878..5b58419 100644 --- a/test/check_tcpbulkinsert.cpp +++ b/test/check_tcpbulkinsert.cpp @@ -181,7 +181,7 @@ void insertProbeIter(size_t NUM_ENTRIES) assert(dt!=0); assert(!dt->isDelete()); found_tuples++; - assert(dt->keylen() == (*key_arr)[ri].length()+1); + assert(dt->rawkeylen() == (*key_arr)[ri].length()+1); //free dt datatuple::freetuple(dt); @@ -199,8 +199,8 @@ void insertProbeIter(size_t NUM_ENTRIES) size_t i = 0; while((tup = logstore_client_next_tuple(l))) { assert(!tup->isDelete()); - assert(tup->keylen() == (*key_arr)[i].length()+1); - assert(!memcmp(tup->key(), (*key_arr)[i].c_str(), (*key_arr)[i].length())); + assert(tup->rawkeylen() == (*key_arr)[i].length()+1); + assert(!memcmp(tup->rawkey(), (*key_arr)[i].c_str(), (*key_arr)[i].length())); datatuple::freetuple(tup); i++; } diff --git a/test/check_tcpclient.cpp b/test/check_tcpclient.cpp index 79a17d3..570de47 100644 --- a/test/check_tcpclient.cpp +++ b/test/check_tcpclient.cpp @@ -166,7 +166,7 @@ void insertProbeIter(size_t NUM_ENTRIES) assert(dt!=0); assert(!dt->isDelete()); found_tuples++; - assert(dt->keylen() == (*key_arr)[ri].length()+1); + assert(dt->rawkeylen() == (*key_arr)[ri].length()+1); //free dt datatuple::freetuple(dt); @@ -184,8 +184,8 @@ void insertProbeIter(size_t NUM_ENTRIES) size_t i = 0; while((tup = logstore_client_next_tuple(l))) { assert(!tup->isDelete()); - assert(tup->keylen() == (*key_arr)[i].length()+1); - assert(!memcmp(tup->key(), (*key_arr)[i].c_str(), (*key_arr)[i].length())); + assert(tup->rawkeylen() == (*key_arr)[i].length()+1); + assert(!memcmp(tup->rawkey(), (*key_arr)[i].c_str(), (*key_arr)[i].length())); datatuple::freetuple(tup); i++; } diff --git a/tuplemerger.cpp b/tuplemerger.cpp index e3c723f..84043db 100644 --- a/tuplemerger.cpp +++ b/tuplemerger.cpp @@ -5,23 +5,14 @@ // we return deletes here. our caller decides what to do with them. datatuple* tuplemerger::merge(const datatuple *t1, const datatuple *t2) { - datatuple *t; - if(t1->isDelete() && t2->isDelete()) { - t = t2->create_copy(); - } else if(t1->isDelete()) //delete -> t2 - { - t = t2->create_copy(); - } - else if(t2->isDelete()) - { - t = t2->create_copy(); - } - else //neither is a delete - { - t = (*merge_fp)(t1,t2); - } - - return t; + if(!(t1->isDelete() || t2->isDelete())) { + return (*merge_fp)(t1,t2); + } else { + // if there is at least one tombstone, we return t2 intact. + // t1 tombstone -> ignore it, and return t2. + // t2 tombstone -> return a tombstone (like t2). + return t2->create_copy(); + } } /** * appends the data in t2 to data from t1 @@ -32,13 +23,13 @@ datatuple* tuplemerger::merge(const datatuple *t1, const datatuple *t2) datatuple* append_merger(const datatuple *t1, const datatuple *t2) { assert(!(t1->isDelete() || t2->isDelete())); - len_t keylen = t1->keylen(); + len_t rawkeylen = t1->rawkeylen(); len_t datalen = t1->datalen() + t2->datalen(); byte * data = (byte*)malloc(datalen); memcpy(data, t1->data(), t1->datalen()); memcpy(data + t1->datalen(), t2->data(), t2->datalen()); - return datatuple::create(t1->key(), keylen, data, datalen); + return datatuple::create(t1->rawkey(), rawkeylen, data, datalen); } /** diff --git a/util/histogram.cpp b/util/histogram.cpp index ce457c3..8d6cdcb 100644 --- a/util/histogram.cpp +++ b/util/histogram.cpp @@ -37,13 +37,13 @@ int main(int argc, char * argv[]) { bool first = true; while(( ret = logstore_client_next_tuple(l) )) { if(first) { - assert(ret->keylen() == sizeof(uint64_t)); - uint64_t stride = *(uint64_t*)ret->key(); + assert(ret->rawkeylen() == sizeof(uint64_t)); + uint64_t stride = *(uint64_t*)ret->rawkey(); printf("Stride: %lld\n", (long long)stride); first = false; } else { - assert(ret->key()[ret->keylen()-1] == 0); // check for null terminator. - printf("\t%s\n", (char*)ret->key()); + assert(ret->strippedkey()[ret->strippedkeylen()-1] == 0); // check for null terminator. + printf("\t%s\n", (char*)ret->strippedkey()); } datatuple::freetuple(ret); } diff --git a/util/space_usage.cpp b/util/space_usage.cpp index 0d961a5..72581c0 100644 --- a/util/space_usage.cpp +++ b/util/space_usage.cpp @@ -23,9 +23,9 @@ int main(int argc, char * argv[]) { } logstore_client_close(l); - assert(ret->keylen() == sizeof(uint64_t)); + assert(ret->rawkeylen() == sizeof(uint64_t)); assert(ret->datalen() == sizeof(uint64_t)); - printf("Tree is %llu MB Store file is %llu MB\n", (unsigned long long)(*(uint64_t*)ret->key()) / (1024*1024), (unsigned long long)(*(uint64_t*)ret->data()) / (1024*1024)); + printf("Tree is %llu MB Store file is %llu MB\n", (unsigned long long)(*(uint64_t*)ret->rawkey()) / (1024*1024), (unsigned long long)(*(uint64_t*)ret->data()) / (1024*1024)); datatuple::freetuple(ret); ; return 0;