fixes numerous blob bugs, allowing linearHashNTA to store arbitrary length key,value pairs.

This commit is contained in:
Sears Russell 2008-11-13 04:18:50 +00:00
parent e842987915
commit 53a7982f47
6 changed files with 115 additions and 17 deletions

View file

@ -11,15 +11,21 @@ void allocBlob(int xid, recordid rid) {
blob_record_t rec;
rec.offset = startPage;
rec.size = rid.size;
recordid rid2 = rid;
rid2.size = BLOB_SLOT;
Tset(xid, rid2, (byte*)&rec);
// printf("Page start = %d, count = %d, rid.size=%d\n", rec.offset, pageCount, rid.size);
// printf("rid = {%d %d %d}\n", rid.page, rid.slot, rid.size);
rid.size = sizeof(rec);
TsetRaw(xid, rid, (byte*)&rec);
DEBUG("Page start = %d, count = %d, rid.size=%d\n", rec.offset, pageCount, rid.size);
DEBUG("rid = {%d %d %d}\n", rid.page, rid.slot, rid.size);
}
void deallocBlob(int xid, recordid rid) {
TregionDealloc(xid, rid.page);
void deallocBlob(int xid, blob_record_t *r) {
/* Page *p = loadPage(xid, rid.page);
writelock(p->rwlatch,0);
blob_record_t r;
rid.size = sizeof(blob_record_t);
stasis_record_read(xid, p, rid, (byte*)&r);
unlock(p->rwlatch);
releasePage(p); */
TregionDealloc(xid, r->offset);
}
void readBlob(int xid, Page * p2, recordid rid, byte * buf) {

View file

@ -378,6 +378,7 @@ compensated_function recordid TallocFromPage(int xid, pageid_t page, unsigned lo
if(size >= BLOB_THRESHOLD_SIZE) {
type = BLOB_SLOT;
} else {
assert(size > 0);
type = size;
}
@ -390,6 +391,7 @@ compensated_function recordid TallocFromPage(int xid, pageid_t page, unsigned lo
writelock(p->rwlatch,0);
recordid rid = stasis_record_alloc_begin(xid, p, type);
if(rid.size != INVALID_SLOT) {
stasis_record_alloc_done(xid,p,rid);
allocationPolicyAllocedFromPage(allocPolicy, xid, page);
@ -464,12 +466,12 @@ compensated_function void Tdealloc(int xid, recordid rid) {
releasePage(p);
if(type==BLOB_SLOT) {
deallocBlob(xid,rid);
}
pthread_mutex_unlock(&talloc_mutex);
if(type==BLOB_SLOT) {
deallocBlob(xid,(blob_record_t*)(preimage+sizeof(alloc_arg)));
}
free(preimage);
}
@ -490,7 +492,14 @@ compensated_function int TrecordSize(int xid, recordid rid) {
Page * p;
p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
ret = stasis_record_length_read(xid, p, rid);
rid.size = stasis_record_length_read(xid, p, rid);
if(stasis_record_type_read(xid,p,rid) == BLOB_SLOT) {
blob_record_t r;
stasis_record_read(xid,p,rid,(byte*)&r);
ret = r.size;
} else {
ret = rid.size;
}
unlock(p->rwlatch);
releasePage(p);
return ret;

View file

@ -93,13 +93,14 @@ int Tset(int xid, recordid rid, const void * dat) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
rid = stasis_record_dereference(xid,p,rid);
rid.size = stasis_record_type_to_size(rid.size);
short type = stasis_record_type_read(xid,p,rid);
if(rid.size > BLOB_THRESHOLD_SIZE) {
if(type == BLOB_SLOT) {
writeBlob(xid,p,rid,dat);
unlock(p->rwlatch);
releasePage(p);
} else {
rid.size = stasis_record_type_to_size(rid.size);
if(rid.page == p->id) {
// failing early avoids unrecoverable logs...
assert(rid.size == stasis_record_length_read(xid, p, rid));

View file

@ -320,7 +320,8 @@ compensated_function void Tread(int xid, recordid rid, void * dat) {
p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
}
if(rid.size > BLOB_THRESHOLD_SIZE) {
short type = stasis_record_type_read(xid,p,rid);
if(type == BLOB_SLOT) {
DEBUG("call readBlob %lld %lld %lld\n", (long long)rid.page, (long long)rid.slot, (long long)rid.size);
readBlob(xid,p,rid,dat);
assert(rid.page == p->id);

View file

@ -68,7 +68,7 @@ compensated_function recordid preAllocBlobFromPage(int xid, long page, long blob
*/
void allocBlob(int xid, recordid rid);
void deallocBlob(int xid, recordid rid);
void deallocBlob(int xid, blob_record_t* r);
page_impl blobImpl();

View file

@ -56,7 +56,22 @@ terms specified in this license.
#define LOG_NAME "check_linearHashNTA.log"
static const int NUM_ENTRIES = 100000;
/** @test
#define ARRAY_SIZE (2 * 3 * (int)(PAGE_SIZE * 1.5))
static void arraySet(int * a, int mul) {
int i;
for ( i = 0 ; i < ARRAY_SIZE; i++) {
a[i]= mul*i;
}
}
static int arryCmp(int * a, int * b, int len) {
return memcmp(a,b,len);
}
/**
@test
*/
START_TEST(linearHashNTAtest)
{
@ -307,6 +322,71 @@ START_TEST(linearHashNTAThreadedTestRandomized) {
Tdeinit();
} END_TEST
#endif // LONG_TEST
/**
@test Test linear hash nta when the values it stores are larger
than a single page.
*/
START_TEST(linearHashNTABlobTest) {
Tinit();
int arry1[ARRAY_SIZE];
int arry2[ARRAY_SIZE];
int arry3[ARRAY_SIZE];
int arry4[ARRAY_SIZE];
int *scratch;
int alen=ARRAY_SIZE*sizeof(int);
int one, two, three, four;
int len1,len2,len3; // len4;
arraySet(arry1,1); one = 1;
arraySet(arry2,1); two = 2;
arraySet(arry3,1); three = 3;
arraySet(arry4,1); four = 4;
int xid = Tbegin();
recordid rid = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
ThashInsert(xid,rid,(byte*)&one,sizeof(one),(byte*)arry1,alen);
len1 = ThashLookup(xid,rid,(byte*)&one,sizeof(one),(byte**)&scratch);
assert(len1==alen);
assert(!arryCmp(arry1,scratch,alen));
free(scratch);
Tcommit(xid);
xid = Tbegin();
ThashInsert(xid,rid,(byte*)&two, sizeof(two), (byte*)arry2,alen/2);
ThashInsert(xid,rid,(byte*)&three,sizeof(three),(byte*)arry3,alen/3);
len2 = ThashLookup(xid,rid,(byte*)&two, sizeof(two), (byte**)&scratch);
assert(len2 == alen/2);
assert(!arryCmp(scratch,arry2,alen/2));
free(scratch);
len3 = ThashLookup(xid,rid,(byte*)&three, sizeof(three), (byte**)&scratch);
assert(len3 == alen/3);
assert(!arryCmp(scratch,arry3,alen/3));
free(scratch);
Tabort(xid);
Tdeinit();
Tinit();
xid = Tbegin();
len1 = ThashLookup(xid,rid,(byte*)&one, sizeof(one), (byte**)&scratch);
assert(len1 == alen);
assert(!arryCmp(scratch,arry1,alen));
free(scratch);
len3 = ThashLookup(xid,rid,(byte*)&two, sizeof(two), (byte**)&scratch);
assert(len3 == -1);
Tcommit(xid);
Tdeinit();
Tinit();
Tdeinit();
} END_TEST
START_TEST(linearHashNTAIteratortest) {
Tinit();
int xid = Tbegin();
@ -417,6 +497,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, linearHashNTAIteratortest);
tcase_add_test(tc, linearHashNTAtest);
tcase_add_test(tc, linearHashNTAThreadedTest);
tcase_add_test(tc, linearHashNTABlobTest);
#ifdef LONG_TEST
tcase_add_test(tc, linearHashNTAThreadedTestRandomized);
tcase_add_test(tc, linearHashNTAThreadedTestRandomized);