Many optimizations and changes to Segmented Global Dedupe.

Use chunk hash based similarity matching rather than content based.
Use sorting to order hash buffer rather than min-heap for better accuracy.
Use fast CRC64 for similarity hash for speed and lower memory requirements.
This commit is contained in:
Moinak Ghosh 2013-04-21 18:11:16 +05:30
parent 3b8a5813fd
commit c0b4aa0116
10 changed files with 184 additions and 68 deletions

2
main.c
View file

@ -1834,7 +1834,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
} }
if (!S_ISDIR(st.st_mode)) { if (!S_ISDIR(st.st_mode)) {
if (strcmp(tmp, "/tmp") != 0) { if (strcmp(tmp, "/tmp") != 0) {
tmp = "/tmp"; tmp = "/tmp";
} else { } else {
fprintf(stderr, "Unable to find writable temporary dir.\n"); fprintf(stderr, "Unable to find writable temporary dir.\n");
COMP_BAIL; COMP_BAIL;

View file

@ -374,7 +374,7 @@ set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim
cfg->segment_sz_bytes = user_chunk_sz; cfg->segment_sz_bytes = user_chunk_sz;
cfg->similarity_cksum_sz = cfg->chunk_cksum_sz; cfg->similarity_cksum_sz = cfg->chunk_cksum_sz;
} else if (cfg->archive_sz < ONE_TB) { } else if (cfg->archive_sz < (ONE_TB * 100)) {
cfg->segment_sz_bytes = FOUR_MB; cfg->segment_sz_bytes = FOUR_MB;
} else { } else {

View file

@ -37,10 +37,11 @@ extern "C" {
#define DEFAULT_CHUNK_CKSUM CKSUM_SHA256 #define DEFAULT_CHUNK_CKSUM CKSUM_SHA256
#define DEFAULT_SIMILARITY_CKSUM CKSUM_BLAKE256 #define DEFAULT_SIMILARITY_CKSUM CKSUM_BLAKE256
#define DEFAULT_COMPRESS COMPRESS_LZ4 #define DEFAULT_COMPRESS COMPRESS_LZ4
#define DEFAULT_PCT_INTERVAL 5 #define DEFAULT_PCT_INTERVAL 8
#define CONTAINER_ITEMS 2048 #define CONTAINER_ITEMS 2048
#define MIN_CK 1 #define MIN_CK 1
#define MAX_CK 5 #define MAX_CK 5
#define GLOBAL_SIM_CKSUM CKSUM_CRC64
// 8GB // 8GB
#define MIN_ARCHIVE_SZ (8589934592ULL) #define MIN_ARCHIVE_SZ (8589934592ULL)

View file

@ -108,7 +108,7 @@ int
setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz, setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz,
int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
size_t file_sz, uint32_t *hash_slots, int *hash_entry_size, size_t file_sz, uint32_t *hash_slots, int *hash_entry_size,
uint64_t *memreqd, size_t memlimit) uint64_t *memreqd, size_t memlimit, char *tmppath)
{ {
int rv, set_user; int rv, set_user;
@ -184,8 +184,8 @@ set_cfg:
* If memory required is more than twice the indicated memory limit then * If memory required is more than twice the indicated memory limit then
* we switch to Segmented Cumulative Similarity based dedupe. * we switch to Segmented Cumulative Similarity based dedupe.
*/ */
if (*memreqd > (memlimit * 2) && cfg->dedupe_mode == MODE_SIMPLE && if (*memreqd > (memlimit * 3) && cfg->dedupe_mode == MODE_SIMPLE &&
*pct_interval == 0) { *pct_interval == 0 && tmppath != NULL) {
*pct_interval = DEFAULT_PCT_INTERVAL; *pct_interval = DEFAULT_PCT_INTERVAL;
set_user = 1; set_user = 1;
goto set_cfg; goto set_cfg;
@ -213,7 +213,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
cfg = calloc(1, sizeof (archive_config_t)); cfg = calloc(1, sizeof (archive_config_t));
rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim, rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim,
file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit); file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit, tmppath);
// Reduce hash_slots to remain within memlimit // Reduce hash_slots to remain within memlimit
while (memreqd > memlimit) { while (memreqd > memlimit) {
@ -232,7 +232,10 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
} }
cfg->nthreads = nthreads; cfg->nthreads = nthreads;
intervals = cfg->intervals + cfg->sub_intervals; if (cfg->dedupe_mode == MODE_SIMILARITY)
intervals = 1;
else
intervals = cfg->intervals + cfg->sub_intervals;
indx->memlimit = memlimit - (hash_entry_size << 2); indx->memlimit = memlimit - (hash_entry_size << 2);
indx->list = (htab_t *)calloc(intervals, sizeof (htab_t)); indx->list = (htab_t *)calloc(intervals, sizeof (htab_t));
indx->hash_entry_size = hash_entry_size; indx->hash_entry_size = hash_entry_size;

View file

@ -45,7 +45,7 @@ archive_config_t *init_global_db(char *configfile);
int setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz, int setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz,
int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
size_t file_sz, uint32_t *hash_slots, int *hash_entry_size, size_t file_sz, uint32_t *hash_slots, int *hash_entry_size,
uint64_t *memreqd, size_t memlimit); uint64_t *memreqd, size_t memlimit, char *tmppath);
archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize, archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize,
uint64_t user_chunk_sz, int pct_interval, const char *algo, uint64_t user_chunk_sz, int pct_interval, const char *algo,
cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit,

View file

@ -110,6 +110,7 @@ uint64_t ir[256], out[256];
static int inited = 0; static int inited = 0;
archive_config_t *arc = NULL; archive_config_t *arc = NULL;
static struct blake2_dispatch bdsp; static struct blake2_dispatch bdsp;
int seg = 0;
static uint32_t static uint32_t
dedupe_min_blksz(int rab_blk_sz) dedupe_min_blksz(int rab_blk_sz)
@ -146,7 +147,7 @@ global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_in
rv = 0; rv = 0;
pct_i = pct_interval; pct_i = pct_interval;
rv = setup_db_config_s(&cfg, rab_blk_sz, user_chunk_sz, &pct_i, algo, ck, ck_sim, rv = setup_db_config_s(&cfg, rab_blk_sz, user_chunk_sz, &pct_i, algo, ck, ck_sim,
file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit); file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit, "/tmp");
return (rv); return (rv);
} }
@ -211,7 +212,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
*/ */
get_sys_limits(&msys_info); get_sys_limits(&msys_info);
arc = init_global_db_s(NULL, "/tmp", rab_blk_sz, chunksize, 0, arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, 0,
algo, props->cksum, GLOBAL_SIM_CKSUM, file_size, algo, props->cksum, GLOBAL_SIM_CKSUM, file_size,
msys_info.freeram, props->nthreads); msys_info.freeram, props->nthreads);
if (arc == NULL) { if (arc == NULL) {
@ -375,6 +376,20 @@ destroy_dedupe_context(dedupe_context_t *ctx)
} }
} }
/*
 * Comparison callback for qsort(3): orders 8-byte values in ascending
 * numeric order. Both arguments must point to uint64_t elements.
 * Returns -1, 0 or 1 for less-than, equal and greater-than.
 */
int
cmpint(const void *a, const void *b)
{
	/* Keep the const qualifier through the cast; the comparator
	 * must not (and does not) modify the elements. */
	uint64_t a1 = *((const uint64_t *)a);
	uint64_t b1 = *((const uint64_t *)b);

	if (a1 < b1)
		return (-1);
	else if (a1 == b1)
		return (0);
	else
		return (1);
}
/** /**
* Perform Deduplication. * Perform Deduplication.
* Both Semi-Rabin fingerprinting based and Fixed Block Deduplication are supported. * Both Semi-Rabin fingerprinting based and Fixed Block Deduplication are supported.
@ -566,7 +581,7 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of
if (!(ctx->arc)) { if (!(ctx->arc)) {
if (ctx->blocks[blknum] == 0) if (ctx->blocks[blknum] == 0)
ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL,
sizeof (rabin_blockentry_t)); sizeof (rabin_blockentry_t));
ctx->blocks[blknum]->offset = last_offset; ctx->blocks[blknum]->offset = last_offset;
ctx->blocks[blknum]->index = blknum; // Need to store for sorting ctx->blocks[blknum]->index = blknum; // Need to store for sorting
ctx->blocks[blknum]->length = length; ctx->blocks[blknum]->length = length;
@ -662,7 +677,8 @@ process_blocks:
uint64_t dedupe_index_sz = 0; uint64_t dedupe_index_sz = 0;
rabin_blockentry_t *be; rabin_blockentry_t *be;
DEBUG_STAT_EN(uint32_t delta_calls, delta_fails, merge_count, hash_collisions); DEBUG_STAT_EN(uint32_t delta_calls, delta_fails, merge_count, hash_collisions);
DEBUG_STAT_EN(double w1, w2); DEBUG_STAT_EN(double w1 = 0);
DEBUG_STAT_EN(double w2 = 0);
DEBUG_STAT_EN(delta_calls = 0); DEBUG_STAT_EN(delta_calls = 0);
DEBUG_STAT_EN(delta_fails = 0); DEBUG_STAT_EN(delta_fails = 0);
DEBUG_STAT_EN(hash_collisions = 0); DEBUG_STAT_EN(hash_collisions = 0);
@ -789,10 +805,11 @@ process_blocks:
global_blockentry_t *seg_blocks; global_blockentry_t *seg_blocks;
uint64_t seg_offset, offset; uint64_t seg_offset, offset;
global_blockentry_t **htab, *be; global_blockentry_t **htab, *be;
int sub_i;
/*====================================================================== /*======================================================================
* This code block implements Segmented similarity based Dedupe with * This code block implements Segmented similarity based Dedupe with
* in-memory index. * in-memory index for very large datasets.
* ====================================================================== * ======================================================================
*/ */
cfg = ctx->arc; cfg = ctx->arc;
@ -817,31 +834,35 @@ process_blocks:
tgt += cfg->chunk_cksum_sz; tgt += cfg->chunk_cksum_sz;
} }
blks = j+i; blks = j+i;
qsort(seg_heap, length/8, 8, cmpint);
/* /*
* Compute the cumulative similarity minhashes. * Compute the range similarity minhashes.
*/ */
sim_ck = ctx->similarity_cksums; sim_ck = ctx->similarity_cksums;
crc = 0; crc = 0;
increment = (length / cfg->intervals) / cfg->sub_intervals; sub_i = cfg->sub_intervals;
len = increment; increment = (length / cfg->intervals) / sub_i;
for (j = 0; j<cfg->sub_intervals; j++) { tgt = seg_heap;
reset_heap(&heap, len/8); while (increment < cfg->chunk_cksum_sz/4 && sub_i > 0) {
ksmallest((int64_t *)seg_heap, length/8, &heap); sub_i--;
crc = lzma_crc64(seg_heap + len - increment, increment, crc); increment = (length / cfg->intervals) / sub_i;
}
len = length;
for (j = 0; j<sub_i; j++) {
crc = lzma_crc64(tgt, increment, 0);
*((uint64_t *)sim_ck) = crc; *((uint64_t *)sim_ck) = crc;
len += increment; tgt += increment;
len -= increment;
sim_ck += cfg->similarity_cksum_sz; sim_ck += cfg->similarity_cksum_sz;
} }
increment = length / cfg->intervals; increment = length / cfg->intervals;
len = increment * 2;
for (j=0; j<cfg->intervals-1; j++) { for (j=0; j<cfg->intervals-1; j++) {
reset_heap(&heap, len/8); crc = lzma_crc64(tgt, increment, 0);
ksmallest((int64_t *)seg_heap, length/8, &heap);
crc = lzma_crc64(seg_heap + len - increment, increment, crc);
*((uint64_t *)sim_ck) = crc; *((uint64_t *)sim_ck) = crc;
len += increment; tgt += increment;
len -= increment;
sim_ck += cfg->similarity_cksum_sz; sim_ck += cfg->similarity_cksum_sz;
} }
@ -854,12 +875,13 @@ process_blocks:
sem_wait(ctx->index_sem); sem_wait(ctx->index_sem);
DEBUG_STAT_EN(w2 = get_wtime_millis()); DEBUG_STAT_EN(w2 = get_wtime_millis());
} }
sim_ck -= cfg->similarity_cksum_sz; sim_ck -= cfg->similarity_cksum_sz;
seg_offset = db_segcache_pos(cfg, ctx->id); seg_offset = db_segcache_pos(cfg, ctx->id);
src = (uchar_t *)&(ctx->g_blocks[i]); src = (uchar_t *)&(ctx->g_blocks[i]);
len = blks * sizeof (global_blockentry_t); len = blks * sizeof (global_blockentry_t);
db_segcache_write(cfg, ctx->id, src, len, blks, db_segcache_write(cfg, ctx->id, src, len, blks-i,
ctx->file_offset + ctx->g_blocks[i].offset); ctx->file_offset);
/* /*
* Insert current segment blocks into local hashtable and do partial * Insert current segment blocks into local hashtable and do partial
@ -869,7 +891,6 @@ process_blocks:
memset(htab, 0, ary_sz); memset(htab, 0, ary_sz);
for (k=i; k<blks; k++) { for (k=i; k<blks; k++) {
uint32_t hent; uint32_t hent;
hent = XXH32(ctx->g_blocks[k].cksum, cfg->chunk_cksum_sz, 0); hent = XXH32(ctx->g_blocks[k].cksum, cfg->chunk_cksum_sz, 0);
hent ^= (hent / cfg->chunk_cksum_sz); hent ^= (hent / cfg->chunk_cksum_sz);
hent = hent % cfg->segment_sz; hent = hent % cfg->segment_sz;
@ -913,10 +934,10 @@ process_blocks:
* Now lookup the similarity minhashes starting at the highest * Now lookup the similarity minhashes starting at the highest
* significance level. * significance level.
*/ */
for (j=cfg->intervals + cfg->sub_intervals; j > 0; j--) { for (j=cfg->intervals + sub_i; j > 0; j--) {
hash_entry_t *he; hash_entry_t *he;
he = db_lookup_insert_s(cfg, sim_ck, j-1, seg_offset, 0, 1); he = db_lookup_insert_s(cfg, sim_ck, 0, seg_offset, 0, 1);
if (he) { if (he) {
/* /*
* Match found. Load segment metadata from disk and perform * Match found. Load segment metadata from disk and perform
@ -937,7 +958,7 @@ process_blocks:
*/ */
for (k=0; k<o_blks; k++) { for (k=0; k<o_blks; k++) {
uint32_t hent; uint32_t hent;
hent = XXH32(&(seg_blocks[k].cksum[0]), cfg->chunk_cksum_sz, 0); hent = XXH32(seg_blocks[k].cksum, cfg->chunk_cksum_sz, 0);
hent ^= (hent / cfg->chunk_cksum_sz); hent ^= (hent / cfg->chunk_cksum_sz);
hent = hent % cfg->segment_sz; hent = hent % cfg->segment_sz;
@ -945,7 +966,7 @@ process_blocks:
be = htab[hent]; be = htab[hent];
do { do {
if (be->length & RABIN_INDEX_FLAG) if (be->length & RABIN_INDEX_FLAG)
goto next_ent; goto next_ent;
if (memcmp(seg_blocks[k].cksum, if (memcmp(seg_blocks[k].cksum,
be->cksum, cfg->chunk_cksum_sz) == 0 && be->cksum, cfg->chunk_cksum_sz) == 0 &&
seg_blocks[k].length == be->length) { seg_blocks[k].length == be->length) {
@ -968,7 +989,7 @@ next_ent:
} }
sim_ck -= cfg->similarity_cksum_sz; sim_ck -= cfg->similarity_cksum_sz;
} }
i += blks; i = blks;
} }
/* /*
@ -985,7 +1006,7 @@ next_ent:
if (!(ctx->g_blocks[i].length & RABIN_INDEX_FLAG)) { if (!(ctx->g_blocks[i].length & RABIN_INDEX_FLAG)) {
/* /*
* Block match in index not found. * Block match in index was not found.
* Block was added to index. Merge this block. * Block was added to index. Merge this block.
*/ */
if (length + ctx->g_blocks[i].length > RABIN_MAX_BLOCK_SIZE) { if (length + ctx->g_blocks[i].length > RABIN_MAX_BLOCK_SIZE) {
@ -1045,7 +1066,7 @@ next_ent:
DEBUG_STAT_EN(en = get_wtime_millis()); DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n", DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n",
get_mb_s(*size, strt, en_1), get_mb_s(*size, strt, en - (w2 - w1)))); get_mb_s(*size, strt, en_1), get_mb_s(*size, strt, en - (w2 - w1))));
DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.\n")); DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible."));
ctx->valid = 0; ctx->valid = 0;
return (0); return (0);
} }

View file

@ -137,7 +137,6 @@
#define SIMILAR_EXACT 1 #define SIMILAR_EXACT 1
#define SIMILAR_PARTIAL 2 #define SIMILAR_PARTIAL 2
#define SIMILAR_REF 3 #define SIMILAR_REF 3
#define GLOBAL_SIM_CKSUM CKSUM_CRC64
/* /*
* Types of delta operations. * Types of delta operations.

View file

@ -174,38 +174,128 @@ _siftupmax_s(heap_t *h, __TYPE spos)
int int
ksmallest(__TYPE *ary, __TYPE len, heap_t *heap) ksmallest(__TYPE *ary, __TYPE len, heap_t *heap)
{ {
__TYPE elem, los; __TYPE elem, los;
__TYPE i, *hp, n; __TYPE i, *hp, n;
__TYPE tmp;
n = heap->tot; n = heap->tot;
heap->ary = ary; heap->ary = ary;
hp = ary; hp = ary;
heap->len = n; heap->len = n;
#ifdef ERROR_CHK #ifdef ERROR_CHK
if(_siftupmax(heap, n/2-1, 0) == -1) if(_siftupmax(heap, n/2-1, 0) == -1)
return (-1); return (-1);
#else #else
_siftupmax(heap, n/2-1, 0); _siftupmax(heap, n/2-1, 0);
#endif #endif
los = hp[0]; los = hp[0];
for (i = n; i < len; i++) { for (i = n; i < len; i++) {
elem = ary[i]; elem = ary[i];
if (elem >= los) { if (elem >= los) {
continue; continue;
} }
hp[0] = elem; tmp = hp[0];
#ifdef ERROR_CHK hp[0] = elem;
if (_siftupmax_s(heap, 0) == -1) ary[i] = tmp;
return (-1); #ifdef ERROR_CHK
#else if (_siftupmax_s(heap, 0) == -1)
_siftupmax_s(heap, 0); return (-1);
#endif #else
los = hp[0]; _siftupmax_s(heap, 0);
} #endif
los = hp[0];
}
return 0; return 0;
} }
/*
 * Sift the item at 'pos' toward the root (stopping at 'startpos'),
 * moving parents down until a slot is found where the item keeps
 * min-heap order. Same scheme as CPython heapq's _siftdown.
 * Returns 0 on success, -1 on a range error (ERROR_CHK builds only).
 */
static int
_siftdown(heap_t *h, __TYPE startpos, __TYPE pos)
{
__TYPE newitem, parent, *heap;
__TYPE parentpos;
heap = h->ary;
#ifdef ERROR_CHK
if (pos >= h->tot) {
/* Report h->tot: that is the limit actually checked above
 * (the old message misleadingly printed h->len). */
fprintf(stderr, "_siftdown: index out of range: %" PRId64 ", tot: %" PRId64 "\n", pos, h->tot);
return -1;
}
#endif
/* Follow the path to the root, moving parents down until finding
   a place newitem fits. */
newitem = heap[pos];
while (pos > startpos){
parentpos = (pos - 1) >> 1;
parent = heap[parentpos];
if (parent < newitem) {
break;
}
heap[pos] = parent;
pos = parentpos;
}
heap[pos] = newitem;
return (0);
}
/*
 * Restore min-heap order at 'pos': repeatedly promote the smaller
 * child into the vacated slot until a leaf is reached, then sift the
 * saved item back toward the root via _siftdown (the CPython heapq
 * _siftup strategy). Returns 0 on success; -1 on a range error
 * (ERROR_CHK builds only) or whatever _siftdown returns.
 */
static int
_siftup(heap_t *h, __TYPE pos)
{
__TYPE startpos, endpos, childpos, rightpos;
__TYPE newitem, *heap;
endpos = h->tot;
heap = h->ary;
startpos = pos;
#ifdef ERROR_CHK
if (pos >= endpos) {
fprintf(stderr, "_siftup: index out of range: %" PRId64 ", len: %" PRId64 "\n", pos, endpos);
return -1;
}
#endif
/* Bubble up the smaller child until hitting a leaf. */
newitem = heap[pos];
childpos = 2*pos + 1; /* leftmost child position */
while (childpos < endpos) {
/* Set childpos to index of smaller child. */
rightpos = childpos + 1;
if (rightpos < endpos) {
if (heap[rightpos] < heap[childpos])
childpos = rightpos;
}
/* Move the smaller child up. */
heap[pos] = heap[childpos];
pos = childpos;
childpos = 2*pos + 1;
}
/* The leaf at pos is empty now. Put newitem there, and bubble
   it up to its final resting place (by sifting its parents down). */
heap[pos] = newitem;
return _siftdown(h, startpos, pos);
}
/*
 * Build a min-heap in-place over the first h->tot elements of 'ary'.
 * Bottom-up construction: indices >= tot/2 are leaves and already
 * trivial heaps, so only the internal nodes, from the last one
 * (tot/2 - 1) back to the root (0), need sifting.
 */
void
heapify(heap_t *h, __TYPE *ary)
{
__TYPE node;
__TYPE count;

count = h->tot;
h->ary = ary;

/* Sift every internal node; a -1 from _siftup signals a range
 * error (ERROR_CHK builds), so stop immediately. */
for (node = count / 2 - 1; node >= 0; node--) {
if (_siftup(h, node) == -1)
break;
}
}

View file

@ -32,7 +32,8 @@ typedef struct {
__TYPE tot; __TYPE tot;
} heap_t; } heap_t;
extern int ksmallest(__TYPE *ary, __TYPE len, heap_t *heap); int ksmallest(__TYPE *ary, __TYPE len, heap_t *heap);
extern void reset_heap(heap_t *h, __TYPE tot); void reset_heap(heap_t *h, __TYPE tot);
void heapify(heap_t *h, __TYPE *ary);
#endif #endif

View file

@ -50,6 +50,7 @@ extern "C" {
#define ONE_PB (1125899906842624ULL) #define ONE_PB (1125899906842624ULL)
#define ONE_TB (1099511627776ULL) #define ONE_TB (1099511627776ULL)
#define TWO_MB (2UL * 1024UL * 1024UL)
#define FOUR_MB FOURM #define FOUR_MB FOURM
#define EIGHT_MB EIGHTM #define EIGHT_MB EIGHTM
#define EIGHT_GB (8589934592ULL) #define EIGHT_GB (8589934592ULL)