Laying the groundwork for snowshoveling: as of this commit, pointer updates and null padding happen in an order that should let us query incomplete tree components. We should only need short page latches at this point (so that we do not look at partially written keys/values).

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@931 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-07-14 22:42:26 +00:00
parent 9dcb624649
commit eb8b57378a
4 changed files with 117 additions and 6 deletions

View file

@ -116,6 +116,9 @@ void DataPage<TUPLE>::initialize_page(pageid_t pageid) {
//initialize header //initialize header
p->pageType = DATA_PAGE; p->pageType = DATA_PAGE;
//clear page (arranges for null-padding)
memset(p->memAddr, 0, PAGE_SIZE);
//we're the last page for now. //we're the last page for now.
*is_another_page_ptr(p) = 0; *is_another_page_ptr(p) = 0;
@ -263,7 +266,7 @@ bool DataPage<TUPLE>::append(TUPLE const * dat)
} }
template <class TUPLE> template <class TUPLE>
bool DataPage<TUPLE>::recordRead(typename TUPLE::key_t key, size_t keySize, TUPLE ** buf) bool DataPage<TUPLE>::recordRead(const typename TUPLE::key_t key, size_t keySize, TUPLE ** buf)
{ {
iterator itr(this, NULL); iterator itr(this, NULL);

View file

@ -87,7 +87,7 @@ public:
} }
bool append(TUPLE const * dat); bool append(TUPLE const * dat);
bool recordRead(typename TUPLE::key_t key, size_t keySize, TUPLE ** buf); bool recordRead(const typename TUPLE::key_t key, size_t keySize, TUPLE ** buf);
inline uint16_t recordCount(); inline uint16_t recordCount();

View file

@ -504,6 +504,9 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
DEBUG("%lld <-> %lld\n", lastLeaf, child); DEBUG("%lld <-> %lld\n", lastLeaf, child);
} }
// Crucially, this happens *after* the recursion. Therefore, we can query the
// tree with impunity while the leaf is being built and don't have to worry
// about dangling pointers to pages that are in the process of being allocated.
writeNodeRecord(xid, root_p, root, key, key_len, child); writeNodeRecord(xid, root_p, root, key, key_len, child);
return ret; return ret;

View file

@ -22,6 +22,108 @@
template class DataPage<datatuple>; template class DataPage<datatuple>;
// Regression test for null-padding at write time.
//
// Inserts NUM_ENTRIES random key/value tuples into a sequence of DataPages
// and, after every insert, scans the current page with an iterator — i.e. a
// "concurrent" read of a page that is still being written.  The region
// allocator is torn down and rebuilt for every new page so that datapages
// repeatedly overwrite the same disk space; a page that is not null-padded
// on write would then expose stale bytes from the previous page to the
// reader, tripping the asserts below.
void insertWithConcurrentReads(size_t NUM_ENTRIES) {
    srand(1001);  // fixed seed: deterministic key/value generation and probes
    unlink("storefile.txt");
    unlink("logfile.txt");
    sync();

    logtable<datatuple>::init_stasis();
    int xid = Tbegin();

    std::vector<std::string> data_arr;
    std::vector<std::string> key_arr;
    preprandstr(NUM_ENTRIES, data_arr, 5*4096, true);
    // Generate extra keys; the +200 slack absorbs duplicates removed below.
    preprandstr(NUM_ENTRIES+200, key_arr, 50, true);

    std::sort(key_arr.begin(), key_arr.end(), &mycmp);
    removeduplicates(key_arr);
    if(key_arr.size() > NUM_ENTRIES)
        key_arr.erase(key_arr.begin()+NUM_ENTRIES, key_arr.end());
    NUM_ENTRIES = key_arr.size();  // may shrink if dedup removed > 200 keys
    if(data_arr.size() > NUM_ENTRIES)
        data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end());

    RegionAllocator * alloc = new RegionAllocator(xid, 10000); // ~ 10 datapages per region.

    // BUG FIX: NUM_ENTRIES is size_t; passing it to "%d" is undefined
    // behavior on LP64 platforms.  Cast to long long, matching the other
    // printf calls in this function.
    printf("Stage 1: Writing %lld keys\n", (long long)NUM_ENTRIES);

    int pcount = 1000;                 // pages reserved per DataPage
    int dpages = 0;                    // datapages written so far
    DataPage<datatuple> *dp = 0;
    int64_t datasize = 0;              // total bytes of tuples generated
    std::vector<pageid_t> dsp;         // start pid of each datapage
    size_t last_i = 0;                 // index of first tuple in current page
    for(size_t i = 0; i < NUM_ENTRIES; i++)
    {
        // Prepare the tuple; key and value lengths include the NUL terminator.
        datatuple *newtuple = datatuple::create(key_arr[i].c_str(), key_arr[i].length()+1,
                                                data_arr[i].c_str(), data_arr[i].length()+1);
        datasize += newtuple->byte_length();
        // NOTE(review): newtuple is never freed here; if append() copies the
        // tuple into the page this leaks one tuple per iteration — confirm
        // ownership semantics of DataPage::append before adding a freetuple().

        if(dp == NULL || !dp->append(newtuple))
        {
            // Current page is full (or this is the very first tuple):
            // finish it and start a fresh DataPage.
            last_i = i;
            dpages++;
            if(dp)
                dp->writes_done();
            delete dp;

            // Free the old region allocator so that we repeatedly overwrite
            // the same space.  This will find bugs when we fail to null pad
            // at write.
            alloc->done();
            alloc->dealloc_regions(xid);
            delete alloc;
            Tcommit(xid);
            xid = Tbegin();
            alloc = new RegionAllocator(xid, 10000);

            dp = new DataPage<datatuple>(xid, pcount, alloc);
            // printf("%lld\n", dp->get_start_pid());
            bool succ = dp->append(newtuple);
            assert(succ);  // an empty page must accept at least one tuple
            dsp.push_back(dp->get_start_pid());
        }

        // Concurrent-read probe: pick a random key index in
        // [last_i, last_i + 2*(1+i-last_i)).  j may exceed i, so roughly half
        // the probes exercise the "key not yet written" miss path.
        size_t j = (rand() % (2 * (1 + i - last_i))) + last_i;
        bool found = false;  // was "= 0"; use the bool literal
        {
            DataPage<datatuple>::iterator it = dp->begin();
            datatuple * dt;
            while((dt = it.getnext()) != NULL) {
                if(!strcmp((char*)dt->key(), key_arr[j].c_str())) {
                    found = true;
                }
                datatuple::freetuple(dt);
            }
        }
        if(found) {
            assert(j <= i);  // a hit must be a key we already appended
            // printf("found!");
        } else {
            assert(i < j);   // a miss must be a key we have not written yet
        }
    }
    if(dp) {
        dp->writes_done();
        delete dp;
    }

    printf("Total data set length: %lld\n", (long long)datasize);
    printf("Storage utilization: %.2f\n", (datasize+.0) / (PAGE_SIZE * pcount * dpages));
    printf("Number of datapages: %d\n", dpages);
    printf("Writes complete.\n");

    Tcommit(xid);
    logtable<datatuple>::deinit_stasis();
}
void insertProbeIter(size_t NUM_ENTRIES) void insertProbeIter(size_t NUM_ENTRIES)
{ {
srand(1000); srand(1000);
@ -126,6 +228,9 @@ void insertProbeIter(size_t NUM_ENTRIES)
*/ */
int main() int main()
{ {
insertWithConcurrentReads(5000);
insertProbeIter(10000); insertProbeIter(10000);