Implement K-min-values Sketch for Similarity detection.
This commit is contained in:
parent
117382c141
commit
f3f472b860
4 changed files with 257 additions and 101 deletions
|
@ -24,8 +24,8 @@
|
|||
PROG= pcompress
|
||||
MAINSRCS = main.c utils/utils.c allocator.c zlib_compress.c bzip2_compress.c \
|
||||
lzma_compress.c ppmd_compress.c adaptive_compress.c lzfx_compress.c \
|
||||
lz4_compress.c none_compress.c utils/xxhash.c
|
||||
MAINHDRS = allocator.h pcompress.h utils/utils.h utils/xxhash.h
|
||||
lz4_compress.c none_compress.c utils/xxhash.c utils/heapq.c
|
||||
MAINHDRS = allocator.h pcompress.h utils/utils.h utils/xxhash.h utils/heapq.h
|
||||
MAINOBJS = $(MAINSRCS:.c=.o)
|
||||
|
||||
RABINSRCS = rabin/rabin_dedup.c
|
||||
|
|
|
@ -63,9 +63,12 @@
|
|||
#include <allocator.h>
|
||||
#include <utils.h>
|
||||
#include <pthread.h>
|
||||
#include <heapq.h>
|
||||
|
||||
#include "rabin_dedup.h"
|
||||
|
||||
#define FORTY_PCNT(x) (((x)/5 << 1))
|
||||
|
||||
extern int lzma_init(void **data, int *level, ssize_t chunksize);
|
||||
extern int lzma_compress(void *src, size_t srclen, void *dst,
|
||||
size_t *destlen, int level, uchar_t chdr, void *data);
|
||||
|
@ -87,19 +90,7 @@ rabin_min_blksz(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta_
|
|||
{
|
||||
uint32_t min_blk;
|
||||
|
||||
min_blk = 1 << (rab_blk_sz + RAB_BLK_MIN_BITS);
|
||||
if (rab_blk_sz > 1)
|
||||
return (min_blk);
|
||||
|
||||
if (((memcmp(algo, "lzma", 4) == 0 || memcmp(algo, "adapt", 5) == 0) &&
|
||||
chunksize <= LZMA_WINDOW_MAX) || delta_flag) {
|
||||
if (memcmp(algo, "lzfx", 4) == 0 || memcmp(algo, "lz4", 3) == 0 ||
|
||||
memcmp(algo, "zlib", 4) == 0 || memcmp(algo, "none", 4) == 0) {
|
||||
min_blk = 1 << (rab_blk_sz + RAB_BLK_MIN_BITS - 1);
|
||||
}
|
||||
} else {
|
||||
min_blk = 1 << (rab_blk_sz + RAB_BLK_MIN_BITS - 1);
|
||||
}
|
||||
min_blk = 1 << (rab_blk_sz + RAB_BLK_MIN_BITS - 1);
|
||||
return (min_blk);
|
||||
}
|
||||
|
||||
|
@ -298,32 +289,25 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s
|
|||
char *buf1 = (char *)buf;
|
||||
uint32_t length;
|
||||
uint64_t cur_roll_checksum, cur_pos_checksum, cur_sketch;
|
||||
uint64_t *fplist;
|
||||
uint32_t len1, fpos[2], cur_sketch2;
|
||||
uint32_t *charcounts, byts;
|
||||
uint32_t *fplist;
|
||||
heap_t heap;
|
||||
|
||||
if (rabin_pos == NULL) {
|
||||
/*
|
||||
* Initialize arrays for sketch computation. We re-use memory allocated
|
||||
* for the compressed chunk temporarily.
|
||||
*/
|
||||
fplist_sz = 8 * ctx->rabin_poly_avg_block_size;
|
||||
fplist = (uint64_t *)(ctx->cbuf + ctx->real_chunksize - fplist_sz - 256 * 4);
|
||||
charcounts = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - 256 * 4);
|
||||
fplist_sz = 4 * ctx->rabin_poly_max_block_size;
|
||||
fplist = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - fplist_sz);
|
||||
memset(fplist, 0, fplist_sz);
|
||||
memset(charcounts, 0, 256 * 4);
|
||||
fpos[0] = 0;
|
||||
fpos[1] = 0;
|
||||
len1 = 0;
|
||||
reset_heap(&heap, fplist_sz/2);
|
||||
}
|
||||
length = offset;
|
||||
last_offset = 0;
|
||||
blknum = 0;
|
||||
ctx->valid = 0;
|
||||
cur_roll_checksum = 0;
|
||||
j = 0;
|
||||
cur_sketch = 0;
|
||||
cur_sketch2 = 0;
|
||||
|
||||
/*
|
||||
* If rabin_pos is non-zero then we are being asked to scan for the last rabin boundary
|
||||
|
@ -362,6 +346,8 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s
|
|||
}
|
||||
|
||||
if (*size < ctx->rabin_poly_avg_block_size) return;
|
||||
j = 0;
|
||||
|
||||
for (i=offset; i<*size; i++) {
|
||||
uint32_t *splits;
|
||||
uchar_t cur_byte = buf1[i];
|
||||
|
@ -379,68 +365,19 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s
|
|||
cur_pos_checksum = cur_roll_checksum ^ ir[pushed_out];
|
||||
|
||||
/*
|
||||
* Compute a super sketch value of the block. We store a sum of relative
|
||||
* maximal rabin hash values per 1K(SKETCH_BASIC_BLOCK_SZ) of data. So we
|
||||
* get upto 128 sums for a max block size of 128K. The bottom blocksize bits
|
||||
* of the hash are only used which are then biased with the occurrence count.
|
||||
* This is a representative fingerprint sketch of the block. Storing and
|
||||
* comparing upto 128 fingerprints per block is very expensive (compute & RAM)
|
||||
* so we eventually sum all the fingerprints for the block to create a single
|
||||
* super sketch value representing maximal features of the block. In addition
|
||||
* the top 2 commonly occuring byte values are used to compute a second sketch
|
||||
* to refine the earlier one.
|
||||
* Retain a list of all fingerprints in the block. We then compute
|
||||
* the K min values sketch from that list and generate a super sketch
|
||||
* by hashing over the K min values sketch. We only store the least
|
||||
* significant 32 bits of the fingerprint. This uses less memory,
|
||||
* requires smaller memset() calls and generates a sufficiently large
|
||||
* number of similarity matches without false positives - determined
|
||||
* by experimentation.
|
||||
*
|
||||
* This value can be used for similarity detection for delta encoding. Exact
|
||||
* match for deduplication is additionally detected via a memcmp(). This is a
|
||||
* variant of some approaches detailed in:
|
||||
* http://www.armedia.com/wp/SimilarityIndex.pdf
|
||||
* This is called minhashing and is used widely, for example in various
|
||||
* search engines to detect similar documents.
|
||||
*/
|
||||
len1++;
|
||||
fpos[1] = cur_pos_checksum & ctx->rabin_avg_block_mask;
|
||||
splits = (uint32_t *)(&fplist[fpos[1]]);
|
||||
#if BYTE_ORDER == BIG_ENDIAN
|
||||
splits[0]++;
|
||||
splits[1] += cur_pos_checksum & ctx->fp_mask;
|
||||
#else
|
||||
splits[1]++;
|
||||
splits[0] += cur_pos_checksum & ctx->fp_mask;
|
||||
#endif
|
||||
charcounts[cur_byte]++;
|
||||
|
||||
/*
|
||||
* Perform the following statement without branching:
|
||||
* if (fplist[fpos[1]] > fplist[fpos[0]]) fpos[0] = fpos[1];
|
||||
*/
|
||||
fpos[0] = fpos[(fplist[fpos[1]] > fplist[fpos[0]])];
|
||||
if (len1 == SKETCH_BASIC_BLOCK_SZ && ctx->delta_flag) {
|
||||
uint32_t p1, p2, p3;
|
||||
/*
|
||||
* Compute the super sketch value by summing all the representative
|
||||
* fingerprints of the block.
|
||||
*/
|
||||
cur_sketch += fplist[fpos[0]];
|
||||
memset(fplist, 0, fplist_sz);
|
||||
fpos[0] = 0;
|
||||
|
||||
/*
|
||||
* Find out the top 2 occurring byte values and compute
|
||||
* a secondary sketch from them.
|
||||
*/
|
||||
p1 = 0;
|
||||
p2 = 0;
|
||||
p3 = 0;
|
||||
for (len1=0; len1<256; len1++) {
|
||||
if (charcounts[len1] > p1) {
|
||||
p3 = p2;
|
||||
p2 = p1;
|
||||
p1 = len1;
|
||||
}
|
||||
charcounts[len1] = 0;
|
||||
}
|
||||
cur_sketch2 += ((p1 << 16) | (p2 << 8) | p3);
|
||||
len1 = 0;
|
||||
j++;
|
||||
}
|
||||
fplist[j] = cur_pos_checksum & 0xFFFFFFFFUL;
|
||||
j++;
|
||||
|
||||
/*
|
||||
* Window pos has to rotate from 0 .. RAB_POLYNOMIAL_WIN_SIZE-1
|
||||
|
@ -463,23 +400,26 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s
|
|||
ctx->blocks[blknum]->similar = 0;
|
||||
ctx->blocks[blknum]->crc = XXH_strong32(buf1+last_offset, length, 0);
|
||||
|
||||
// Accumulate the 2 sketch values into a combined similarity checksum
|
||||
if (ctx->delta_flag) {
|
||||
ctx->blocks[blknum]->cksum_n_offset = (cur_sketch + cur_sketch2) / 2;
|
||||
ctx->blocks[blknum]->mean_n_length = cur_sketch / j;
|
||||
/*
|
||||
* Reset the heap structure and find the K min values. We use a
|
||||
* min heap mechanism taken from the heap based priority queue
|
||||
* implementation in Python.
|
||||
* Here K = 40%. We are aiming to detect 40% similarity on average.
|
||||
*/
|
||||
reset_heap(&heap, FORTY_PCNT(j));
|
||||
ksmallest(fplist, j, &heap);
|
||||
cur_sketch = XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0);
|
||||
memset(fplist, 0, fplist_sz);
|
||||
} else {
|
||||
ctx->blocks[blknum]->cksum_n_offset = 0;
|
||||
ctx->blocks[blknum]->mean_n_length = 0;
|
||||
cur_sketch = ctx->blocks[blknum]->crc;
|
||||
}
|
||||
fpos[0] = 0;
|
||||
len1 = 0;
|
||||
ctx->blocks[blknum]->cksum_n_offset = cur_sketch;
|
||||
cur_sketch = 0;
|
||||
blknum++;
|
||||
last_offset = i+1;
|
||||
length = 0;
|
||||
j = 0;
|
||||
cur_sketch2 = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -500,22 +440,31 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, s
|
|||
// Insert the last left-over trailing bytes, if any, into a block.
|
||||
if (last_offset < *size) {
|
||||
if (ctx->blocks[blknum] == 0)
|
||||
ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, sizeof (rabin_blockentry_t));
|
||||
ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL,
|
||||
sizeof (rabin_blockentry_t));
|
||||
ctx->blocks[blknum]->offset = last_offset;
|
||||
ctx->blocks[blknum]->index = blknum;
|
||||
ctx->blocks[blknum]->length = *size - last_offset;
|
||||
ctx->blocks[blknum]->ref = 0;
|
||||
ctx->blocks[blknum]->similar = 0;
|
||||
ctx->blocks[blknum]->crc = XXH_strong32(buf1+last_offset, ctx->blocks[blknum]->length, 0);
|
||||
|
||||
if (ctx->delta_flag) {
|
||||
j = (j > 0 ? j:1);
|
||||
ctx->blocks[blknum]->cksum_n_offset = (cur_sketch + cur_sketch2) / 2;
|
||||
ctx->blocks[blknum]->mean_n_length = cur_sketch / j;
|
||||
if (j > 1) {
|
||||
reset_heap(&heap, FORTY_PCNT(j));
|
||||
ksmallest(fplist, j, &heap);
|
||||
cur_sketch =
|
||||
XXH_fast32((const uchar_t *)fplist, FORTY_PCNT(j)*4, 0);
|
||||
} else {
|
||||
cur_sketch =
|
||||
XXH_fast32((const uchar_t *)fplist, (j*4)/2, 0);
|
||||
}
|
||||
} else {
|
||||
ctx->blocks[blknum]->cksum_n_offset = 0;
|
||||
ctx->blocks[blknum]->mean_n_length = 0;
|
||||
cur_sketch = ctx->blocks[blknum]->crc;
|
||||
}
|
||||
ctx->blocks[blknum]->crc = XXH_strong32(buf1+last_offset, ctx->blocks[blknum]->length, 0);
|
||||
|
||||
ctx->blocks[blknum]->cksum_n_offset = cur_sketch;
|
||||
blknum++;
|
||||
last_offset = *size;
|
||||
}
|
||||
|
|
193
utils/heapq.c
Normal file
193
utils/heapq.c
Normal file
|
@ -0,0 +1,193 @@
|
|||
/*
|
||||
* Functions for a rudimentary fast min-heap implementation.
|
||||
* Derived from Python's _heapqmodule.c by way of drastic simplification
|
||||
* and a few optimizations.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Original Python _heapqmodule.c implementation was derived directly
|
||||
* from heapq.py in Py2.3 which was written by Kevin O'Connor, augmented
|
||||
* by Tim Peters, annotated by François Pinard, and converted to C by
|
||||
* Raymond Hettinger.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <limits.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/types.h>
|
||||
#include <stdint.h>
|
||||
#include <heapq.h>
|
||||
|
||||
#ifndef NDEBUG
|
||||
#define ERROR_CHK
|
||||
#endif
|
||||
|
||||
void
|
||||
reset_heap(heap_t *heap, __TYPE tot)
|
||||
{
|
||||
if (heap) {
|
||||
heap->len = 0;
|
||||
heap->tot = tot;
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
_siftdownmax(heap_t *h, __TYPE startpos, __TYPE pos)
|
||||
{
|
||||
__TYPE newitem, parent;
|
||||
__TYPE parentpos, *heap;
|
||||
|
||||
#ifdef ERROR_CHK
|
||||
if (pos >= h->len) {
|
||||
fprintf(stderr, "_siftdownmax: index out of range\n");
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
heap = h->ary;
|
||||
newitem = heap[pos];
|
||||
/* Follow the path to the root, moving parents down until finding
|
||||
a place newitem fits. */
|
||||
while (pos > startpos){
|
||||
parentpos = (pos - 1) >> 1;
|
||||
parent = heap[parentpos];
|
||||
if (parent < newitem)
|
||||
break;
|
||||
heap[pos] = parent;
|
||||
pos = parentpos;
|
||||
}
|
||||
heap[pos] = newitem;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
_siftupmax(heap_t *h, __TYPE spos, __TYPE epos)
|
||||
{
|
||||
__TYPE endpos, childpos, rightpos;
|
||||
__TYPE newitem, *heap, pos;
|
||||
|
||||
endpos = h->len;
|
||||
heap = h->ary;
|
||||
#ifdef ERROR_CHK
|
||||
if (pos >= endpos) {
|
||||
fprintf(stderr, "_siftupmax: index out of range: %u, len: %u\n", pos, endpos);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
do {
|
||||
pos = spos;
|
||||
/* Bubble up the smaller child until hitting a leaf. */
|
||||
newitem = heap[pos];
|
||||
childpos = (pos << 1) + 1; /* leftmost child position */
|
||||
while (childpos < endpos) {
|
||||
/* Set childpos to index of smaller child. */
|
||||
rightpos = childpos + 1;
|
||||
if (rightpos < endpos) {
|
||||
if (heap[rightpos] < heap[childpos])
|
||||
childpos = rightpos;
|
||||
}
|
||||
/* Move the smaller child up. */
|
||||
heap[pos] = heap[childpos];
|
||||
pos = childpos;
|
||||
childpos = (pos << 1) + 1;
|
||||
}
|
||||
|
||||
/* The leaf at pos is empty now. Put newitem there, and and bubble
|
||||
it up to its final resting place (by sifting its parents down). */
|
||||
heap[pos] = newitem;
|
||||
#ifdef ERROR_CHK
|
||||
if (_siftdownmax(h, spos, pos) == -1)
|
||||
return (-1);
|
||||
#else
|
||||
_siftdownmax(h, spos, pos);
|
||||
#endif
|
||||
spos--;
|
||||
} while (spos >= epos);
|
||||
return (0);
|
||||
}
|
||||
|
||||
static int
|
||||
_siftupmax_s(heap_t *h, __TYPE spos)
|
||||
{
|
||||
__TYPE endpos, childpos, rightpos;
|
||||
__TYPE newitem, *heap, pos;
|
||||
|
||||
endpos = h->len;
|
||||
heap = h->ary;
|
||||
#ifdef ERROR_CHK
|
||||
if (pos >= endpos) {
|
||||
fprintf(stderr, "_siftupmax: index out of range: %u, len: %u\n", pos, endpos);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
pos = spos;
|
||||
/* Bubble up the smaller child until hitting a leaf. */
|
||||
newitem = heap[pos];
|
||||
childpos = (pos << 1) + 1; /* leftmost child position */
|
||||
while (childpos < endpos) {
|
||||
/* Set childpos to index of smaller child. */
|
||||
rightpos = childpos + 1;
|
||||
if (rightpos < endpos) {
|
||||
if (! (heap[rightpos] < heap[childpos]))
|
||||
childpos = rightpos;
|
||||
}
|
||||
/* Move the smaller child up. */
|
||||
heap[pos] = heap[childpos];
|
||||
pos = childpos;
|
||||
childpos = (pos << 1) + 1;
|
||||
}
|
||||
|
||||
/* The leaf at pos is empty now. Put newitem there, and and bubble
|
||||
it up to its final resting place (by sifting its parents down). */
|
||||
heap[pos] = newitem;
|
||||
return (_siftdownmax(h, spos, pos));
|
||||
}
|
||||
|
||||
int
|
||||
ksmallest(__TYPE *ary, __TYPE len, heap_t *heap)
|
||||
{
|
||||
__TYPE elem, los;
|
||||
__TYPE i, *hp, n;
|
||||
|
||||
#ifdef ERROR_CHK
|
||||
if (len >= heap->tot) {
|
||||
fprintf(stderr, "nsmallest: array size > heap size\n");
|
||||
return (-1);
|
||||
}
|
||||
#endif
|
||||
|
||||
n = heap->tot;
|
||||
heap->ary = ary;
|
||||
hp = ary;
|
||||
heap->len = n;
|
||||
|
||||
#ifdef ERROR_CHK
|
||||
if(_siftupmax(heap, n/2-1, 0) == -1)
|
||||
return (-1);
|
||||
#else
|
||||
_siftupmax(heap, n/2-1, 0);
|
||||
#endif
|
||||
|
||||
los = hp[0];
|
||||
for (i = n; i < len; i++) {
|
||||
elem = ary[i];
|
||||
if (elem >= los) {
|
||||
continue;
|
||||
}
|
||||
|
||||
hp[0] = elem;
|
||||
#ifdef ERROR_CHK
|
||||
if (_siftupmax_s(heap, 0) == -1)
|
||||
return (-1);
|
||||
#else
|
||||
_siftupmax_s(heap, 0);
|
||||
#endif
|
||||
los = hp[0];
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
14
utils/heapq.h
Normal file
14
utils/heapq.h
Normal file
|
@ -0,0 +1,14 @@
|
|||
#ifndef __HEAPQ_H_
|
||||
|
||||
#define __TYPE int32_t
|
||||
|
||||
typedef struct {
|
||||
__TYPE *ary;
|
||||
__TYPE len;
|
||||
__TYPE tot;
|
||||
} heap_t;
|
||||
|
||||
extern int ksmallest(__TYPE *ary, __TYPE len, heap_t *heap);
|
||||
extern void reset_heap(heap_t *h, __TYPE tot);
|
||||
|
||||
#endif
|
Loading…
Reference in a new issue