diff --git a/main.c b/main.c index 2506201..e58cdd4 100644 --- a/main.c +++ b/main.c @@ -949,11 +949,9 @@ repeat: perror("Chunk Write: "); do_cancel: main_cancel = 1; - for (i = 0; i < w->nprocs; i++) { - tdat->cancel = 1; - sem_post(&tdat->start_sem); - sem_post(&tdat->write_done_sem); - } + tdat->cancel = 1; + sem_post(&tdat->start_sem); + sem_post(&tdat->write_done_sem); return (0); } sem_post(&tdat->write_done_sem); diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index acb8cb0..10ee6cc 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -254,38 +254,6 @@ destroy_dedupe_context(dedupe_context_t *ctx) } } -/* - * Checksum Comparator for qsort - */ -static int -cmpblks(const void *a, const void *b) -{ - rabin_blockentry_t *a1 = *((rabin_blockentry_t **)a); - rabin_blockentry_t *b1 = *((rabin_blockentry_t **)b); - - if (a1->cksum_n_offset < b1->cksum_n_offset) { - return (-1); - } else if (a1->cksum_n_offset == b1->cksum_n_offset) { - int l1 = a1->length; - int l2 = b1->length; - - /* - * If fingerprints match then compare lengths. Length match makes - * for strong exact detection/ordering during sort while stopping - * short of expensive memcmp() during sorting. - * - * Even though rabin_blockentry_t->length is unsigned we use signed - * int here to avoid branches. In practice a rabin block size at - * this point varies from 2K to 128K. The length is unsigned in - * order to support non-duplicate block merging and large blocks - * after this point. - */ - return (l1 - l2); - } else { - return (1); - } -} - /** * Perform Deduplication. * Both Semi-Rabin fingerprinting based and Fixed Block Deduplication are supported. @@ -295,12 +263,13 @@ cmpblks(const void *a, const void *b) uint32_t dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, ssize_t *rabin_pos) { - ssize_t i, last_offset, j, fplist_sz; + ssize_t i, last_offset, j, ary_sz; uint32_t blknum; char *buf1 = (char *)buf; uint32_t length; uint64_t cur_roll_checksum, cur_pos_checksum; uint32_t *fplist; + rabin_blockentry_t **htab; heap_t heap; length = offset; @@ -327,10 +296,9 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs ctx->blocks[i]->offset = last_offset; ctx->blocks[i]->index = i; // Need to store for sorting ctx->blocks[i]->length = length; - ctx->blocks[i]->ref = 0; ctx->blocks[i]->similar = 0; - ctx->blocks[i]->crc = XXH_strong32(buf1+last_offset, length, 0); - ctx->blocks[i]->cksum_n_offset = ctx->blocks[i]->crc; + ctx->blocks[i]->hash = XXH_strong32(buf1+last_offset, length, 0); + ctx->blocks[i]->similarity_hash = ctx->blocks[i]->hash; last_offset += length; } goto process_blocks; @@ -341,10 +309,9 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs * Initialize arrays for sketch computation. We re-use memory allocated * for the compressed chunk temporarily. */ - fplist_sz = 4 * ctx->rabin_poly_max_block_size; - fplist = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - fplist_sz); - memset(fplist, 0, fplist_sz); - reset_heap(&heap, fplist_sz/2); + ary_sz = 4 * ctx->rabin_poly_max_block_size; + fplist = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz); + memset(fplist, 0, ary_sz); } memset(ctx->current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE); @@ -436,7 +403,8 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs ctx->blocks[blknum]->offset = last_offset; ctx->blocks[blknum]->index = blknum; // Need to store for sorting ctx->blocks[blknum]->length = length; - ctx->blocks[blknum]->ref = 0; + ctx->blocks[blknum]->other = 0; + ctx->blocks[blknum]->next = 0; ctx->blocks[blknum]->similar = 0; if (ctx->delta_flag) { @@ -448,9 +416,9 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs */ reset_heap(&heap, FORTY_PCNT(j)); ksmallest(fplist, j, &heap); - ctx->blocks[blknum]->cksum_n_offset = + ctx->blocks[blknum]->similarity_hash = XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0); - memset(fplist, 0, fplist_sz); + memset(fplist, 0, ary_sz); } blknum++; last_offset = i+1; @@ -459,263 +427,224 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offs } } - /* - * Compute hash signature for each block. We do this in a separate loop to - * have a fast linear scan through the buffer. - */ - if (ctx->delta_flag) { - for (i=0; iblocks[i]->crc = XXH_strong32(buf1+ctx->blocks[i]->offset, ctx->blocks[i]->length, 0); - } - } else { - for (i=0; iblocks[i]->crc = XXH_strong32(buf1+ctx->blocks[i]->offset, ctx->blocks[i]->length, 0); - ctx->blocks[i]->cksum_n_offset = ctx->blocks[i]->crc; + // Insert the last left-over trailing bytes, if any, into a block. + if (last_offset < *size) { + if (ctx->blocks[blknum] == 0) + ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, + sizeof (rabin_blockentry_t)); + ctx->blocks[blknum]->offset = last_offset; + ctx->blocks[blknum]->index = blknum; + ctx->blocks[blknum]->length = *size - last_offset; + ctx->blocks[blknum]->other = 0; + ctx->blocks[blknum]->next = 0; + ctx->blocks[blknum]->similar = 0; + ctx->blocks[blknum]->hash = XXH_strong32(buf1+last_offset, ctx->blocks[blknum]->length, 0); + + if (ctx->delta_flag) { + uint64_t cur_sketch; + j = (j > 0 ? j:1); + if (j > 1) { + reset_heap(&heap, FORTY_PCNT(j)); + ksmallest(fplist, j, &heap); + cur_sketch = + XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0); + } else { + cur_sketch = + XXH_fast32((const uchar_t *)fplist, (j*4)/2, 0); + } + ctx->blocks[blknum]->similarity_hash = cur_sketch; } + + blknum++; + last_offset = *size; } process_blocks: - DEBUG_STAT_EN(printf("Original size: %lld, blknum: %u\n", *size, blknum)); // If we found at least a few chunks, perform dedup. + DEBUG_STAT_EN(printf("Original size: %lld, blknum: %u\n", *size, blknum)); if (blknum > 2) { - uint32_t blk, prev_index, prev_length; ssize_t pos, matchlen, pos1; int valid = 1; - char *tmp; - uint32_t *blkarr, *trans, *dedupe_index; + uint32_t *dedupe_index; ssize_t dedupe_index_sz; - rabin_blockentry_t *prev; - DEBUG_STAT_EN(uint32_t delta_calls, delta_fails); + rabin_blockentry_t *be; + DEBUG_STAT_EN(uint32_t delta_calls, delta_fails, merge_count, hash_collisions); DEBUG_STAT_EN(delta_calls = 0); DEBUG_STAT_EN(delta_fails = 0); + DEBUG_STAT_EN(hash_collisions = 0); - // Insert the last left-over trailing bytes, if any, into a block. - if (last_offset < *size) { - uint64_t cur_sketch; - - if (ctx->blocks[blknum] == 0) - ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, - sizeof (rabin_blockentry_t)); - ctx->blocks[blknum]->offset = last_offset; - ctx->blocks[blknum]->index = blknum; - ctx->blocks[blknum]->length = *size - last_offset; - ctx->blocks[blknum]->ref = 0; - ctx->blocks[blknum]->similar = 0; - ctx->blocks[blknum]->crc = XXH_strong32(buf1+last_offset, ctx->blocks[blknum]->length, 0); - - if (ctx->delta_flag) { - j = (j > 0 ? j:1); - if (j > 1) { - reset_heap(&heap, FORTY_PCNT(j)); - ksmallest(fplist, j, &heap); - cur_sketch = - XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0); - } else { - cur_sketch = - XXH_fast32((const uchar_t *)fplist, (j*4)/2, 0); - } - } else { - cur_sketch = ctx->blocks[blknum]->crc; - } - - ctx->blocks[blknum]->cksum_n_offset = cur_sketch; - blknum++; - last_offset = *size; - } - - dedupe_index_sz = (ssize_t)blknum * RABIN_ENTRY_SIZE; + ary_sz = blknum * sizeof (rabin_blockentry_t *); + htab = (rabin_blockentry_t **)(ctx->cbuf + ctx->real_chunksize - ary_sz); + memset(htab, 0, ary_sz); /* - * Now sort the block array based on checksums. This will bring virtually - * all similar block entries together. Effectiveness depends on how strong - * our checksum is. We are using a maximal super-sketch value. + * Compute hash signature for each block. We do this in a separate loop to + * have a fast linear scan through the buffer. */ - qsort(ctx->blocks, blknum, sizeof (rabin_blockentry_t *), cmpblks); - dedupe_index = (uint32_t *)(ctx->cbuf + RABIN_HDR_SIZE); - - /* - * We need 2 temporary arrays. We just use available space in the last - * portion of the buffer that will hold the deduped segment. - */ - blkarr = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - (dedupe_index_sz * 2 + 1)); - trans = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - (dedupe_index_sz + 1)); - matchlen = 0; - - /* - * Now make a pass through the sorted block array making identical blocks - * point to the first identical block entry. A simple Run Length Encoding - * sort of. Checksums, length and contents (memcmp()) must match for blocks - * to be considered identical. - * The block index in the chunk is initialized with pointers into the - * sorted block array. - * A reference count is maintained for blocks that are similar with other - * blocks. This helps in non-duplicate block merging later. - */ - blkarr[ctx->blocks[0]->index] = 0; - prev = ctx->blocks[0]; - for (blk = 1; blk < blknum; blk++) { - blkarr[ctx->blocks[blk]->index] = blk; - - if (ctx->blocks[blk]->crc == prev->crc && - ctx->blocks[blk]->length == prev->length && - memcmp(buf1 + prev->offset, buf1 + ctx->blocks[blk]->offset, - prev->length) == 0) - { - ctx->blocks[blk]->similar = SIMILAR_EXACT; - ctx->blocks[blk]->index = prev->index; - prev->ref = 1; - matchlen += prev->length; - continue; - } - prev = ctx->blocks[blk]; - } - - prev = NULL; if (ctx->delta_flag) { - for (blk = 0; blk < blknum; blk++) { - if (ctx->blocks[blk]->similar) continue; + for (i=0; iblocks[i]->hash = XXH_strong32(buf1+ctx->blocks[i]->offset, + ctx->blocks[i]->length, 0); + } + } else { + for (i=0; iblocks[i]->hash = XXH_strong32(buf1+ctx->blocks[i]->offset, + ctx->blocks[i]->length, 0); + ctx->blocks[i]->similarity_hash = ctx->blocks[i]->hash; + } + } + + /* + * Perform hash-matching of blocks and use a bucket-chained hashtable to match + * for duplicates and similar blocks. Unique blocks are inserted and duplicates + * and similar ones are marked in the block array. + * + * Hashtable memory is not allocated. We just use available space in the + * target buffer. + */ + matchlen = 0; + for (i=0; iblocks[i]->similarity_hash; + ck += ctx->blocks[i]->length; + j = ck % blknum; + + if (htab[j] == 0) { + htab[j] = ctx->blocks[i]; + } else { + be = htab[j]; + length = 0; /* - * Compare blocks for similarity. - * Note: Block list by now is sorted by length as well. + * Look for exact duplicates. Same cksum, length and memcmp()\ */ - if (prev != NULL && ctx->blocks[blk]->ref == 0 && - ctx->blocks[blk]->cksum_n_offset == prev->cksum_n_offset && - ctx->blocks[blk]->length - prev->length < 512 - ) { - ctx->blocks[blk]->index = prev->index; - ctx->blocks[blk]->similar = SIMILAR_PARTIAL; - prev->ref = 1; - matchlen += (prev->length>>1); - continue; + while (1) { + if (be->hash == ctx->blocks[i]->hash && + be->length == ctx->blocks[i]->length && + memcmp(buf1 + be->offset, buf1 + ctx->blocks[i]->offset, + be->length) == 0) { + ctx->blocks[i]->similar = SIMILAR_EXACT; + ctx->blocks[i]->other = be; + be->similar = SIMILAR_REF; + matchlen += be->length; + length = 1; + break; + } + if (be->next) + be = be->next; + else + break; + } + + if (!length && ctx->delta_flag) { + /* + * Look for similar blocks. + */ + be = htab[j]; + while (1) { + if (be->similarity_hash == ctx->blocks[i]->similarity_hash && + be->length == ctx->blocks[i]->length) { + ctx->blocks[i]->similar = SIMILAR_PARTIAL; + ctx->blocks[i]->other = be; + be->similar = SIMILAR_REF; + matchlen += (be->length>>1); + length = 1; + break; + } + if (be->next) + be = be->next; + else + break; + } + } + // This is an unique block so add it to hashtable. + if (!length) { + be->next = ctx->blocks[i]; + DEBUG_STAT_EN(hash_collisions++); } - prev = ctx->blocks[blk]; } } + DEBUG_STAT_EN(printf("Hash collisions: %u\n", hash_collisions)); + if (matchlen < dedupe_index_sz) { ctx->valid = 0; return; } - /* - * Another pass, this time through the block index in the chunk. We insert - * block length into unique block entries. For block entries that are - * identical with another one we store the index number with msb set. - * This way we can differentiate between a unique block length entry and a - * pointer to another block without needing a separate flag. - */ - prev_index = 0; - prev_length = 0; + dedupe_index = (uint32_t *)(ctx->cbuf + RABIN_HDR_SIZE); pos = 0; - for (blk = 0; blk < blknum; blk++) { - rabin_blockentry_t *be; + DEBUG_STAT_EN(merge_count = 0); - be = ctx->blocks[blkarr[blk]]; - if (be->similar == 0) { - /* - * Update Index entry with the length. Also try to merge runs - * of unique (non-duplicate/similar) blocks into a single block - * entry as long as the total length does not exceed max block - * size. - */ - if (prev_index == 0) { - if (be->ref == 0) { - prev_index = pos; - prev_length = be->length; - } - dedupe_index[pos] = be->length; - ctx->blocks[pos]->cksum_n_offset = be->offset; - trans[blk] = pos; - pos++; - } else { - if (be->ref > 0) { - prev_index = 0; - prev_length = 0; - dedupe_index[pos] = be->length; - ctx->blocks[pos]->cksum_n_offset = be->offset; - trans[blk] = pos; - pos++; - } else { - if (prev_length + be->length <= RABIN_MAX_BLOCK_SIZE) { - prev_length += be->length; - dedupe_index[prev_index] = prev_length; - } else { - prev_index = 0; - prev_length = 0; - dedupe_index[pos] = be->length; - ctx->blocks[pos]->cksum_n_offset = be->offset; - trans[blk] = pos; - pos++; - } - } + /* + * Merge runs of unique blocks into a single block entry to reduce + * dedupe index size. + */ + for (i=0; iblocks[i]->index = pos; + pos++; + length = 0; + j = i; + if (ctx->blocks[i]->similar == 0) { + while (i< blknum && ctx->blocks[i]->similar == 0 && + length < RABIN_MAX_BLOCK_SIZE) { + length += ctx->blocks[i]->length; + i++; + DEBUG_STAT_EN(merge_count++); } + ctx->blocks[j]->length = length; } else { - prev_index = 0; - prev_length = 0; - ctx->blocks[pos]->cksum_n_offset = be->offset; - ctx->blocks[pos]->alt_length = be->length; - trans[blk] = pos; - - if (be->similar == SIMILAR_EXACT) { - dedupe_index[pos] = (blkarr[be->index] | RABIN_INDEX_FLAG) & - CLEAR_SIMILARITY_FLAG; - } else { - dedupe_index[pos] = blkarr[be->index] | RABIN_INDEX_FLAG | - SET_SIMILARITY_FLAG; - } - pos++; + i++; } } + DEBUG_STAT_EN(printf("Merge count: %u\n", merge_count)); /* - * Final pass, copy the data and perform delta encoding. + * Final pass update dedupe index and copy data. */ blknum = pos; - dedupe_index_sz = (ssize_t)pos * RABIN_ENTRY_SIZE; + dedupe_index_sz = (ssize_t)blknum * RABIN_ENTRY_SIZE; pos1 = dedupe_index_sz + RABIN_HDR_SIZE; - for (blk = 0; blk < blknum; blk++) { - uchar_t *old, *new; - int32_t bsz; - - /* - * If blocks are overflowing the allowed chunk size then dedup did not - * help at all. We invalidate the dedup operation. - */ - if (pos1 > last_offset) { - valid = 0; - break; - } - if (dedupe_index[blk] & RABIN_INDEX_FLAG) { - j = dedupe_index[blk] & RABIN_INDEX_VALUE; - i = ctx->blocks[j]->index; - - if (dedupe_index[blk] & GET_SIMILARITY_FLAG) { - old = buf1 + ctx->blocks[j]->offset; - new = buf1 + ctx->blocks[blk]->cksum_n_offset; - matchlen = ctx->real_chunksize - *size; + matchlen = ctx->real_chunksize - *size; + for (i=0; iblocks[dedupe_index[i]]; + if (be->similar == 0 || be->similar == SIMILAR_REF) { + /* Just copy. */ + dedupe_index[i] = htonl(be->length); + memcpy(ctx->cbuf + pos1, buf1 + be->offset, be->length); + pos1 += be->length; + } else { + if (be->similar == SIMILAR_EXACT) { + dedupe_index[i] = htonl((be->other->index | RABIN_INDEX_FLAG) & + CLEAR_SIMILARITY_FLAG); + } else { + uchar_t *old, *new; + int32_t bsz; + /* + * Perform bsdiff. + */ + old = buf1 + be->other->offset; + new = buf1 + be->offset; DEBUG_STAT_EN(delta_calls++); - bsz = bsdiff(old, ctx->blocks[j]->length, new, - ctx->blocks[blk]->alt_length, ctx->cbuf + pos1, - buf1 + *size, matchlen); + bsz = bsdiff(old, be->other->length, new, be->length, + ctx->cbuf + pos1, buf1 + *size, matchlen); if (bsz == 0) { DEBUG_STAT_EN(delta_fails++); - memcpy(ctx->cbuf + pos1, new, ctx->blocks[blk]->alt_length); - dedupe_index[blk] = htonl(ctx->blocks[blk]->alt_length); - pos1 += ctx->blocks[blk]->alt_length; + memcpy(ctx->cbuf + pos1, new, be->length); + dedupe_index[i] = htonl(be->length); + pos1 += be->length; } else { - dedupe_index[blk] = htonl(trans[i] | + dedupe_index[i] = htonl(be->other->index | RABIN_INDEX_FLAG | SET_SIMILARITY_FLAG); pos1 += bsz; } - } else { - dedupe_index[blk] = htonl(trans[i] | RABIN_INDEX_FLAG); } - } else { - memcpy(ctx->cbuf + pos1, buf1 + ctx->blocks[blk]->cksum_n_offset, - dedupe_index[blk]); - pos1 += dedupe_index[blk]; - dedupe_index[blk] = htonl(dedupe_index[blk]); } } cont: @@ -793,9 +722,9 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size) if (ctx->blocks[blk] == 0) ctx->blocks[blk] = (rabin_blockentry_t *)slab_alloc(NULL, sizeof (rabin_blockentry_t)); len = ntohl(dedupe_index[blk]); + ctx->blocks[blk]->hash = 0; if (len == 0) { - ctx->blocks[blk]->length = 0; - ctx->blocks[blk]->index = 0; + ctx->blocks[blk]->hash = 1; } else if (!(len & RABIN_INDEX_FLAG)) { ctx->blocks[blk]->length = len; @@ -820,7 +749,7 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, ssize_t *size) int rv; bsize_t newsz; - if (ctx->blocks[blk]->length == 0 && ctx->blocks[blk]->index == 0) continue; + if (ctx->blocks[blk]->hash == 1) continue; if (ctx->blocks[blk]->length > 0) { len = ctx->blocks[blk]->length; pos1 = ctx->blocks[blk]->offset; diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index df00e8e..94d9a5b 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -75,9 +75,6 @@ // Minimum practical chunk size when doing dedup #define RAB_MIN_CHUNK_SIZE (1048576L) -// Number of bytes to compute one maximal fingerprint value -#define SKETCH_BASIC_BLOCK_SZ (1024) - // An entry in the Rabin block array in the chunk. // It is either a length value <= RABIN_MAX_BLOCK_SIZE or an index value with // which this block is a duplicate/similar. The entries are variable sized. @@ -117,6 +114,7 @@ */ #define SIMILAR_EXACT 1 #define SIMILAR_PARTIAL 2 +#define SIMILAR_REF 3 /* * Irreducible polynomial for Rabin modulus. This value is from the @@ -124,14 +122,15 @@ */ #define FP_POLY 0xbfe6b8a5bf378d83ULL -typedef struct { +typedef struct rab_blockentry { ssize_t offset; - uint64_t cksum_n_offset; // Dual purpose variable - uint64_t alt_length; - uint64_t crc; - unsigned int index; - unsigned int length; - unsigned char ref, similar; + uint32_t similarity_hash; + uint32_t hash; + uint32_t index; + uint32_t length; + unsigned char similar; + struct rab_blockentry *other; + struct rab_blockentry *next; } rabin_blockentry_t; typedef struct { diff --git a/utils/heapq.c b/utils/heapq.c index ec9eb24..42dde7d 100644 --- a/utils/heapq.c +++ b/utils/heapq.c @@ -152,13 +152,6 @@ ksmallest(__TYPE *ary, __TYPE len, heap_t *heap) __TYPE elem, los; __TYPE i, *hp, n; -#ifdef ERROR_CHK - if (len >= heap->tot) { - fprintf(stderr, "nsmallest: array size > heap size\n"); - return (-1); - } -#endif - n = heap->tot; heap->ary = ary; hp = ary;