From 53a7982f47b01600edd76e921ea3fc44d2ff21a5 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 13 Nov 2008 04:18:50 +0000 Subject: [PATCH] fixes numerous blob bugs, allowing linearHashNTA to store arbitrary length key,value pairs. --- src/stasis/blobManager.c | 20 +++++--- src/stasis/operations/alloc.c | 19 +++++-- src/stasis/operations/set.c | 5 +- src/stasis/transactional2.c | 3 +- stasis/blobManager.h | 2 +- test/stasis/check_linearHashNTA.c | 83 ++++++++++++++++++++++++++++++- 6 files changed, 115 insertions(+), 17 deletions(-) diff --git a/src/stasis/blobManager.c b/src/stasis/blobManager.c index 1d84838..6cd8703 100644 --- a/src/stasis/blobManager.c +++ b/src/stasis/blobManager.c @@ -11,15 +11,21 @@ void allocBlob(int xid, recordid rid) { blob_record_t rec; rec.offset = startPage; rec.size = rid.size; - recordid rid2 = rid; - rid2.size = BLOB_SLOT; - Tset(xid, rid2, (byte*)&rec); - // printf("Page start = %d, count = %d, rid.size=%d\n", rec.offset, pageCount, rid.size); - // printf("rid = {%d %d %d}\n", rid.page, rid.slot, rid.size); + rid.size = sizeof(rec); + TsetRaw(xid, rid, (byte*)&rec); + DEBUG("Page start = %d, count = %d, rid.size=%d\n", rec.offset, pageCount, rid.size); + DEBUG("rid = {%d %d %d}\n", rid.page, rid.slot, rid.size); } -void deallocBlob(int xid, recordid rid) { - TregionDealloc(xid, rid.page); +void deallocBlob(int xid, blob_record_t *r) { + /* Page *p = loadPage(xid, rid.page); + writelock(p->rwlatch,0); + blob_record_t r; + rid.size = sizeof(blob_record_t); + stasis_record_read(xid, p, rid, (byte*)&r); + unlock(p->rwlatch); + releasePage(p); */ + TregionDealloc(xid, r->offset); } void readBlob(int xid, Page * p2, recordid rid, byte * buf) { diff --git a/src/stasis/operations/alloc.c b/src/stasis/operations/alloc.c index 4ae6b74..5eba1ac 100644 --- a/src/stasis/operations/alloc.c +++ b/src/stasis/operations/alloc.c @@ -378,6 +378,7 @@ compensated_function recordid TallocFromPage(int xid, pageid_t page, unsigned lo if(size >= BLOB_THRESHOLD_SIZE) { type = BLOB_SLOT; } else { + assert(size > 0); type = size; } @@ -390,6 +391,7 @@ compensated_function recordid TallocFromPage(int xid, pageid_t page, unsigned lo writelock(p->rwlatch,0); recordid rid = stasis_record_alloc_begin(xid, p, type); + if(rid.size != INVALID_SLOT) { stasis_record_alloc_done(xid,p,rid); allocationPolicyAllocedFromPage(allocPolicy, xid, page); @@ -464,12 +466,12 @@ compensated_function void Tdealloc(int xid, recordid rid) { releasePage(p); - if(type==BLOB_SLOT) { - deallocBlob(xid,rid); - } - pthread_mutex_unlock(&talloc_mutex); + if(type==BLOB_SLOT) { + deallocBlob(xid,(blob_record_t*)(preimage+sizeof(alloc_arg))); + } + free(preimage); } @@ -490,7 +492,14 @@ compensated_function int TrecordSize(int xid, recordid rid) { Page * p; p = loadPage(xid, rid.page); readlock(p->rwlatch,0); - ret = stasis_record_length_read(xid, p, rid); + rid.size = stasis_record_length_read(xid, p, rid); + if(stasis_record_type_read(xid,p,rid) == BLOB_SLOT) { + blob_record_t r; + stasis_record_read(xid,p,rid,(byte*)&r); + ret = r.size; + } else { + ret = rid.size; + } unlock(p->rwlatch); releasePage(p); return ret; diff --git a/src/stasis/operations/set.c b/src/stasis/operations/set.c index dfc49f9..ee920f3 100644 --- a/src/stasis/operations/set.c +++ b/src/stasis/operations/set.c @@ -93,13 +93,14 @@ int Tset(int xid, recordid rid, const void * dat) { Page * p = loadPage(xid, rid.page); readlock(p->rwlatch,0); rid = stasis_record_dereference(xid,p,rid); - rid.size = stasis_record_type_to_size(rid.size); + short type = stasis_record_type_read(xid,p,rid); - if(rid.size > BLOB_THRESHOLD_SIZE) { + if(type == BLOB_SLOT) { writeBlob(xid,p,rid,dat); unlock(p->rwlatch); releasePage(p); } else { + rid.size = stasis_record_type_to_size(rid.size); if(rid.page == p->id) { // failing early avoids unrecoverable logs... assert(rid.size == stasis_record_length_read(xid, p, rid)); diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 3c66133..83693e9 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -320,7 +320,8 @@ compensated_function void Tread(int xid, recordid rid, void * dat) { p = loadPage(xid, rid.page); readlock(p->rwlatch,0); } - if(rid.size > BLOB_THRESHOLD_SIZE) { + short type = stasis_record_type_read(xid,p,rid); + if(type == BLOB_SLOT) { DEBUG("call readBlob %lld %lld %lld\n", (long long)rid.page, (long long)rid.slot, (long long)rid.size); readBlob(xid,p,rid,dat); assert(rid.page == p->id); diff --git a/stasis/blobManager.h b/stasis/blobManager.h index e772f62..1e17324 100644 --- a/stasis/blobManager.h +++ b/stasis/blobManager.h @@ -68,7 +68,7 @@ compensated_function recordid preAllocBlobFromPage(int xid, long page, long blob */ void allocBlob(int xid, recordid rid); -void deallocBlob(int xid, recordid rid); +void deallocBlob(int xid, blob_record_t* r); page_impl blobImpl(); diff --git a/test/stasis/check_linearHashNTA.c b/test/stasis/check_linearHashNTA.c index 43839df..4532e29 100644 --- a/test/stasis/check_linearHashNTA.c +++ b/test/stasis/check_linearHashNTA.c @@ -56,7 +56,22 @@ terms specified in this license. #define LOG_NAME "check_linearHashNTA.log" static const int NUM_ENTRIES = 100000; -/** @test + +#define ARRAY_SIZE (2 * 3 * (int)(PAGE_SIZE * 1.5)) +static void arraySet(int * a, int mul) { + int i; + + for ( i = 0 ; i < ARRAY_SIZE; i++) { + a[i]= mul*i; + } +} + +static int arryCmp(int * a, int * b, int len) { + return memcmp(a,b,len); +} + +/** + @test */ START_TEST(linearHashNTAtest) { @@ -307,6 +322,71 @@ START_TEST(linearHashNTAThreadedTestRandomized) { Tdeinit(); } END_TEST #endif // LONG_TEST +/** + @test Test linear hash nta when the values it stores are larger + than a single page. + */ +START_TEST(linearHashNTABlobTest) { + Tinit(); + + int arry1[ARRAY_SIZE]; + int arry2[ARRAY_SIZE]; + int arry3[ARRAY_SIZE]; + int arry4[ARRAY_SIZE]; + int *scratch; + int alen=ARRAY_SIZE*sizeof(int); + int one, two, three, four; + int len1,len2,len3; // len4; + + arraySet(arry1,1); one = 1; + arraySet(arry2,1); two = 2; + arraySet(arry3,1); three = 3; + arraySet(arry4,1); four = 4; + + int xid = Tbegin(); + recordid rid = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); + ThashInsert(xid,rid,(byte*)&one,sizeof(one),(byte*)arry1,alen); + len1 = ThashLookup(xid,rid,(byte*)&one,sizeof(one),(byte**)&scratch); + assert(len1==alen); + assert(!arryCmp(arry1,scratch,alen)); + free(scratch); + Tcommit(xid); + xid = Tbegin(); + ThashInsert(xid,rid,(byte*)&two, sizeof(two), (byte*)arry2,alen/2); + ThashInsert(xid,rid,(byte*)&three,sizeof(three),(byte*)arry3,alen/3); + + len2 = ThashLookup(xid,rid,(byte*)&two, sizeof(two), (byte**)&scratch); + assert(len2 == alen/2); + assert(!arryCmp(scratch,arry2,alen/2)); + free(scratch); + + len3 = ThashLookup(xid,rid,(byte*)&three, sizeof(three), (byte**)&scratch); + assert(len3 == alen/3); + assert(!arryCmp(scratch,arry3,alen/3)); + free(scratch); + + Tabort(xid); + + Tdeinit(); + Tinit(); + + xid = Tbegin(); + len1 = ThashLookup(xid,rid,(byte*)&one, sizeof(one), (byte**)&scratch); + assert(len1 == alen); + assert(!arryCmp(scratch,arry1,alen)); + free(scratch); + + len3 = ThashLookup(xid,rid,(byte*)&two, sizeof(two), (byte**)&scratch); + assert(len3 == -1); + Tcommit(xid); + + Tdeinit(); + + Tinit(); + + Tdeinit(); +} END_TEST + START_TEST(linearHashNTAIteratortest) { Tinit(); int xid = Tbegin(); @@ -417,6 +497,7 @@ Suite * check_suite(void) { tcase_add_test(tc, linearHashNTAIteratortest); tcase_add_test(tc, linearHashNTAtest); tcase_add_test(tc, linearHashNTAThreadedTest); + tcase_add_test(tc, linearHashNTABlobTest); #ifdef LONG_TEST tcase_add_test(tc, linearHashNTAThreadedTestRandomized); tcase_add_test(tc, linearHashNTAThreadedTestRandomized);