diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index bf59348..60ca433 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -291,6 +291,8 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid, // with the last slot. so thats why i go backwards here. DEBUG("slots %d (%d) keysize=%lld\n", (int)last_slot+1, (int)FIRST_SLOT+1, (long long int)keySize); assert(numslots >= FIRST_SLOT+1); + + writelock(p->rwlatch,0); // Note that we leave the first slot in place. for(int i = numslots-1; i>FIRST_SLOT; i--) { recordid tmp_rec= {p->id, i, INVALID_SIZE}; @@ -314,7 +316,7 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid, depth ++; recordid depth_rid = { p->id, DEPTH, root_rec_size }; stasis_record_write(xid, p, depth_rid, (byte*)(&depth)); - + unlock(p->rwlatch); assert(tree.page == p->id); ret = appendInternalNode(xid, p, depth, key, keySize, val_page); @@ -748,9 +750,15 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p, rid); int64_t depth = nr->ptr; + justOnePage = (depth == 0); DEBUG("DEPTH = %lld\n", depth); stasis_record_read_done(xid,p,rid,(const byte*)nr); - unlock(p->rwlatch); + // NOTE: The root page is not append only. We need to hold onto + // this latch throughout the iteration to protect ourselves from + // root tree splits. For multi-level trees, this is not the case, + // as everything below the root is append-only, so we can release + // and reacquire the latches if need be. + if(!justOnePage) unlock(p->rwlatch); pageid_t leafid = diskTreeComponent::internalNodes::findFirstLeaf(xid, p, depth); if(leafid != root.page) { @@ -773,8 +781,7 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r xid_ = xid; done = false; t = 0; - justOnePage = (depth == 0); - readlock(p->rwlatch,0); + if(!justOnePage) readlock(p->rwlatch,0); } diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* ro_alloc, recordid root, const byte* key, len_t keylen) { @@ -788,22 +795,29 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r readlock(p->rwlatch,0); const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p,rid); int64_t depth = nr->ptr; + justOnePage = (depth==0); stasis_record_read_done(xid,p,rid,(const byte*)nr); - unlock(p->rwlatch); recordid lsm_entry_rid = diskTreeComponent::internalNodes::lookup(xid,p,depth,key,keylen); if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) { + unlock(p->rwlatch); releasePage(p); p = NULL; done = true; } else { + + if(!justOnePage) unlock(p->rwlatch); + assert(lsm_entry_rid.size != INVALID_SLOT); if(root.page != lsm_entry_rid.page) { releasePage(p); p = ro_alloc->load_page(xid,lsm_entry_rid.page); + assert(!justOnePage); + } else { + assert(justOnePage); } done = false; @@ -812,12 +826,11 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r current.size = lsm_entry_rid.size; xid_ = xid; - justOnePage = (depth==0); DEBUG("diskTreeComponentIterator: index root %lld index page %lld data page %lld key %s\n", root.page, current.page, rec->ptr, key); DEBUG("entry = %s key = %s\n", (char*)(rec+1), (char*)key); - readlock(p->rwlatch,0); + if(!justOnePage) readlock(p->rwlatch,0); } t = 0; // must be zero so free() doesn't croak. }