From c0b4aa011602958b9f1ca20c7cce67c56f0745ed Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Sun, 21 Apr 2013 18:11:16 +0530 Subject: [PATCH] Many optimizations and changes to Segmented Global Dedupe. Use chunk hash based similarity matching rather than content based. Use sorting to order hash buffer rather than min-heap for better accuracy. Use fast CRC64 for similarity hash for speed and lower memory requirements. --- main.c | 2 +- rabin/global/dedupe_config.c | 2 +- rabin/global/dedupe_config.h | 3 +- rabin/global/index.c | 13 ++-- rabin/global/index.h | 2 +- rabin/rabin_dedup.c | 77 +++++++++++------- rabin/rabin_dedup.h | 1 - utils/heapq.c | 146 ++++++++++++++++++++++++++++------- utils/heapq.h | 5 +- utils/utils.h | 1 + 10 files changed, 184 insertions(+), 68 deletions(-) diff --git a/main.c b/main.c index 008753f..394f2b9 100644 --- a/main.c +++ b/main.c @@ -1834,7 +1834,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) } if (!S_ISDIR(st.st_mode)) { if (strcmp(tmp, "/tmp") != 0) { - tmp = "/tmp"; + tmp = "/tmp"; } else { fprintf(stderr, "Unable to find writable temporary dir.\n"); COMP_BAIL; diff --git a/rabin/global/dedupe_config.c b/rabin/global/dedupe_config.c index 59d3b4f..f537378 100644 --- a/rabin/global/dedupe_config.c +++ b/rabin/global/dedupe_config.c @@ -374,7 +374,7 @@ set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim cfg->segment_sz_bytes = user_chunk_sz; cfg->similarity_cksum_sz = cfg->chunk_cksum_sz; - } else if (cfg->archive_sz < ONE_TB) { + } else if (cfg->archive_sz < (ONE_TB * 100)) { cfg->segment_sz_bytes = FOUR_MB; } else { diff --git a/rabin/global/dedupe_config.h b/rabin/global/dedupe_config.h index b4a722d..730a05a 100644 --- a/rabin/global/dedupe_config.h +++ b/rabin/global/dedupe_config.h @@ -37,10 +37,11 @@ extern "C" { #define DEFAULT_CHUNK_CKSUM CKSUM_SHA256 #define DEFAULT_SIMILARITY_CKSUM CKSUM_BLAKE256 #define DEFAULT_COMPRESS COMPRESS_LZ4 -#define DEFAULT_PCT_INTERVAL 5 +#define DEFAULT_PCT_INTERVAL 8 #define CONTAINER_ITEMS 2048 #define MIN_CK 1 #define MAX_CK 5 +#define GLOBAL_SIM_CKSUM CKSUM_CRC64 // 8GB #define MIN_ARCHIVE_SZ (8589934592ULL) diff --git a/rabin/global/index.c b/rabin/global/index.c index aaef51a..7de4aca 100644 --- a/rabin/global/index.c +++ b/rabin/global/index.c @@ -108,7 +108,7 @@ int setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz, int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, uint32_t *hash_slots, int *hash_entry_size, - uint64_t *memreqd, size_t memlimit) + uint64_t *memreqd, size_t memlimit, char *tmppath) { int rv, set_user; @@ -184,8 +184,8 @@ set_cfg: * If memory required is more than twice the indicated memory limit then * we switch to Segmented Cumulative Similarity based dedupe. */ - if (*memreqd > (memlimit * 2) && cfg->dedupe_mode == MODE_SIMPLE && - *pct_interval == 0) { + if (*memreqd > (memlimit * 3) && cfg->dedupe_mode == MODE_SIMPLE && + *pct_interval == 0 && tmppath != NULL) { *pct_interval = DEFAULT_PCT_INTERVAL; set_user = 1; goto set_cfg; @@ -213,7 +213,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch cfg = calloc(1, sizeof (archive_config_t)); rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim, - file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit); + file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit, tmppath); // Reduce hash_slots to remain within memlimit while (memreqd > memlimit) { @@ -232,7 +232,10 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch } cfg->nthreads = nthreads; - intervals = cfg->intervals + cfg->sub_intervals; + if (cfg->dedupe_mode == MODE_SIMILARITY) + intervals = 1; + else + intervals = cfg->intervals + cfg->sub_intervals; indx->memlimit = memlimit - (hash_entry_size << 2); indx->list = (htab_t *)calloc(intervals, sizeof (htab_t)); indx->hash_entry_size = hash_entry_size; diff --git a/rabin/global/index.h b/rabin/global/index.h index dd7a32c..51ec0b9 100644 --- a/rabin/global/index.h +++ b/rabin/global/index.h @@ -45,7 +45,7 @@ archive_config_t *init_global_db(char *configfile); int setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz, int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, uint32_t *hash_slots, int *hash_entry_size, - uint64_t *memreqd, size_t memlimit); + uint64_t *memreqd, size_t memlimit, char *tmppath); archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz, int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index 9d307fa..2740a32 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -110,6 +110,7 @@ uint64_t ir[256], out[256]; static int inited = 0; archive_config_t *arc = NULL; static struct blake2_dispatch bdsp; +int seg = 0; static uint32_t dedupe_min_blksz(int rab_blk_sz) @@ -146,7 +147,7 @@ global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_in rv = 0; pct_i = pct_interval; rv = setup_db_config_s(&cfg, rab_blk_sz, user_chunk_sz, &pct_i, algo, ck, ck_sim, - file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit); + file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit, "/tmp"); return (rv); } @@ -211,7 +212,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s */ get_sys_limits(&msys_info); - arc = init_global_db_s(NULL, "/tmp", rab_blk_sz, chunksize, 0, + arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, 0, algo, props->cksum, GLOBAL_SIM_CKSUM, file_size, msys_info.freeram, props->nthreads); if (arc == NULL) { @@ -375,6 +376,20 @@ destroy_dedupe_context(dedupe_context_t *ctx) } } +int +cmpint(const void *a, const void *b) +{ + uint64_t a1 = *((uint64_t *)a); + uint64_t b1 = *((uint64_t *)b); + + if (a1 < b1) + return (-1); + else if (a1 == b1) + return (0); + else + return (1); +} + /** * Perform Deduplication. * Both Semi-Rabin fingerprinting based and Fixed Block Deduplication are supported. @@ -566,7 +581,7 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of if (!(ctx->arc)) { if (ctx->blocks[blknum] == 0) ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, - sizeof (rabin_blockentry_t)); + sizeof (rabin_blockentry_t)); ctx->blocks[blknum]->offset = last_offset; ctx->blocks[blknum]->index = blknum; // Need to store for sorting ctx->blocks[blknum]->length = length; @@ -662,7 +677,8 @@ process_blocks: uint64_t dedupe_index_sz = 0; rabin_blockentry_t *be; DEBUG_STAT_EN(uint32_t delta_calls, delta_fails, merge_count, hash_collisions); - DEBUG_STAT_EN(double w1, w2); + DEBUG_STAT_EN(double w1 = 0); + DEBUG_STAT_EN(double w2 = 0); DEBUG_STAT_EN(delta_calls = 0); DEBUG_STAT_EN(delta_fails = 0); DEBUG_STAT_EN(hash_collisions = 0); @@ -789,10 +805,11 @@ process_blocks: global_blockentry_t *seg_blocks; uint64_t seg_offset, offset; global_blockentry_t **htab, *be; + int sub_i; /*====================================================================== * This code block implements Segmented similarity based Dedupe with - * in-memory index. + * in-memory index for very large datasets. * ====================================================================== */ cfg = ctx->arc; @@ -817,31 +834,35 @@ process_blocks: tgt += cfg->chunk_cksum_sz; } blks = j+i; + qsort(seg_heap, length/8, 8, cmpint); /* - * Compute the cumulative similarity minhashes. + * Compute the range similarity minhashes. */ sim_ck = ctx->similarity_cksums; crc = 0; - increment = (length / cfg->intervals) / cfg->sub_intervals; - len = increment; - for (j = 0; jsub_intervals; j++) { - reset_heap(&heap, len/8); - ksmallest((int64_t *)seg_heap, length/8, &heap); - crc = lzma_crc64(seg_heap + len - increment, increment, crc); + sub_i = cfg->sub_intervals; + increment = (length / cfg->intervals) / sub_i; + tgt = seg_heap; + while (increment < cfg->chunk_cksum_sz/4 && sub_i > 0) { + sub_i--; + increment = (length / cfg->intervals) / sub_i; + } + len = length; + for (j = 0; jsimilarity_cksum_sz; } increment = length / cfg->intervals; - len = increment * 2; for (j=0; jintervals-1; j++) { - reset_heap(&heap, len/8); - ksmallest((int64_t *)seg_heap, length/8, &heap); - crc = lzma_crc64(seg_heap + len - increment, increment, crc); + crc = lzma_crc64(tgt, increment, 0); *((uint64_t *)sim_ck) = crc; - len += increment; + tgt += increment; + len -= increment; sim_ck += cfg->similarity_cksum_sz; } @@ -854,12 +875,13 @@ process_blocks: sem_wait(ctx->index_sem); DEBUG_STAT_EN(w2 = get_wtime_millis()); } + sim_ck -= cfg->similarity_cksum_sz; seg_offset = db_segcache_pos(cfg, ctx->id); src = (uchar_t *)&(ctx->g_blocks[i]); len = blks * sizeof (global_blockentry_t); - db_segcache_write(cfg, ctx->id, src, len, blks, - ctx->file_offset + ctx->g_blocks[i].offset); + db_segcache_write(cfg, ctx->id, src, len, blks-i, + ctx->file_offset); /* * Insert current segment blocks into local hashtable and do partial @@ -869,7 +891,6 @@ process_blocks: memset(htab, 0, ary_sz); for (k=i; kg_blocks[k].cksum, cfg->chunk_cksum_sz, 0); hent ^= (hent / cfg->chunk_cksum_sz); hent = hent % cfg->segment_sz; @@ -913,10 +934,10 @@ process_blocks: * Now lookup the similarity minhashes starting at the highest * significance level. */ - for (j=cfg->intervals + cfg->sub_intervals; j > 0; j--) { + for (j=cfg->intervals + sub_i; j > 0; j--) { hash_entry_t *he; - he = db_lookup_insert_s(cfg, sim_ck, j-1, seg_offset, 0, 1); + he = db_lookup_insert_s(cfg, sim_ck, 0, seg_offset, 0, 1); if (he) { /* * Match found. Load segment metadata from disk and perform @@ -937,7 +958,7 @@ process_blocks: */ for (k=0; kchunk_cksum_sz, 0); + hent = XXH32(seg_blocks[k].cksum, cfg->chunk_cksum_sz, 0); hent ^= (hent / cfg->chunk_cksum_sz); hent = hent % cfg->segment_sz; @@ -945,7 +966,7 @@ process_blocks: be = htab[hent]; do { if (be->length & RABIN_INDEX_FLAG) - goto next_ent; + goto next_ent; if (memcmp(seg_blocks[k].cksum, be->cksum, cfg->chunk_cksum_sz) == 0 && seg_blocks[k].length == be->length) { @@ -968,7 +989,7 @@ next_ent: } sim_ck -= cfg->similarity_cksum_sz; } - i += blks; + i = blks; } /* @@ -985,7 +1006,7 @@ next_ent: if (!(ctx->g_blocks[i].length & RABIN_INDEX_FLAG)) { /* - * Block match in index not found. + * Block match in index was not found. * Block was added to index. Merge this block. */ if (length + ctx->g_blocks[i].length > RABIN_MAX_BLOCK_SIZE) { @@ -1045,7 +1066,7 @@ next_ent: DEBUG_STAT_EN(en = get_wtime_millis()); DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n", get_mb_s(*size, strt, en_1), get_mb_s(*size, strt, en - (w2 - w1)))); - DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.\n")); + DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.")); ctx->valid = 0; return (0); } diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index 7fa9a80..9745f38 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -137,7 +137,6 @@ #define SIMILAR_EXACT 1 #define SIMILAR_PARTIAL 2 #define SIMILAR_REF 3 -#define GLOBAL_SIM_CKSUM CKSUM_CRC64 /* * TYpes of delta operations. diff --git a/utils/heapq.c b/utils/heapq.c index 7e8db05..42dddca 100644 --- a/utils/heapq.c +++ b/utils/heapq.c @@ -174,38 +174,128 @@ _siftupmax_s(heap_t *h, __TYPE spos) int ksmallest(__TYPE *ary, __TYPE len, heap_t *heap) { - __TYPE elem, los; - __TYPE i, *hp, n; - - n = heap->tot; - heap->ary = ary; - hp = ary; - heap->len = n; + __TYPE elem, los; + __TYPE i, *hp, n; + __TYPE tmp; + + n = heap->tot; + heap->ary = ary; + hp = ary; + heap->len = n; #ifdef ERROR_CHK - if(_siftupmax(heap, n/2-1, 0) == -1) - return (-1); + if(_siftupmax(heap, n/2-1, 0) == -1) + return (-1); #else - _siftupmax(heap, n/2-1, 0); + _siftupmax(heap, n/2-1, 0); #endif - los = hp[0]; - for (i = n; i < len; i++) { - elem = ary[i]; - if (elem >= los) { - continue; - } - - hp[0] = elem; -#ifdef ERROR_CHK - if (_siftupmax_s(heap, 0) == -1) - return (-1); -#else - _siftupmax_s(heap, 0); -#endif - los = hp[0]; - } - - return 0; + los = hp[0]; + for (i = n; i < len; i++) { + elem = ary[i]; + if (elem >= los) { + continue; + } + + tmp = hp[0]; + hp[0] = elem; + ary[i] = tmp; + #ifdef ERROR_CHK + if (_siftupmax_s(heap, 0) == -1) + return (-1); + #else + _siftupmax_s(heap, 0); + #endif + los = hp[0]; + } + + return 0; } +static int +_siftdown(heap_t *h, __TYPE startpos, __TYPE pos) +{ + __TYPE newitem, parent, *heap; + __TYPE parentpos; + + heap = h->ary; +#ifdef ERROR_CHK + if (pos >= h->tot) { + fprintf(stderr, "_siftdown: index out of range: %" PRId64 ", len: %" PRId64 "\n", pos, h->len); + return -1; + } +#endif + + /* Follow the path to the root, moving parents down until finding + a place newitem fits. */ + newitem = heap[pos]; + while (pos > startpos){ + parentpos = (pos - 1) >> 1; + parent = heap[parentpos]; + if (parent < newitem) { + break; + } + heap[pos] = parent; + pos = parentpos; + } + heap[pos] = newitem; + return (0); +} + +static int +_siftup(heap_t *h, __TYPE pos) +{ + __TYPE startpos, endpos, childpos, rightpos; + __TYPE newitem, *heap; + + endpos = h->tot; + heap = h->ary; + startpos = pos; +#ifdef ERROR_CHK + if (pos >= endpos) { + fprintf(stderr, "_siftup: index out of range: %" PRId64 ", len: %" PRId64 "\n", pos, endpos); + return -1; + } +#endif + + /* Bubble up the smaller child until hitting a leaf. */ + newitem = heap[pos]; + childpos = 2*pos + 1; /* leftmost child position */ + while (childpos < endpos) { + /* Set childpos to index of smaller child. */ + rightpos = childpos + 1; + if (rightpos < endpos) { + if (heap[rightpos] < heap[childpos]) + childpos = rightpos; + } + /* Move the smaller child up. */ + heap[pos] = heap[childpos]; + pos = childpos; + childpos = 2*pos + 1; + } + + /* The leaf at pos is empty now. Put newitem there, and and bubble + it up to its final resting place (by sifting its parents down). */ + heap[pos] = newitem; + return _siftdown(h, startpos, pos); +} + +void +heapify(heap_t *h, __TYPE *ary) +{ + __TYPE i, n; + + n = h->tot; + h->ary = ary; + + /* Transform bottom-up. The largest index there's any point to + looking at is the largest with a child index in-range, so must + have 2*i + 1 < n, or i < (n-1)/2. If n is even = 2*j, this is + (2*j-1)/2 = j-1/2 so j-1 is the largest, which is n//2 - 1. If + n is odd = 2*j+1, this is (2*j+1-1)/2 = j so j-1 is the largest, + and that's again n//2-1. + */ + for (i=n/2-1 ; i>=0 ; i--) + if(_siftup(h, i) == -1) + break; +} diff --git a/utils/heapq.h b/utils/heapq.h index bc81e77..f9dc1c9 100644 --- a/utils/heapq.h +++ b/utils/heapq.h @@ -32,7 +32,8 @@ typedef struct { __TYPE tot; } heap_t; -extern int ksmallest(__TYPE *ary, __TYPE len, heap_t *heap); -extern void reset_heap(heap_t *h, __TYPE tot); +int ksmallest(__TYPE *ary, __TYPE len, heap_t *heap); +void reset_heap(heap_t *h, __TYPE tot); +void heapify(heap_t *h, __TYPE *ary); #endif diff --git a/utils/utils.h b/utils/utils.h index d121551..e9c0924 100644 --- a/utils/utils.h +++ b/utils/utils.h @@ -50,6 +50,7 @@ extern "C" { #define ONE_PB (1125899906842624ULL) #define ONE_TB (1099511627776ULL) +#define TWO_MB (2UL * 1024UL * 1024UL) #define FOUR_MB FOURM #define EIGHT_MB EIGHTM #define EIGHT_GB (8589934592ULL)