From f3f472b860db34a57a38d818abef2ac041a7e58d Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Tue, 11 Sep 2012 20:26:36 +0530 Subject: [PATCH] Implement K-min-values Sketch for Similarity detection. --- Makefile.in | 4 +- rabin/rabin_dedup.c | 147 +++++++++++---------------------- utils/heapq.c | 193 ++++++++++++++++++++++++++++++++++++++++++++ utils/heapq.h | 14 ++++ 4 files changed, 257 insertions(+), 101 deletions(-) create mode 100644 utils/heapq.c create mode 100644 utils/heapq.h diff --git a/Makefile.in b/Makefile.in index b4fd42b..2de01c9 100644 --- a/Makefile.in +++ b/Makefile.in @@ -24,8 +24,8 @@ PROG= pcompress MAINSRCS = main.c utils/utils.c allocator.c zlib_compress.c bzip2_compress.c \ lzma_compress.c ppmd_compress.c adaptive_compress.c lzfx_compress.c \ - lz4_compress.c none_compress.c utils/xxhash.c -MAINHDRS = allocator.h pcompress.h utils/utils.h utils/xxhash.h + lz4_compress.c none_compress.c utils/xxhash.c utils/heapq.c +MAINHDRS = allocator.h pcompress.h utils/utils.h utils/xxhash.h utils/heapq.h MAINOBJS = $(MAINSRCS:.c=.o) RABINSRCS = rabin/rabin_dedup.c diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index 4da4cbd..baef162 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -63,9 +63,12 @@ #include #include #include +#include #include "rabin_dedup.h" +#define FORTY_PCNT(x) (((x)/5 << 1)) + extern int lzma_init(void **data, int *level, ssize_t chunksize); extern int lzma_compress(void *src, size_t srclen, void *dst, size_t *destlen, int level, uchar_t chdr, void *data); @@ -87,19 +90,7 @@ rabin_min_blksz(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta_ { uint32_t min_blk; - min_blk = 1 << (rab_blk_sz + RAB_BLK_MIN_BITS); - if (rab_blk_sz > 1) - return (min_blk); - - if (((memcmp(algo, "lzma", 4) == 0 || memcmp(algo, "adapt", 5) == 0) && - chunksize <= LZMA_WINDOW_MAX) || delta_flag) { - if (memcmp(algo, "lzfx", 4) == 0 || memcmp(algo, "lz4", 3) == 0 || - memcmp(algo, "zlib", 4) == 0 || memcmp(algo, "none", 4) == 0) { - min_blk = 1 << (rab_blk_sz + RAB_BLK_MIN_BITS - 1); - } - } else { - min_blk = 1 << (rab_blk_sz + RAB_BLK_MIN_BITS - 1); - } + min_blk = 1 << (rab_blk_sz + RAB_BLK_MIN_BITS - 1); return (min_blk); } @@ -298,32 +289,25 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s char *buf1 = (char *)buf; uint32_t length; uint64_t cur_roll_checksum, cur_pos_checksum, cur_sketch; - uint64_t *fplist; - uint32_t len1, fpos[2], cur_sketch2; - uint32_t *charcounts, byts; + uint32_t *fplist; + heap_t heap; if (rabin_pos == NULL) { /* * Initialize arrays for sketch computation. We re-use memory allocated * for the compressed chunk temporarily. */ - fplist_sz = 8 * ctx->rabin_poly_avg_block_size; - fplist = (uint64_t *)(ctx->cbuf + ctx->real_chunksize - fplist_sz - 256 * 4); - charcounts = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - 256 * 4); + fplist_sz = 4 * ctx->rabin_poly_max_block_size; + fplist = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - fplist_sz); memset(fplist, 0, fplist_sz); - memset(charcounts, 0, 256 * 4); - fpos[0] = 0; - fpos[1] = 0; - len1 = 0; + reset_heap(&heap, fplist_sz/2); } length = offset; last_offset = 0; blknum = 0; ctx->valid = 0; cur_roll_checksum = 0; - j = 0; cur_sketch = 0; - cur_sketch2 = 0; /* * If rabin_pos is non-zero then we are being asked to scan for the last rabin boundary @@ -362,6 +346,8 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s } if (*size < ctx->rabin_poly_avg_block_size) return; + j = 0; + for (i=offset; i<*size; i++) { uint32_t *splits; uchar_t cur_byte = buf1[i]; @@ -379,68 +365,19 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s cur_pos_checksum = cur_roll_checksum ^ ir[pushed_out]; /* - * Compute a super sketch value of the block. We store a sum of relative - * maximal rabin hash values per 1K(SKETCH_BASIC_BLOCK_SZ) of data. So we - * get upto 128 sums for a max block size of 128K. The bottom blocksize bits - * of the hash are only used which are then biased with the occurrence count. - * This is a representative fingerprint sketch of the block. Storing and - * comparing upto 128 fingerprints per block is very expensive (compute & RAM) - * so we eventually sum all the fingerprints for the block to create a single - * super sketch value representing maximal features of the block. In addition - * the top 2 commonly occuring byte values are used to compute a second sketch - * to refine the earlier one. + * Retain a list of all fingerprints in the block. We then compute + * the K min values sketch from that list and generate a super sketch + * by hashing over the K min values sketch. We only store the least + * significant 32 bits of the fingerprint. This uses less memory, + * requires smaller memset() calls and generates a sufficiently large + * number of similarity matches without false positives - determined + * by experimentation. * - * This value can be used for similarity detection for delta encoding. Exact - * match for deduplication is additionally detected via a memcmp(). This is a - * variant of some approaches detailed in: - * http://www.armedia.com/wp/SimilarityIndex.pdf + * This is called minhashing and is used widely, for example in various + * search engines to detect similar documents. */ - len1++; - fpos[1] = cur_pos_checksum & ctx->rabin_avg_block_mask; - splits = (uint32_t *)(&fplist[fpos[1]]); -#if BYTE_ORDER == BIG_ENDIAN - splits[0]++; - splits[1] += cur_pos_checksum & ctx->fp_mask; -#else - splits[1]++; - splits[0] += cur_pos_checksum & ctx->fp_mask; -#endif - charcounts[cur_byte]++; - - /* - * Perform the following statement without branching: - * if (fplist[fpos[1]] > fplist[fpos[0]]) fpos[0] = fpos[1]; - */ - fpos[0] = fpos[(fplist[fpos[1]] > fplist[fpos[0]])]; - if (len1 == SKETCH_BASIC_BLOCK_SZ && ctx->delta_flag) { - uint32_t p1, p2, p3; - /* - * Compute the super sketch value by summing all the representative - * fingerprints of the block. - */ - cur_sketch += fplist[fpos[0]]; - memset(fplist, 0, fplist_sz); - fpos[0] = 0; - - /* - * Find out the top 2 occurring byte values and compute - * a secondary sketch from them. - */ - p1 = 0; - p2 = 0; - p3 = 0; - for (len1=0; len1<256; len1++) { - if (charcounts[len1] > p1) { - p3 = p2; - p2 = p1; - p1 = len1; - } - charcounts[len1] = 0; - } - cur_sketch2 += ((p1 << 16) | (p2 << 8) | p3); - len1 = 0; - j++; - } + fplist[j] = cur_pos_checksum & 0xFFFFFFFFUL; + j++; /* * Window pos has to rotate from 0 .. RAB_POLYNOMIAL_WIN_SIZE-1 @@ -463,23 +400,26 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s ctx->blocks[blknum]->similar = 0; ctx->blocks[blknum]->crc = XXH_strong32(buf1+last_offset, length, 0); - // Accumulate the 2 sketch values into a combined similarity checksum if (ctx->delta_flag) { - ctx->blocks[blknum]->cksum_n_offset = (cur_sketch + cur_sketch2) / 2; - ctx->blocks[blknum]->mean_n_length = cur_sketch / j; + /* + * Reset the heap structure and find the K min values. We use a + * min heap mechanism taken from the heap based priority queue + * implementation in Python. + * Here K = 40%. We are aiming to detect 40% similarity on average. + */ + reset_heap(&heap, FORTY_PCNT(j)); + ksmallest(fplist, j, &heap); + cur_sketch = XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0); memset(fplist, 0, fplist_sz); } else { - ctx->blocks[blknum]->cksum_n_offset = 0; - ctx->blocks[blknum]->mean_n_length = 0; + cur_sketch = ctx->blocks[blknum]->crc; } - fpos[0] = 0; - len1 = 0; + ctx->blocks[blknum]->cksum_n_offset = cur_sketch; cur_sketch = 0; blknum++; last_offset = i+1; length = 0; j = 0; - cur_sketch2 = 0; } } @@ -500,22 +440,31 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s // Insert the last left-over trailing bytes, if any, into a block. if (last_offset < *size) { if (ctx->blocks[blknum] == 0) - ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, sizeof (rabin_blockentry_t)); + ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, + sizeof (rabin_blockentry_t)); ctx->blocks[blknum]->offset = last_offset; ctx->blocks[blknum]->index = blknum; ctx->blocks[blknum]->length = *size - last_offset; ctx->blocks[blknum]->ref = 0; ctx->blocks[blknum]->similar = 0; + ctx->blocks[blknum]->crc = XXH_strong32(buf1+last_offset, ctx->blocks[blknum]->length, 0); if (ctx->delta_flag) { j = (j > 0 ? j:1); - ctx->blocks[blknum]->cksum_n_offset = (cur_sketch + cur_sketch2) / 2; - ctx->blocks[blknum]->mean_n_length = cur_sketch / j; + if (j > 1) { + reset_heap(&heap, FORTY_PCNT(j)); + ksmallest(fplist, j, &heap); + cur_sketch = + XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0); + } else { + cur_sketch = + XXH_fast32((const uchar_t *)fplist, (j*4)/2, 0); + } } else { - ctx->blocks[blknum]->cksum_n_offset = 0; - ctx->blocks[blknum]->mean_n_length = 0; + cur_sketch = ctx->blocks[blknum]->crc; } - ctx->blocks[blknum]->crc = XXH_strong32(buf1+last_offset, ctx->blocks[blknum]->length, 0); + + ctx->blocks[blknum]->cksum_n_offset = cur_sketch; blknum++; last_offset = *size; } diff --git a/utils/heapq.c b/utils/heapq.c new file mode 100644 index 0000000..a7637be --- /dev/null +++ b/utils/heapq.c @@ -0,0 +1,193 @@ +/* + * Functions for a rudimentary fast min-heap implementation. + * Derived from Python's _heapqmodule.c by way of drastic simplification + * and a few optimizations. + */ + +/* + * Original Python _heapqmodule.c implementation was derived directly + * from heapq.py in Py2.3 which was written by Kevin O'Connor, augmented + * by Tim Peters, annotated by François Pinard, and converted to C by + * Raymond Hettinger. + */ + +#include +#include +#include +#include +#include +#include +#include + +#ifndef NDEBUG +#define ERROR_CHK +#endif + +void +reset_heap(heap_t *heap, __TYPE tot) +{ + if (heap) { + heap->len = 0; + heap->tot = tot; + } +} + +static int +_siftdownmax(heap_t *h, __TYPE startpos, __TYPE pos) +{ + __TYPE newitem, parent; + __TYPE parentpos, *heap; + +#ifdef ERROR_CHK + if (pos >= h->len) { + fprintf(stderr, "_siftdownmax: index out of range\n"); + return -1; + } +#endif + + heap = h->ary; + newitem = heap[pos]; + /* Follow the path to the root, moving parents down until finding + a place newitem fits. */ + while (pos > startpos){ + parentpos = (pos - 1) >> 1; + parent = heap[parentpos]; + if (parent < newitem) + break; + heap[pos] = parent; + pos = parentpos; + } + heap[pos] = newitem; + return 0; +} + +static int +_siftupmax(heap_t *h, __TYPE spos, __TYPE epos) +{ + __TYPE endpos, childpos, rightpos; + __TYPE newitem, *heap, pos; + + endpos = h->len; + heap = h->ary; +#ifdef ERROR_CHK + if (pos >= endpos) { + fprintf(stderr, "_siftupmax: index out of range: %u, len: %u\n", pos, endpos); + return -1; + } +#endif + + do { + pos = spos; + /* Bubble up the smaller child until hitting a leaf. */ + newitem = heap[pos]; + childpos = (pos << 1) + 1; /* leftmost child position */ + while (childpos < endpos) { + /* Set childpos to index of smaller child. */ + rightpos = childpos + 1; + if (rightpos < endpos) { + if (heap[rightpos] < heap[childpos]) + childpos = rightpos; + } + /* Move the smaller child up. */ + heap[pos] = heap[childpos]; + pos = childpos; + childpos = (pos << 1) + 1; + } + + /* The leaf at pos is empty now. Put newitem there, and and bubble + it up to its final resting place (by sifting its parents down). */ + heap[pos] = newitem; +#ifdef ERROR_CHK + if (_siftdownmax(h, spos, pos) == -1) + return (-1); +#else + _siftdownmax(h, spos, pos); +#endif + spos--; + } while (spos >= epos); + return (0); +} + +static int +_siftupmax_s(heap_t *h, __TYPE spos) +{ + __TYPE endpos, childpos, rightpos; + __TYPE newitem, *heap, pos; + + endpos = h->len; + heap = h->ary; +#ifdef ERROR_CHK + if (pos >= endpos) { + fprintf(stderr, "_siftupmax: index out of range: %u, len: %u\n", pos, endpos); + return -1; + } +#endif + + pos = spos; + /* Bubble up the smaller child until hitting a leaf. */ + newitem = heap[pos]; + childpos = (pos << 1) + 1; /* leftmost child position */ + while (childpos < endpos) { + /* Set childpos to index of smaller child. */ + rightpos = childpos + 1; + if (rightpos < endpos) { + if (! (heap[rightpos] < heap[childpos])) + childpos = rightpos; + } + /* Move the smaller child up. */ + heap[pos] = heap[childpos]; + pos = childpos; + childpos = (pos << 1) + 1; + } + + /* The leaf at pos is empty now. Put newitem there, and and bubble + it up to its final resting place (by sifting its parents down). */ + heap[pos] = newitem; + return (_siftdownmax(h, spos, pos)); +} + +int +ksmallest(__TYPE *ary, __TYPE len, heap_t *heap) +{ + __TYPE elem, los; + __TYPE i, *hp, n; + +#ifdef ERROR_CHK + if (len >= heap->tot) { + fprintf(stderr, "nsmallest: array size > heap size\n"); + return (-1); + } +#endif + + n = heap->tot; + heap->ary = ary; + hp = ary; + heap->len = n; + +#ifdef ERROR_CHK + if(_siftupmax(heap, n/2-1, 0) == -1) + return (-1); +#else + _siftupmax(heap, n/2-1, 0); +#endif + + los = hp[0]; + for (i = n; i < len; i++) { + elem = ary[i]; + if (elem >= los) { + continue; + } + + hp[0] = elem; +#ifdef ERROR_CHK + if (_siftupmax_s(heap, 0) == -1) + return (-1); +#else + _siftupmax_s(heap, 0); +#endif + los = hp[0]; + } + + return 0; +} + diff --git a/utils/heapq.h b/utils/heapq.h new file mode 100644 index 0000000..155eeca --- /dev/null +++ b/utils/heapq.h @@ -0,0 +1,14 @@ +#ifndef __HEAPQ_H_ + +#define __TYPE int32_t + +typedef struct { + __TYPE *ary; + __TYPE len; + __TYPE tot; +} heap_t; + +extern int ksmallest(__TYPE *ary, __TYPE len, heap_t *heap); +extern void reset_heap(heap_t *h, __TYPE tot); + +#endif