fix latching bug in scan

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2483 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2011-04-28 20:18:54 +00:00
parent e81b8522d7
commit c910a7af8f

View file

@ -291,6 +291,8 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
// with the last slot. so thats why i go backwards here. // with the last slot. so thats why i go backwards here.
DEBUG("slots %d (%d) keysize=%lld\n", (int)last_slot+1, (int)FIRST_SLOT+1, (long long int)keySize); DEBUG("slots %d (%d) keysize=%lld\n", (int)last_slot+1, (int)FIRST_SLOT+1, (long long int)keySize);
assert(numslots >= FIRST_SLOT+1); assert(numslots >= FIRST_SLOT+1);
writelock(p->rwlatch,0);
// Note that we leave the first slot in place. // Note that we leave the first slot in place.
for(int i = numslots-1; i>FIRST_SLOT; i--) { for(int i = numslots-1; i>FIRST_SLOT; i--) {
recordid tmp_rec= {p->id, i, INVALID_SIZE}; recordid tmp_rec= {p->id, i, INVALID_SIZE};
@ -314,7 +316,7 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
depth ++; depth ++;
recordid depth_rid = { p->id, DEPTH, root_rec_size }; recordid depth_rid = { p->id, DEPTH, root_rec_size };
stasis_record_write(xid, p, depth_rid, (byte*)(&depth)); stasis_record_write(xid, p, depth_rid, (byte*)(&depth));
unlock(p->rwlatch);
assert(tree.page == p->id); assert(tree.page == p->id);
ret = appendInternalNode(xid, p, depth, key, keySize, val_page); ret = appendInternalNode(xid, p, depth, key, keySize, val_page);
@ -748,9 +750,15 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p, rid); const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p, rid);
int64_t depth = nr->ptr; int64_t depth = nr->ptr;
justOnePage = (depth == 0);
DEBUG("DEPTH = %lld\n", depth); DEBUG("DEPTH = %lld\n", depth);
stasis_record_read_done(xid,p,rid,(const byte*)nr); stasis_record_read_done(xid,p,rid,(const byte*)nr);
unlock(p->rwlatch); // NOTE: The root page is not append only. We need to hold onto
// this latch throughout the iteration to protect ourselves from
// root tree splits. For multi-level trees, this is not the case,
// as everything below the root is append-only, so we can release
// and reacquire the latches if need be.
if(!justOnePage) unlock(p->rwlatch);
pageid_t leafid = diskTreeComponent::internalNodes::findFirstLeaf(xid, p, depth); pageid_t leafid = diskTreeComponent::internalNodes::findFirstLeaf(xid, p, depth);
if(leafid != root.page) { if(leafid != root.page) {
@ -773,8 +781,7 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
xid_ = xid; xid_ = xid;
done = false; done = false;
t = 0; t = 0;
justOnePage = (depth == 0); if(!justOnePage) readlock(p->rwlatch,0);
readlock(p->rwlatch,0);
} }
diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* ro_alloc, recordid root, const byte* key, len_t keylen) { diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* ro_alloc, recordid root, const byte* key, len_t keylen) {
@ -788,22 +795,29 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
readlock(p->rwlatch,0); readlock(p->rwlatch,0);
const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p,rid); const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p,rid);
int64_t depth = nr->ptr; int64_t depth = nr->ptr;
justOnePage = (depth==0);
stasis_record_read_done(xid,p,rid,(const byte*)nr); stasis_record_read_done(xid,p,rid,(const byte*)nr);
unlock(p->rwlatch);
recordid lsm_entry_rid = diskTreeComponent::internalNodes::lookup(xid,p,depth,key,keylen); recordid lsm_entry_rid = diskTreeComponent::internalNodes::lookup(xid,p,depth,key,keylen);
if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) { if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) {
unlock(p->rwlatch);
releasePage(p); releasePage(p);
p = NULL; p = NULL;
done = true; done = true;
} else { } else {
if(!justOnePage) unlock(p->rwlatch);
assert(lsm_entry_rid.size != INVALID_SLOT); assert(lsm_entry_rid.size != INVALID_SLOT);
if(root.page != lsm_entry_rid.page) if(root.page != lsm_entry_rid.page)
{ {
releasePage(p); releasePage(p);
p = ro_alloc->load_page(xid,lsm_entry_rid.page); p = ro_alloc->load_page(xid,lsm_entry_rid.page);
assert(!justOnePage);
} else {
assert(justOnePage);
} }
done = false; done = false;
@ -812,12 +826,11 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
current.size = lsm_entry_rid.size; current.size = lsm_entry_rid.size;
xid_ = xid; xid_ = xid;
justOnePage = (depth==0);
DEBUG("diskTreeComponentIterator: index root %lld index page %lld data page %lld key %s\n", root.page, current.page, rec->ptr, key); DEBUG("diskTreeComponentIterator: index root %lld index page %lld data page %lld key %s\n", root.page, current.page, rec->ptr, key);
DEBUG("entry = %s key = %s\n", (char*)(rec+1), (char*)key); DEBUG("entry = %s key = %s\n", (char*)(rec+1), (char*)key);
readlock(p->rwlatch,0); if(!justOnePage) readlock(p->rwlatch,0);
} }
t = 0; // must be zero so free() doesn't croak. t = 0; // must be zero so free() doesn't croak.
} }