Rewrite core dedupe logic to simplify code and improve performance.

Hashtable-based chunk-level deduplication instead of Quicksort.
Fix a corner-case bug in Dedupe decompression.
Moinak Ghosh 2012-09-23 14:57:09 +05:30
parent 99a8e4cd98
commit 8386e72566
4 changed files with 197 additions and 278 deletions
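
The core of the change: duplicate detection moves from sorting the block array
by checksum (the qsort() with the cmpblks comparator removed below) to a
bucket-chained hashtable keyed on (hash + length), with memcmp() confirmation.
The following is an editor's minimal C sketch of that technique, not code from
this commit; block_t, hash32() and find_duplicates() are invented names, and
this sketch prepends to bucket chains where the commit appends.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct block {
	const unsigned char *data;	/* start of block in the buffer */
	uint32_t length;		/* block length in bytes */
	uint32_t hash;			/* strong 32-bit content hash */
	struct block *match;		/* first identical block, if any */
	struct block *next;		/* hashtable bucket chain link */
} block_t;

/* Toy stand-in for a strong hash such as XXH_strong32() (FNV-1a here). */
static uint32_t
hash32(const unsigned char *p, uint32_t len)
{
	uint32_t h = 2166136261u;
	while (len--) h = (h ^ *p++) * 16777619u;
	return (h);
}

/*
 * Mark every duplicate block by pointing it at the first identical one.
 * Expected O(n) versus O(n log n) for the qsort() approach; the table is
 * just an array of chain heads (the commit reuses space in the target
 * buffer for it instead of allocating).
 */
static void
find_duplicates(block_t *blocks, uint32_t nblocks, block_t **htab)
{
	uint32_t i;

	memset(htab, 0, nblocks * sizeof (block_t *));
	for (i = 0; i < nblocks; i++) {
		block_t *b = &blocks[i];
		/* Mix length into the key for fewer collisions. */
		uint32_t j = (b->hash + b->length) % nblocks;
		block_t *be = htab[j];

		while (be != NULL) {
			if (be->hash == b->hash && be->length == b->length &&
			    memcmp(be->data, b->data, b->length) == 0) {
				b->match = be;	/* exact duplicate */
				break;
			}
			be = be->next;
		}
		if (be == NULL) {	/* unique: link into the bucket */
			b->next = htab[j];
			htab[j] = b;
		}
	}
}

int
main(void)
{
	const unsigned char buf[] = "abcdabcdwxyz";
	block_t blk[3] = {
		{ buf + 0, 4, 0, NULL, NULL },
		{ buf + 4, 4, 0, NULL, NULL },
		{ buf + 8, 4, 0, NULL, NULL },
	};
	block_t *htab[3];
	uint32_t i;

	for (i = 0; i < 3; i++)
		blk[i].hash = hash32(blk[i].data, blk[i].length);
	find_duplicates(blk, 3, htab);
	for (i = 0; i < 3; i++)
		printf("block %u: %s\n", i,
		    blk[i].match ? "duplicate" : "unique");
	return (0);
}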

main.c

@@ -949,11 +949,9 @@ repeat:
perror("Chunk Write: ");
do_cancel:
main_cancel = 1;
for (i = 0; i < w->nprocs; i++) {
tdat->cancel = 1;
sem_post(&tdat->start_sem);
sem_post(&tdat->write_done_sem);
}
tdat->cancel = 1;
sem_post(&tdat->start_sem);
sem_post(&tdat->write_done_sem);
return (0);
}
sem_post(&tdat->write_done_sem);

rabin/rabin_dedup.c

@@ -254,38 +254,6 @@ destroy_dedupe_context(dedupe_context_t *ctx)
}
}
/*
* Checksum Comparator for qsort
*/
static int
cmpblks(const void *a, const void *b)
{
rabin_blockentry_t *a1 = *((rabin_blockentry_t **)a);
rabin_blockentry_t *b1 = *((rabin_blockentry_t **)b);
if (a1->cksum_n_offset < b1->cksum_n_offset) {
return (-1);
} else if (a1->cksum_n_offset == b1->cksum_n_offset) {
int l1 = a1->length;
int l2 = b1->length;
/*
* If fingerprints match then compare lengths. Length match makes
* for strong exact detection/ordering during sort while stopping
* short of expensive memcmp() during sorting.
*
* Even though rabin_blockentry_t->length is unsigned we use signed
* int here to avoid branches. In practice a rabin block size at
* this point varies from 2K to 128K. The length is unsigned in
* order to support non-duplicate block merging and large blocks
* after this point.
*/
return (l1 - l2);
} else {
return (1);
}
}
/**
* Perform Deduplication.
* Both Semi-Rabin fingerprinting based and Fixed Block Deduplication are supported.
@@ -295,12 +263,13 @@ cmpblks(const void *a, const void *b)
uint32_t
dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, ssize_t *rabin_pos)
{
ssize_t i, last_offset, j, fplist_sz;
ssize_t i, last_offset, j, ary_sz;
uint32_t blknum;
char *buf1 = (char *)buf;
uint32_t length;
uint64_t cur_roll_checksum, cur_pos_checksum;
uint32_t *fplist;
rabin_blockentry_t **htab;
heap_t heap;
length = offset;
@@ -327,10 +296,9 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs
ctx->blocks[i]->offset = last_offset;
ctx->blocks[i]->index = i; // Need to store for sorting
ctx->blocks[i]->length = length;
ctx->blocks[i]->ref = 0;
ctx->blocks[i]->similar = 0;
ctx->blocks[i]->crc = XXH_strong32(buf1+last_offset, length, 0);
ctx->blocks[i]->cksum_n_offset = ctx->blocks[i]->crc;
ctx->blocks[i]->hash = XXH_strong32(buf1+last_offset, length, 0);
ctx->blocks[i]->similarity_hash = ctx->blocks[i]->hash;
last_offset += length;
}
goto process_blocks;
@@ -341,10 +309,9 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs
* Initialize arrays for sketch computation. We re-use memory allocated
* for the compressed chunk temporarily.
*/
fplist_sz = 4 * ctx->rabin_poly_max_block_size;
fplist = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - fplist_sz);
memset(fplist, 0, fplist_sz);
reset_heap(&heap, fplist_sz/2);
ary_sz = 4 * ctx->rabin_poly_max_block_size;
fplist = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz);
memset(fplist, 0, ary_sz);
}
memset(ctx->current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE);
@@ -436,7 +403,8 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs
ctx->blocks[blknum]->offset = last_offset;
ctx->blocks[blknum]->index = blknum; // Need to store for sorting
ctx->blocks[blknum]->length = length;
ctx->blocks[blknum]->ref = 0;
ctx->blocks[blknum]->other = 0;
ctx->blocks[blknum]->next = 0;
ctx->blocks[blknum]->similar = 0;
if (ctx->delta_flag) {
@@ -448,9 +416,9 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs
*/
reset_heap(&heap, FORTY_PCNT(j));
ksmallest(fplist, j, &heap);
ctx->blocks[blknum]->cksum_n_offset =
ctx->blocks[blknum]->similarity_hash =
XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0);
memset(fplist, 0, fplist_sz);
memset(fplist, 0, ary_sz);
}
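/*
 * Editor's note (not part of the commit): this is a min-wise sketch.
 * ksmallest() heap-selects the 40% smallest of the per-1KB fingerprints
 * and XXH_fast32() condenses that sample into a single similarity hash,
 * so blocks sharing most of their content tend to produce the same
 * sketch value.
 */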
blknum++;
last_offset = i+1;
@@ -459,263 +427,224 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs
}
}
/*
* Compute hash signature for each block. We do this in a separate loop to
* have a fast linear scan through the buffer.
*/
if (ctx->delta_flag) {
for (i=0; i<blknum; i++) {
ctx->blocks[i]->crc = XXH_strong32(buf1+ctx->blocks[i]->offset, ctx->blocks[i]->length, 0);
}
} else {
for (i=0; i<blknum; i++) {
ctx->blocks[i]->crc = XXH_strong32(buf1+ctx->blocks[i]->offset, ctx->blocks[i]->length, 0);
ctx->blocks[i]->cksum_n_offset = ctx->blocks[i]->crc;
// Insert the last left-over trailing bytes, if any, into a block.
if (last_offset < *size) {
if (ctx->blocks[blknum] == 0)
ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL,
sizeof (rabin_blockentry_t));
ctx->blocks[blknum]->offset = last_offset;
ctx->blocks[blknum]->index = blknum;
ctx->blocks[blknum]->length = *size - last_offset;
ctx->blocks[blknum]->other = 0;
ctx->blocks[blknum]->next = 0;
ctx->blocks[blknum]->similar = 0;
ctx->blocks[blknum]->hash = XXH_strong32(buf1+last_offset, ctx->blocks[blknum]->length, 0);
if (ctx->delta_flag) {
uint64_t cur_sketch;
j = (j > 0 ? j:1);
if (j > 1) {
reset_heap(&heap, FORTY_PCNT(j));
ksmallest(fplist, j, &heap);
cur_sketch =
XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0);
} else {
cur_sketch =
XXH_fast32((const uchar_t *)fplist, (j*4)/2, 0);
}
ctx->blocks[blknum]->similarity_hash = cur_sketch;
}
blknum++;
last_offset = *size;
}
process_blocks:
DEBUG_STAT_EN(printf("Original size: %lld, blknum: %u\n", *size, blknum));
// If we found at least a few chunks, perform dedup.
DEBUG_STAT_EN(printf("Original size: %lld, blknum: %u\n", *size, blknum));
if (blknum > 2) {
uint32_t blk, prev_index, prev_length;
ssize_t pos, matchlen, pos1;
int valid = 1;
char *tmp;
uint32_t *blkarr, *trans, *dedupe_index;
uint32_t *dedupe_index;
ssize_t dedupe_index_sz;
rabin_blockentry_t *prev;
DEBUG_STAT_EN(uint32_t delta_calls, delta_fails);
rabin_blockentry_t *be;
DEBUG_STAT_EN(uint32_t delta_calls, delta_fails, merge_count, hash_collisions);
DEBUG_STAT_EN(delta_calls = 0);
DEBUG_STAT_EN(delta_fails = 0);
DEBUG_STAT_EN(hash_collisions = 0);
// Insert the last left-over trailing bytes, if any, into a block.
if (last_offset < *size) {
uint64_t cur_sketch;
if (ctx->blocks[blknum] == 0)
ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL,
sizeof (rabin_blockentry_t));
ctx->blocks[blknum]->offset = last_offset;
ctx->blocks[blknum]->index = blknum;
ctx->blocks[blknum]->length = *size - last_offset;
ctx->blocks[blknum]->ref = 0;
ctx->blocks[blknum]->similar = 0;
ctx->blocks[blknum]->crc = XXH_strong32(buf1+last_offset, ctx->blocks[blknum]->length, 0);
if (ctx->delta_flag) {
j = (j > 0 ? j:1);
if (j > 1) {
reset_heap(&heap, FORTY_PCNT(j));
ksmallest(fplist, j, &heap);
cur_sketch =
XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0);
} else {
cur_sketch =
XXH_fast32((const uchar_t *)fplist, (j*4)/2, 0);
}
} else {
cur_sketch = ctx->blocks[blknum]->crc;
}
ctx->blocks[blknum]->cksum_n_offset = cur_sketch;
blknum++;
last_offset = *size;
}
dedupe_index_sz = (ssize_t)blknum * RABIN_ENTRY_SIZE;
ary_sz = blknum * sizeof (rabin_blockentry_t *);
htab = (rabin_blockentry_t **)(ctx->cbuf + ctx->real_chunksize - ary_sz);
memset(htab, 0, ary_sz);
/*
* Now sort the block array based on checksums. This will bring virtually
* all similar block entries together. Effectiveness depends on how strong
* our checksum is. We are using a maximal super-sketch value.
* Compute hash signature for each block. We do this in a separate loop to
* have a fast linear scan through the buffer.
*/
qsort(ctx->blocks, blknum, sizeof (rabin_blockentry_t *), cmpblks);
dedupe_index = (uint32_t *)(ctx->cbuf + RABIN_HDR_SIZE);
/*
* We need 2 temporary arrays. We just use available space in the last
* portion of the buffer that will hold the deduped segment.
*/
blkarr = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - (dedupe_index_sz * 2 + 1));
trans = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - (dedupe_index_sz + 1));
matchlen = 0;
/*
* Now make a pass through the sorted block array making identical blocks
* point to the first identical block entry. A simple Run Length Encoding
* sort of. Checksums, length and contents (memcmp()) must match for blocks
* to be considered identical.
* The block index in the chunk is initialized with pointers into the
* sorted block array.
* A reference count is maintained for blocks that are similar with other
* blocks. This helps in non-duplicate block merging later.
*/
blkarr[ctx->blocks[0]->index] = 0;
prev = ctx->blocks[0];
for (blk = 1; blk < blknum; blk++) {
blkarr[ctx->blocks[blk]->index] = blk;
if (ctx->blocks[blk]->crc == prev->crc &&
ctx->blocks[blk]->length == prev->length &&
memcmp(buf1 + prev->offset, buf1 + ctx->blocks[blk]->offset,
prev->length) == 0)
{
ctx->blocks[blk]->similar = SIMILAR_EXACT;
ctx->blocks[blk]->index = prev->index;
prev->ref = 1;
matchlen += prev->length;
continue;
}
prev = ctx->blocks[blk];
}
prev = NULL;
if (ctx->delta_flag) {
for (blk = 0; blk < blknum; blk++) {
if (ctx->blocks[blk]->similar) continue;
for (i=0; i<blknum; i++) {
ctx->blocks[i]->hash = XXH_strong32(buf1+ctx->blocks[i]->offset,
ctx->blocks[i]->length, 0);
}
} else {
for (i=0; i<blknum; i++) {
ctx->blocks[i]->hash = XXH_strong32(buf1+ctx->blocks[i]->offset,
ctx->blocks[i]->length, 0);
ctx->blocks[i]->similarity_hash = ctx->blocks[i]->hash;
}
}
/*
* Perform hash-matching of blocks and use a bucket-chained hashtable to match
* for duplicates and similar blocks. Unique blocks are inserted and duplicates
* and similar ones are marked in the block array.
*
* Hashtable memory is not allocated. We just use available space in the
* target buffer.
*/
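/*
 * Editor's illustration (not part of the commit): with blknum = 100, a
 * 4096-byte block whose similarity_hash is 0x9e3779b9 hashes to bucket
 * (0x9e3779b9 + 4096) % 100 == 65. Chains are walked via ->next, and a
 * candidate counts as a duplicate only after hash, length and memcmp()
 * all agree.
 */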
matchlen = 0;
for (i=0; i<blknum; i++) {
uint64_t ck;
/*
* Add length to hash for fewer collisions. If Delta Compression is
* not enabled then value of similarity_hash == hash.
*/
ck = ctx->blocks[i]->similarity_hash;
ck += ctx->blocks[i]->length;
j = ck % blknum;
if (htab[j] == 0) {
htab[j] = ctx->blocks[i];
} else {
be = htab[j];
length = 0;
/*
* Compare blocks for similarity.
* Note: Block list by now is sorted by length as well.
* Look for exact duplicates. Same cksum, length and memcmp().
*/
if (prev != NULL && ctx->blocks[blk]->ref == 0 &&
ctx->blocks[blk]->cksum_n_offset == prev->cksum_n_offset &&
ctx->blocks[blk]->length - prev->length < 512
) {
ctx->blocks[blk]->index = prev->index;
ctx->blocks[blk]->similar = SIMILAR_PARTIAL;
prev->ref = 1;
matchlen += (prev->length>>1);
continue;
while (1) {
if (be->hash == ctx->blocks[i]->hash &&
be->length == ctx->blocks[i]->length &&
memcmp(buf1 + be->offset, buf1 + ctx->blocks[i]->offset,
be->length) == 0) {
ctx->blocks[i]->similar = SIMILAR_EXACT;
ctx->blocks[i]->other = be;
be->similar = SIMILAR_REF;
matchlen += be->length;
length = 1;
break;
}
if (be->next)
be = be->next;
else
break;
}
if (!length && ctx->delta_flag) {
/*
* Look for similar blocks.
*/
be = htab[j];
while (1) {
if (be->similarity_hash == ctx->blocks[i]->similarity_hash &&
be->length == ctx->blocks[i]->length) {
ctx->blocks[i]->similar = SIMILAR_PARTIAL;
ctx->blocks[i]->other = be;
be->similar = SIMILAR_REF;
matchlen += (be->length>>1);
length = 1;
break;
}
if (be->next)
be = be->next;
else
break;
}
}
// This is a unique block so add it to the hashtable.
if (!length) {
be->next = ctx->blocks[i];
DEBUG_STAT_EN(hash_collisions++);
}
prev = ctx->blocks[blk];
}
}
DEBUG_STAT_EN(printf("Hash collisions: %u\n", hash_collisions));
if (matchlen < dedupe_index_sz) {
ctx->valid = 0;
return (0);
}
/*
* Another pass, this time through the block index in the chunk. We insert
* block length into unique block entries. For block entries that are
* identical with another one we store the index number with msb set.
* This way we can differentiate between a unique block length entry and a
* pointer to another block without needing a separate flag.
*/
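/*
 * Editor's example (not part of the commit): a unique 4 KB block is
 * stored in the index as the plain value 4096, while a duplicate of
 * block 7 is stored as (7 | RABIN_INDEX_FLAG). Since the flag is the
 * msb, which no legal block length can reach, the two cases need no
 * separate type field; SET_SIMILARITY_FLAG further separates delta
 * (bsdiff) references from exact duplicates.
 */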
prev_index = 0;
prev_length = 0;
dedupe_index = (uint32_t *)(ctx->cbuf + RABIN_HDR_SIZE);
pos = 0;
for (blk = 0; blk < blknum; blk++) {
rabin_blockentry_t *be;
DEBUG_STAT_EN(merge_count = 0);
be = ctx->blocks[blkarr[blk]];
if (be->similar == 0) {
/*
* Update Index entry with the length. Also try to merge runs
* of unique (non-duplicate/similar) blocks into a single block
* entry as long as the total length does not exceed max block
* size.
*/
if (prev_index == 0) {
if (be->ref == 0) {
prev_index = pos;
prev_length = be->length;
}
dedupe_index[pos] = be->length;
ctx->blocks[pos]->cksum_n_offset = be->offset;
trans[blk] = pos;
pos++;
} else {
if (be->ref > 0) {
prev_index = 0;
prev_length = 0;
dedupe_index[pos] = be->length;
ctx->blocks[pos]->cksum_n_offset = be->offset;
trans[blk] = pos;
pos++;
} else {
if (prev_length + be->length <= RABIN_MAX_BLOCK_SIZE) {
prev_length += be->length;
dedupe_index[prev_index] = prev_length;
} else {
prev_index = 0;
prev_length = 0;
dedupe_index[pos] = be->length;
ctx->blocks[pos]->cksum_n_offset = be->offset;
trans[blk] = pos;
pos++;
}
}
/*
* Merge runs of unique blocks into a single block entry to reduce
* dedupe index size.
*/
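/*
 * Editor's example (not part of the commit): three consecutive unique
 * 2 KB blocks collapse into one 6 KB index entry, one uint32 instead
 * of three, as long as the running total stays within
 * RABIN_MAX_BLOCK_SIZE.
 */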
for (i=0; i<blknum;) {
dedupe_index[pos] = i;
ctx->blocks[i]->index = pos;
pos++;
length = 0;
j = i;
if (ctx->blocks[i]->similar == 0) {
while (i< blknum && ctx->blocks[i]->similar == 0 &&
length < RABIN_MAX_BLOCK_SIZE) {
length += ctx->blocks[i]->length;
i++;
DEBUG_STAT_EN(merge_count++);
}
ctx->blocks[j]->length = length;
} else {
prev_index = 0;
prev_length = 0;
ctx->blocks[pos]->cksum_n_offset = be->offset;
ctx->blocks[pos]->alt_length = be->length;
trans[blk] = pos;
if (be->similar == SIMILAR_EXACT) {
dedupe_index[pos] = (blkarr[be->index] | RABIN_INDEX_FLAG) &
CLEAR_SIMILARITY_FLAG;
} else {
dedupe_index[pos] = blkarr[be->index] | RABIN_INDEX_FLAG |
SET_SIMILARITY_FLAG;
}
pos++;
i++;
}
}
DEBUG_STAT_EN(printf("Merge count: %u\n", merge_count));
/*
* Final pass, copy the data and perform delta encoding.
* Final pass update dedupe index and copy data.
*/
blknum = pos;
dedupe_index_sz = (ssize_t)pos * RABIN_ENTRY_SIZE;
dedupe_index_sz = (ssize_t)blknum * RABIN_ENTRY_SIZE;
pos1 = dedupe_index_sz + RABIN_HDR_SIZE;
for (blk = 0; blk < blknum; blk++) {
uchar_t *old, *new;
int32_t bsz;
/*
* If blocks are overflowing the allowed chunk size then dedup did not
* help at all. We invalidate the dedup operation.
*/
if (pos1 > last_offset) {
valid = 0;
break;
}
if (dedupe_index[blk] & RABIN_INDEX_FLAG) {
j = dedupe_index[blk] & RABIN_INDEX_VALUE;
i = ctx->blocks[j]->index;
if (dedupe_index[blk] & GET_SIMILARITY_FLAG) {
old = buf1 + ctx->blocks[j]->offset;
new = buf1 + ctx->blocks[blk]->cksum_n_offset;
matchlen = ctx->real_chunksize - *size;
for (i=0; i<blknum; i++) {
be = ctx->blocks[dedupe_index[i]];
if (be->similar == 0 || be->similar == SIMILAR_REF) {
/* Just copy. */
dedupe_index[i] = htonl(be->length);
memcpy(ctx->cbuf + pos1, buf1 + be->offset, be->length);
pos1 += be->length;
} else {
if (be->similar == SIMILAR_EXACT) {
dedupe_index[i] = htonl((be->other->index | RABIN_INDEX_FLAG) &
CLEAR_SIMILARITY_FLAG);
} else {
uchar_t *old, *new;
int32_t bsz;
/*
* Perform bsdiff.
*/
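/*
 * Editor's note (not part of the commit): bsdiff encodes this block as
 * a binary patch against its similar reference block. A return of 0
 * below means the delta attempt failed, so the raw block is copied and
 * indexed by its plain length instead.
 */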
old = buf1 + be->other->offset;
new = buf1 + be->offset;
DEBUG_STAT_EN(delta_calls++);
bsz = bsdiff(old, ctx->blocks[j]->length, new,
ctx->blocks[blk]->alt_length, ctx->cbuf + pos1,
buf1 + *size, matchlen);
bsz = bsdiff(old, be->other->length, new, be->length,
ctx->cbuf + pos1, buf1 + *size, matchlen);
if (bsz == 0) {
DEBUG_STAT_EN(delta_fails++);
memcpy(ctx->cbuf + pos1, new, ctx->blocks[blk]->alt_length);
dedupe_index[blk] = htonl(ctx->blocks[blk]->alt_length);
pos1 += ctx->blocks[blk]->alt_length;
memcpy(ctx->cbuf + pos1, new, be->length);
dedupe_index[i] = htonl(be->length);
pos1 += be->length;
} else {
dedupe_index[blk] = htonl(trans[i] |
dedupe_index[i] = htonl(be->other->index |
RABIN_INDEX_FLAG | SET_SIMILARITY_FLAG);
pos1 += bsz;
}
} else {
dedupe_index[blk] = htonl(trans[i] | RABIN_INDEX_FLAG);
}
} else {
memcpy(ctx->cbuf + pos1, buf1 + ctx->blocks[blk]->cksum_n_offset,
dedupe_index[blk]);
pos1 += dedupe_index[blk];
dedupe_index[blk] = htonl(dedupe_index[blk]);
}
}
cont:
@@ -793,9 +722,9 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size)
if (ctx->blocks[blk] == 0)
ctx->blocks[blk] = (rabin_blockentry_t *)slab_alloc(NULL, sizeof (rabin_blockentry_t));
len = ntohl(dedupe_index[blk]);
ctx->blocks[blk]->hash = 0;
if (len == 0) {
ctx->blocks[blk]->length = 0;
ctx->blocks[blk]->index = 0;
ctx->blocks[blk]->hash = 1;
} else if (!(len & RABIN_INDEX_FLAG)) {
ctx->blocks[blk]->length = len;
@@ -820,7 +749,7 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size)
int rv;
bsize_t newsz;
if (ctx->blocks[blk]->length == 0 && ctx->blocks[blk]->index == 0) continue;
if (ctx->blocks[blk]->hash == 1) continue;
if (ctx->blocks[blk]->length > 0) {
len = ctx->blocks[blk]->length;
pos1 = ctx->blocks[blk]->offset;
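/*
 * Editor's note on the corner-case fix mentioned in the commit message:
 * the old skip test (length == 0 && index == 0) could also match a
 * duplicate-reference entry pointing at block 0, silently dropping it.
 * Zero-length entries are now tagged with the sentinel hash = 1 and
 * skipped on that sentinel alone.
 */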

rabin/rabin_dedup.h

@@ -75,9 +75,6 @@
// Minimum practical chunk size when doing dedup
#define RAB_MIN_CHUNK_SIZE (1048576L)
// Number of bytes to compute one maximal fingerprint value
#define SKETCH_BASIC_BLOCK_SZ (1024)
// An entry in the Rabin block array in the chunk.
// It is either a length value <= RABIN_MAX_BLOCK_SIZE or an index value with
// which this block is a duplicate/similar. The entries are variable sized.
@@ -117,6 +114,7 @@
*/
#define SIMILAR_EXACT 1
#define SIMILAR_PARTIAL 2
#define SIMILAR_REF 3
/*
* Irreducible polynomial for Rabin modulus. This value is from the
@@ -124,14 +122,15 @@
*/
#define FP_POLY 0xbfe6b8a5bf378d83ULL
typedef struct {
typedef struct rab_blockentry {
ssize_t offset;
uint64_t cksum_n_offset; // Dual purpose variable
uint64_t alt_length;
uint64_t crc;
unsigned int index;
unsigned int length;
unsigned char ref, similar;
uint32_t similarity_hash;
uint32_t hash;
uint32_t index;
uint32_t length;
unsigned char similar;
struct rab_blockentry *other;
struct rab_blockentry *next;
} rabin_blockentry_t;
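/*
 * Editor's note (not part of the commit): the dual-purpose
 * cksum_n_offset/crc/ref fields give way to named ones: hash is the
 * strong content hash, similarity_hash the sketch (equal to hash when
 * delta compression is off), other points at the matched reference
 * block, and next chains entries within a hashtable bucket.
 */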
typedef struct {

utils/heapq.c

@@ -152,13 +152,6 @@ ksmallest(__TYPE *ary, __TYPE len, heap_t *heap)
__TYPE elem, los;
__TYPE i, *hp, n;
#ifdef ERROR_CHK
if (len >= heap->tot) {
fprintf(stderr, "nsmallest: array size > heap size\n");
return (-1);
}
#endif
n = heap->tot;
heap->ary = ary;
hp = ary;