Re-implemented pageOrientedListNTA from scratch. It's now more flexible, but relies on features that are not provided by Talloc/Tdealloc, so performance isn't as good as it could be.

This commit is contained in:
Sears Russell 2005-01-29 01:17:37 +00:00
parent 4d04155b0e
commit 7cf5fdee6e
9 changed files with 251 additions and 266 deletions

View file

@ -43,8 +43,13 @@ terms specified in this license.
/**
@file
A linked list implementation designed to handle variable length entries, and
minimize the number of pages spanned by each list.
A linked list implementation designed to handle variable length
entries, and minimize the number of pages spanned by each list.
This linked list implementation is currently used to implement
buckets for the linear hash table. Unfortunately, due to
limitations in the allocation mechanisms, the full benefits of this
linked list implementation are not available to the linear hash
table.
The data is stored using the slotted page implementation.
@ -59,12 +64,23 @@ terms specified in this license.
#define __pageOrientedListNTA_H
typedef struct {
long page;
//typedef struct {
// long page;
/* The slot of the next record to be returned. */
int slot;
// int slot;
//} lladd_pagedList_iterator;
typedef struct {
recordid headerRid;
recordid entryRid;
} lladd_pagedList_iterator;
typedef struct {
short thisPage;
recordid nextPage;
} pagedListHeader;
//recordid dereferencePagedListRID(int xid, recordid rid);
/** @return 1 if the key was already in the list. */
int TpagedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);

View file

@ -143,7 +143,11 @@ recordid TallocFromPage(int xid, long page, long size) {
} else {
pthread_mutex_lock(&talloc_mutex);
rid = slottedPreRallocFromPage(xid, page, size, &p);
assert(p != NULL);
if(p == NULL) {
assert(rid.size == -1);
pthread_mutex_unlock(&talloc_mutex);
return rid;
}
}
Tupdate(xid,rid, NULL, OPERATION_ALLOC);

View file

@ -1,10 +1,13 @@
#define __USE_GNU
#define _GNU_SOURCE
#include <pthread.h>
#include <lladd/transactional.h>
#include <lladd/hash.h>
#include "../page.h"
#include "../page/slotted.h"
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#define __USE_GNU
#include <pthread.h>
static pthread_mutex_t linear_hash_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
@ -38,7 +41,7 @@ recordid ThashCreate(int xid, int keySize, int valueSize) {
recordid hashHeader = Talloc(xid, sizeof(lladd_hash_header));
lladd_hash_header lhh;
if(keySize == VARIABLE_LENGTH || valueSize == VARIABLE_LENGTH) {
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(long));
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(pagedListHeader));
} else {
lhh.buckets = TarrayListAlloc(xid, HASH_INIT_ARRAY_LIST_COUNT, HASH_INIT_ARRAY_LIST_MULT, sizeof(lladd_linkedList_entry) + keySize + valueSize);
}
@ -49,7 +52,7 @@ recordid ThashCreate(int xid, int keySize, int valueSize) {
for(i = 0; i < HASH_INIT_ARRAY_LIST_COUNT; i++) {
recordid rid = TpagedListAlloc(xid);
bucket.slot = i;
Tset(xid, bucket, &(rid.page));
Tset(xid, bucket, &rid);
}
} else {
byte * entry = calloc(1, lhh.buckets.size);
@ -152,9 +155,9 @@ static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keyS
lhh.numEntries ++;
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
/* if(lhh.numEntries > (int)((double)(lhh.nextSplit + twoToThe(lhh.bits-1)) * (HASH_FILL_FACTOR * 40))) {
if(lhh.numEntries > (int)((double)(lhh.nextSplit + twoToThe(lhh.bits-1)) * (HASH_FILL_FACTOR))) {
ThashSplitBucket(xid, hashHeader, &lhh);
} */
}
} else {
if(lhh.numEntries > (int)((double)(lhh.nextSplit + twoToThe(lhh.bits-1)) * HASH_FILL_FACTOR)) {
ThashSplitBucket(xid, hashHeader, &lhh);
@ -168,15 +171,15 @@ static int __ThashInsert(int xid, recordid hashHeader, const byte* key, int keyS
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &(bucketList.page));
bucketList.slot = 0;
bucketList.size = 0;
Tread(xid, bucket, &bucketList);
// int before = TpagedListSpansPages(xid, bucketList);
ret = TpagedListInsert(xid, bucketList, key, keySize, value, valueSize);
int after = TpagedListSpansPages(xid, bucketList);
if(after) { // Page overflowed...
ThashSplitBucket(xid, hashHeader, &lhh);
}
// int after = TpagedListSpansPages(xid, bucketList);
// if(before != after) { // Page overflowed...
// ThashSplitBucket(xid, hashHeader, &lhh);
// ThashSplitBucket(xid, hashHeader, &lhh);
// }
} else {
assert(lhh.keySize == keySize); assert(lhh.valueSize == valueSize);
@ -229,9 +232,7 @@ static int __ThashRemove(int xid, recordid hashHeader, const byte * key, int key
int ret;
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &(bucketList.page));
bucketList.slot = 0;
bucketList.size = 0;
Tread(xid, bucket, &bucketList);
ret = TpagedListRemove(xid, bucketList, key, keySize);
} else {
assert(lhh.keySize == keySize);
@ -252,9 +253,7 @@ int ThashLookup(int xid, recordid hashHeader, const byte * key, int keySize, byt
int ret;
if(lhh.keySize == VARIABLE_LENGTH || lhh.valueSize == VARIABLE_LENGTH) {
recordid bucketList;
Tread(xid, bucket, &(bucketList.page));
bucketList.slot = 0;
bucketList.size = 0;
Tread(xid, bucket, &bucketList);
ret = TpagedListFind(xid, bucketList, key, keySize, value);
} else {
assert(lhh.keySize == keySize);
@ -276,7 +275,7 @@ static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * l
recordid new_bucket_list; // will be uninitialized if we have fixed length entries.
if(lhh->keySize == VARIABLE_LENGTH || lhh->valueSize == VARIABLE_LENGTH) {
new_bucket_list = TpagedListAlloc(xid);
Tset(xid, new_bucket_rid, &(new_bucket_list.page));
Tset(xid, new_bucket_rid, &new_bucket_list);
} else {
byte * entry = calloc(1, lhh->buckets.size);
Tset(xid, new_bucket_rid, entry);
@ -292,9 +291,8 @@ static void ThashSplitBucket(int xid, recordid hashHeader, lladd_hash_header * l
if(lhh->keySize == VARIABLE_LENGTH || lhh->valueSize == VARIABLE_LENGTH) {
recordid old_bucket_list;
// recordid new_bucket_list;
Tread(xid, old_bucket_rid, &(old_bucket_list.page));
old_bucket_list.slot = 0;
old_bucket_list.size = 0;
Tread(xid, old_bucket_rid, &old_bucket_list);
// Tread(xid, new_bucket_rid, &(new_bucket_list.page)); // @todo could remember value from above.
lladd_pagedList_iterator * pit = TpagedListIterator(xid, old_bucket_list);
byte *key, *value;
@ -363,9 +361,7 @@ int ThashNext(int xid, lladd_hash_iterator * it, byte ** key, int * keySize, byt
it->bucket.slot++;
if(it->bucket.slot < it->numBuckets) {
recordid bucketList;
Tread(xid, it->bucket, &(bucketList.page));
bucketList.slot =0;
bucketList.size =0;
Tread(xid, it->bucket, &bucketList);
it->pit = TpagedListIterator(xid, bucketList);
} else {
free(it);

View file

@ -1,216 +1,173 @@
#include <lladd/transactional.h>
#include "../blobManager.h"
#include "../page.h"
#include "../page/slotted.h"
#include <malloc.h>
#include <assert.h>
#include <string.h>
/**
Low level function that looks at the page structure, and finds the 'real' recordid
of a page oriented list rid */
/*recordid dereferencePagedListRID(int xid, recordid rid) {
Page * p = loadPage(rid.page);
while((*numslots_ptr(p)-POLL_NUM_RESERVED) <= rid.slot) {
int oldSlot = rid.slot;
oldSlot -= (*numslots_ptr(p) - POLL_NUM_RESERVED);
rid.slot = POLL_NEXT;
readRecord(xid, p , rid, &rid);
rid.slot = oldSlot;
releasePage(p);
p = loadPage(rid.page);
}
releasePage(p);
rid.slot+=POLL_NUM_RESERVED;
return rid;
}*/
typedef struct {
short nextEntry;
short keySize;
} pagedListEntry;
recordid TpagedListAlloc(int xid) {
long page = TpageAlloc(xid);
recordid list = TallocFromPage(xid, page, sizeof(long));
long zero = 0;
Tset(xid, list, &zero);
assert(list.slot == 0);
assert(list.size == sizeof(long));
return list;
recordid NULLRID;
NULLRID.page = 0;
NULLRID.slot = 0;
NULLRID.size = -1;
recordid ret = Talloc(xid, sizeof(pagedListHeader));
pagedListHeader header;
header.thisPage = 0;
header.nextPage = NULLRID;
Tset(xid, ret, &header);
return ret;
}
int TpagedListSpansPages(int xid, recordid list) {
// TpagedListCompact(int xid, recordid list);
list.slot = 0;
list.size = sizeof(long);
long nextPage;
Tread(xid, list, &nextPage);
return nextPage != 0;
}
/** Should have write lock on page for this whole function. */
int TpagedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
int ret = 0;
// if find in list, return 1
byte * val;
if(-1 != TpagedListFind(xid, list, key, keySize, &val)) {
free(val);
ret = 1;
int removed = TpagedListRemove(xid, list, key, keySize);
assert(removed);
pagedListHeader header;
Tread(xid, list, &header);
recordid headerRid = list;
byte * garbage;
int ret = (TpagedListFind(xid, list, key, keySize, &garbage) != -1);
if(ret) {
TpagedListRemove(xid, list, key, keySize);
free(garbage);
}
Page * p = loadPage(list.page);
int recordSize = (sizeof(short)+keySize+valueSize);
int isBlob = recordSize >= BLOB_THRESHOLD_SIZE;
int realSize = recordSize;
if(isBlob) {
recordSize = sizeof(blob_record_t);
int entrySize = sizeof(pagedListEntry) + keySize + valueSize;
recordid rid = TallocFromPage(xid, headerRid.page, entrySize);
DEBUG("Alloced rid: {%d %d %d}", rid.page, rid.slot, rid.size);
// When the loop completes, header will contain the contents of the page header the entry will be inserted into,
// headerrid will contain the rid of that header, and rid will contain the newly allocated recordid
while(rid.size == -1) {
if(header.nextPage.size == -1) {
header.nextPage = Talloc(xid, sizeof(pagedListHeader));
DEBUG("allocing on new page %d\n", header.nextPage.page);
Tset(xid, headerRid, &header);
pagedListHeader newHead;
newHead.thisPage = 0;
newHead.nextPage.page =0;
newHead.nextPage.slot =0;
newHead.nextPage.size =-1;
Tset(xid, header.nextPage, &newHead);
}
while(slottedFreespace(p) < recordSize) {
// load next page, update some rid somewhere
list.slot = 0;
list.size = sizeof(long);
long nextPage;
readRecord(xid, p, list, &nextPage);
// printf("+ page = %d nextpage=%ld freespace: %d recordsize: %d\n", list.page, nextPage, slottedFreespace(p), recordSize); fflush(stdout);
if(nextPage == 0) {
releasePage(p);
nextPage = TpageAlloc(xid);
Tset(xid, list, &nextPage);
p = loadPage(nextPage);
//slottedPageInitialize(p);
// ** @todo shouldn't a log entry be generated here?? */
list.page = nextPage;
assert(slottedFreespace(p) >= recordSize);
long zero = 0;
recordid rid = TallocFromPage(xid, list.page, sizeof(long));
Tset(xid, rid, &zero);
} else {
releasePage(p);
list.page = nextPage;
p = loadPage(nextPage);
}
headerRid = header.nextPage;
Tread(xid, header.nextPage, &header);
rid = TallocFromPage(xid, headerRid.page, entrySize);
DEBUG("Alloced rid: {%d %d %d}", rid.page, rid.slot, rid.size);
}
if(isBlob) {
recordSize = realSize;
}
pagedListEntry * dat = malloc(entrySize);
releasePage(p);
// printf("recordsize = %d\n", recordSize);
recordid rid = TallocFromPage(xid, list.page, recordSize); // Allocates a record at a location given by the caller
short* record = malloc(recordSize);
*record = keySize;
memcpy((record+1), key, keySize);
memcpy(((char*)(record+1))+keySize, value, valueSize);
Tset(xid, rid, record);
dat->keySize = keySize;
dat->nextEntry = header.thisPage;
memcpy(dat+1, key, keySize);
memcpy(((byte*)(dat+1))+keySize, value, valueSize);
Tset(xid, rid, dat);
header.thisPage = rid.slot;
DEBUG("Header.thisPage = %d\n", rid.slot);
Tset(xid, headerRid, &header);
free(dat);
return ret;
}
int TpagedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) {
pagedListHeader header;
Tread(xid, list, &header);
long nextPage = 1;
recordid rid;
rid.page = list.page;
rid.slot = header.thisPage;
while (nextPage) {
int i;
while(rid.slot || header.nextPage.size != -1) {
rid.size = TrecordSize(xid, rid);
Page * p = loadPage(list.page);
pagedListEntry * dat = malloc(rid.size);
Tread(xid, rid, dat);
// int pageCount = TrecordsInPage(xid, list.page);
int pageCount = *numslots_ptr(p);
// printf("%ld\n", nextPage);
//fflush(stdout);
for(i = 1; i < pageCount; i++) {
recordid entry = list;
entry.slot = i;
// int length = TrecordSize(xid, entry);
int length = getRecordSize(xid,p,entry);
if(length != -1) { // then entry is defined.
short * dat = malloc(length);
entry.size = length;
// Tread(xid, entry, dat);
slottedRead(xid, p, entry, (byte*)dat);
if(*dat == keySize && !memcmp(dat+1, key, keySize)) {
int valueSize = length-keySize-sizeof(short);
if(!memcmp(dat+1, key, keySize)) {
int valueSize = rid.size - keySize - sizeof(pagedListEntry);
*value = malloc(valueSize);
memcpy(*value, ((byte*)(dat+1))+keySize, valueSize);
free(dat);
releasePage(p);
return valueSize;
}
if(dat->nextEntry) { // another entry on this page
rid.slot = dat->nextEntry;
} else if (header.nextPage.size != -1) { // another page
rid.page = header.nextPage.page;
Tread(xid, header.nextPage, &header);
rid.slot = header.thisPage;
} else { // we've reached the end of the last page
rid.slot = 0;
}
free(dat);
}
}
// recordid rid = list;
list.slot = 0;
list.size = sizeof(long);
// Tread(xid, list, &nextPage);
slottedRead(xid, p, list, (byte*)&nextPage);
list.page = nextPage;
releasePage(p);
}
return -1;
}
int TpagedListRemove(int xid, recordid list, const byte * key, int keySize) {
long nextPage = 1;
pagedListHeader header;
Tread(xid, list, &header);
recordid headerRid;
recordid rid;
rid.page = list.page;
rid.slot = header.thisPage;
short lastSlot = -1;
headerRid = list;
while(rid.slot || header.nextPage.size != -1) {
rid.size = TrecordSize(xid, rid);
pagedListEntry * dat = malloc(rid.size);
Tread(xid, rid, dat);
while (nextPage) {
int i;
if(!memcmp(dat+1, key, keySize)) {
Page * p = loadPage(list.page);
// int pageCount = TrecordsInPage(xid, list.page);
int pageCount = *numslots_ptr(p);
// printf("%ld\n", nextPage);
fflush(stdout);
for(i = 1; i < pageCount; i++) {
recordid entry = list;
entry.slot = i;
// int length = TrecordSize(xid, entry);
int length = getRecordSize(xid, p, entry);
if(length != -1) { // then entry is defined.
short * dat = malloc(length);
entry.size = length;
slottedRead(xid,p,entry,(byte*)dat);
// Tread(xid, entry, dat);
if(*dat == keySize && !memcmp(dat+1, key, keySize)) {
releasePage(p);
Tdealloc(xid, entry);
// assert(-1 == TrecordSize(xid, entry));
if(lastSlot != -1) {
recordid lastRid = rid;
lastRid.slot = lastSlot;
lastRid.size = TrecordSize(xid, lastRid);
pagedListEntry * lastRidBuf = malloc(lastRid.size);
Tread(xid, lastRid, lastRidBuf);
lastRidBuf->nextEntry = dat->nextEntry;
Tset(xid, lastRid, lastRidBuf);
free(lastRidBuf);
} else {
header.thisPage = dat->nextEntry;
Tset(xid, headerRid, &header);
}
Tdealloc(xid, rid);
free(dat);
return 1;
}
if(dat->nextEntry) { // another entry on this page
lastSlot = rid.slot;
rid.slot = dat->nextEntry;
} else if (header.nextPage.size != -1) { // another page
lastSlot = -1;
rid.page = header.nextPage.page;
headerRid = header.nextPage;
Tread(xid, header.nextPage, &header);
rid.slot = header.thisPage;
} else { // we've reached the end of the last page
rid.slot = 0;
}
free(dat);
}
}
list.slot = 0;
list.size = sizeof(long);
// Tread(xid, list, &nextPage);
slottedRead(xid,p,list, (byte*)&nextPage);
list.page = nextPage;
releasePage(p);
}
return 0;
}
int TpagedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize) {
int TpagedListMove(int xid, recordid start_list, recordid end_list, const byte * key, int keySize) {
byte * value;
int valueSize = TpagedListFind(xid, start_list, key, keySize, &value);
if(valueSize != -1) {
@ -226,56 +183,53 @@ int TpagedListMove(int xid, recordid start_list, recordid end_list, const byte *
}
lladd_pagedList_iterator * TpagedListIterator(int xid, recordid list) {
lladd_pagedList_iterator * ret = malloc(sizeof(lladd_pagedList_iterator));
pagedListHeader header;
Tread(xid, list, &header);
lladd_pagedList_iterator * it = malloc(sizeof(lladd_pagedList_iterator));
ret->page = list.page;
ret->slot = 1;
it->headerRid = header.nextPage;
it->entryRid = list;
// printf("slot <- %d\n", header.thisPage);
it->entryRid.slot = header.thisPage;
return ret;
return it;
}
int TpagedListNext(int xid, lladd_pagedList_iterator * it,
byte ** key, int * keySize,
byte ** value, int * valueSize) {
// printf("next: page %d slot %d\n", it->page, it->slot);
recordid rid;
while(it->page) {
while(it->slot < TrecordsInPage(xid, it->page)) {
rid.page = it->page;
rid.slot = it->slot;
rid.size=TrecordSize(xid, rid);
if(rid.size != -1) {
// found entry!
while(it->entryRid.slot || it->headerRid.size != -1) {
if(it->entryRid.slot) {
it->entryRid.size = TrecordSize(xid, it->entryRid);
assert(it->entryRid.size != -1);
byte * dat = malloc(rid.size);
Tread(xid, rid, dat);
pagedListEntry * entry = malloc(it->entryRid.size);
// populate / alloc pointers passed in by caller.
Tread(xid, it->entryRid, entry);
*keySize = *(short*)dat;
*valueSize = rid.size - *keySize - sizeof(short);
*keySize = entry->keySize;
*valueSize = it->entryRid.size - *keySize - sizeof(pagedListEntry);
*key = malloc(*keySize);
*value = malloc(*valueSize);
memcpy(*key, ((short*)dat)+1, *keySize);
memcpy(*value, ((byte*)(((short*)dat)+1)) + *keySize, *valueSize);
memcpy(*key, entry+1, *keySize);
memcpy(*value, ((byte*)(entry+1))+*keySize, *valueSize);
free(dat);
it->slot++;
it->entryRid.slot = entry->nextEntry;
// printf("slotA <- %d\n", it->entryRid.slot);
free(entry);
return 1;
}
it->slot++;
}
rid.page = it->page;
rid.slot = 0;
rid.size = sizeof(long);
Tread(xid, rid, &(it->page));
it->slot = 1;
} else { // move to next page.
pagedListHeader header;
Tread(xid, it->headerRid, &header);
it->entryRid.page = it->headerRid.page;
it->headerRid = header.nextPage;
it->entryRid.slot = header.thisPage;
// printf("slotB <- %d\n", it->entryRid.slot);
}
}
free(it);
return 0;
}

View file

@ -18,7 +18,8 @@ increase the available free space.
The caller of this function must have a writelock on the page.
*/
static void slottedCompact(Page * page) {
void slottedCompact(Page * page) {
int i;
Page bufPage;
@ -192,7 +193,16 @@ recordid slottedPreRallocFromPage(int xid, long page, long size, Page **pp) {
*pp = loadPage(page);
assert(slottedFreespace(*pp) >= size);
if(slottedFreespace(*pp) < size) {
releasePage(*pp);
*pp = NULL;
recordid rid;
rid.page = 0;
rid.slot = 0;
rid.size = -1;
return rid;
}
if(*page_type_ptr(*pp) == UNINITIALIZED_PAGE) {
slottedPageInitialize(*pp);
}

View file

@ -206,3 +206,5 @@ int slottedGetType(Page * p, int slot);
*
*/
void slottedSetType(Page * p, int slot, int type);
/** The caller of this function must have a write lock on the page. */
void slottedCompact(Page * page);

View file

@ -6,6 +6,6 @@ else
TESTS =
endif
noinst_PROGRAMS = $(TESTS)
LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a # -lefence
LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a #-lefence
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log check_indirect.log check_bufferMananger.log check_lladdhash.log check_pageOperations.log check_linearhash.log check_linkedListNTA.log check_linearHashNTA.log check_pageOrientedListNTA.log
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -130,9 +130,12 @@ START_TEST(linearHashNTAVariableSizetest)
val.page = i * NUM_ENTRIES;
val.slot = val.page * NUM_ENTRIES;
val.size = val.slot * NUM_ENTRIES;
val2 = 0;
assert(-1 == ThashLookup(xid, hashHeader, (byte*)&i, sizeof(int), (byte**)&val2));
ThashInsert(xid, hashHeader, (byte*)&i, sizeof(int), (byte*)&val, sizeof(recordid));
assert(sizeof(recordid) == ThashLookup(xid, hashHeader, (byte*)&i, sizeof(int), (byte**)&val2));
val2 =0;
int ret = ThashLookup(xid, hashHeader, (byte*)&i, sizeof(int), (byte**)&val2);
assert(sizeof(recordid) == ret);
assert(val2->page == i * NUM_ENTRIES);
assert(val2->slot == val2->page * NUM_ENTRIES);
assert(val2->size == val2->slot * NUM_ENTRIES);

View file

@ -116,7 +116,7 @@ START_TEST(pagedListCheck) {
ret = TpagedListFind(xid, list, (byte*)&a, sizeof(int), (byte**)&bb);
assert(!ret);
assert(-1==ret);
assert(!bb);
}
}