diff --git a/datapage.cpp b/datapage.cpp index 096b35a..b8981a8 100644 --- a/datapage.cpp +++ b/datapage.cpp @@ -116,6 +116,9 @@ void DataPage::initialize_page(pageid_t pageid) { //initialize header p->pageType = DATA_PAGE; + //clear page (arranges for null-padding) + memset(p->memAddr, 0, PAGE_SIZE); + //we're the last page for now. *is_another_page_ptr(p) = 0; @@ -263,7 +266,7 @@ bool DataPage::append(TUPLE const * dat) } template -bool DataPage::recordRead(typename TUPLE::key_t key, size_t keySize, TUPLE ** buf) +bool DataPage::recordRead(const typename TUPLE::key_t key, size_t keySize, TUPLE ** buf) { iterator itr(this, NULL); diff --git a/datapage.h b/datapage.h index d59c334..407c81a 100644 --- a/datapage.h +++ b/datapage.h @@ -87,7 +87,7 @@ public: } bool append(TUPLE const * dat); - bool recordRead(typename TUPLE::key_t key, size_t keySize, TUPLE ** buf); + bool recordRead(const typename TUPLE::key_t key, size_t keySize, TUPLE ** buf); inline uint16_t recordCount(); diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index ed63d14..af3e7af 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -504,6 +504,9 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo DEBUG("%lld <-> %lld\n", lastLeaf, child); } + // Crucially, this happens *after* the recursion. Therefore, we can query the + // tree with impunity while the leaf is being built and don't have to worry + // about dangling pointers to pages that are in the process of being allocated. writeNodeRecord(xid, root_p, root, key, key_len, child); return ret; diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index b3aa3d8..5b55f07 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -22,6 +22,108 @@ template class DataPage; +void insertWithConcurrentReads(size_t NUM_ENTRIES) { + srand(1001); + unlink("storefile.txt"); + unlink("logfile.txt"); + + sync(); + + logtable::init_stasis(); + + int xid = Tbegin(); + + + std::vector data_arr; + std::vector key_arr; + preprandstr(NUM_ENTRIES, data_arr, 5*4096, true); + preprandstr(NUM_ENTRIES+200, key_arr, 50, true);//well i can handle upto 200 + + std::sort(key_arr.begin(), key_arr.end(), &mycmp); + + removeduplicates(key_arr); + if(key_arr.size() > NUM_ENTRIES) + key_arr.erase(key_arr.begin()+NUM_ENTRIES, key_arr.end()); + + NUM_ENTRIES=key_arr.size(); + + if(data_arr.size() > NUM_ENTRIES) + data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end()); + + RegionAllocator * alloc = new RegionAllocator(xid, 10000); // ~ 10 datapages per region. + + printf("Stage 1: Writing %d keys\n", NUM_ENTRIES); + + int pcount = 1000; + int dpages = 0; + DataPage *dp=0; + int64_t datasize = 0; + std::vector dsp; + size_t last_i = 0; + for(size_t i = 0; i < NUM_ENTRIES; i++) + { + //prepare the key + datatuple *newtuple = datatuple::create(key_arr[i].c_str(), key_arr[i].length()+1, data_arr[i].c_str(), data_arr[i].length()+1); + + datasize += newtuple->byte_length(); + if(dp==NULL || !dp->append(newtuple)) + { + last_i = i; + dpages++; + if(dp) + dp->writes_done(); + delete dp; + + // Free the old region allocator so that we repeatedly overwrite the same space. This will find bugs when we fail to null pad at write. + alloc->done(); + alloc->dealloc_regions(xid); + delete alloc; + Tcommit(xid); + xid = Tbegin(); + alloc = new RegionAllocator(xid, 10000); + + dp = new DataPage(xid, pcount, alloc); +// printf("%lld\n", dp->get_start_pid()); + bool succ = dp->append(newtuple); + assert(succ); + + dsp.push_back(dp->get_start_pid()); + } + size_t j = (rand() % (2 * (1 + i - last_i))) + last_i; + + bool found = 0; + { + DataPage::iterator it = dp->begin(); + datatuple * dt; + while((dt = it.getnext()) != NULL) { + if(!strcmp((char*)dt->key(), key_arr[j].c_str())) { + found = true; + } + datatuple::freetuple(dt); + } + } + if(found) { + assert(j <= i); +// printf("found!"); + } else { + assert(i < j); + } + } + if(dp) { + dp->writes_done(); + delete dp; + } + + printf("Total data set length: %lld\n", (long long)datasize); + printf("Storage utilization: %.2f\n", (datasize+.0) / (PAGE_SIZE * pcount * dpages)); + printf("Number of datapages: %d\n", dpages); + printf("Writes complete.\n"); + + Tcommit(xid); + + logtable::deinit_stasis(); +} + void insertProbeIter(size_t NUM_ENTRIES) { srand(1000); @@ -126,10 +228,13 @@ void insertProbeIter(size_t NUM_ENTRIES) */ int main() { - insertProbeIter(10000); - - - return 0; + insertWithConcurrentReads(5000); + + insertProbeIter(10000); + + + + return 0; }