Many optimizations to segmented global dedupe.

Use chunk hash based cumulative similarity matching instead of chunk content.
This commit is contained in:
Moinak Ghosh 2013-04-19 22:51:51 +05:30
parent 2f6ccca6e5
commit 3b8a5813fd
4 changed files with 82 additions and 95 deletions

View file

@ -316,7 +316,7 @@ db_segcache_pos(archive_config_t *cfg, int tid)
* Mmap the requested segment metadata array. * Mmap the requested segment metadata array.
*/ */
int int
db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t *blocks) db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks)
{ {
uchar_t *mapbuf, *hdr; uchar_t *mapbuf, *hdr;
int fd; int fd;
@ -349,7 +349,7 @@ db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offs
hdr = mapbuf + adj; hdr = mapbuf + adj;
*blknum = *((uint32_t *)(hdr)); *blknum = *((uint32_t *)(hdr));
*offset = *((uint64_t *)(hdr + 4)); *offset = *((uint64_t *)(hdr + 4));
memcpy(blocks, hdr + SEGCACHE_HDR_SZ, *blknum * sizeof (global_blockentry_t)); *blocks = hdr + SEGCACHE_HDR_SZ;
cfg->seg_fd_r[tid].mapping = mapbuf; cfg->seg_fd_r[tid].mapping = mapbuf;
cfg->seg_fd_r[tid].len = len + adj; cfg->seg_fd_r[tid].len = len + adj;

View file

@ -56,7 +56,7 @@ void destroy_global_db_s(archive_config_t *cfg);
int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset); int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset);
int db_segcache_pos(archive_config_t *cfg, int tid); int db_segcache_pos(archive_config_t *cfg, int tid);
int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t *blocks); int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks);
int db_segcache_unmap(archive_config_t *cfg, int tid); int db_segcache_unmap(archive_config_t *cfg, int tid);
#ifdef __cplusplus #ifdef __cplusplus

View file

@ -212,7 +212,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
get_sys_limits(&msys_info); get_sys_limits(&msys_info);
arc = init_global_db_s(NULL, "/tmp", rab_blk_sz, chunksize, 0, arc = init_global_db_s(NULL, "/tmp", rab_blk_sz, chunksize, 0,
algo, props->cksum, CKSUM_CRC64, file_size, algo, props->cksum, GLOBAL_SIM_CKSUM, file_size,
msys_info.freeram, props->nthreads); msys_info.freeram, props->nthreads);
if (arc == NULL) { if (arc == NULL) {
pthread_mutex_unlock(&init_lock); pthread_mutex_unlock(&init_lock);
@ -662,6 +662,7 @@ process_blocks:
uint64_t dedupe_index_sz = 0; uint64_t dedupe_index_sz = 0;
rabin_blockentry_t *be; rabin_blockentry_t *be;
DEBUG_STAT_EN(uint32_t delta_calls, delta_fails, merge_count, hash_collisions); DEBUG_STAT_EN(uint32_t delta_calls, delta_fails, merge_count, hash_collisions);
DEBUG_STAT_EN(double w1, w2);
DEBUG_STAT_EN(delta_calls = 0); DEBUG_STAT_EN(delta_calls = 0);
DEBUG_STAT_EN(delta_fails = 0); DEBUG_STAT_EN(delta_fails = 0);
DEBUG_STAT_EN(hash_collisions = 0); DEBUG_STAT_EN(hash_collisions = 0);
@ -712,7 +713,9 @@ process_blocks:
* threads without locking. * threads without locking.
*/ */
length = 0; length = 0;
DEBUG_STAT_EN(w1 = get_wtime_millis());
sem_wait(ctx->index_sem); sem_wait(ctx->index_sem);
DEBUG_STAT_EN(w2 = get_wtime_millis());
for (i=0; i<blknum; i++) { for (i=0; i<blknum; i++) {
hash_entry_t *he; hash_entry_t *he;
@ -793,11 +796,9 @@ process_blocks:
* ====================================================================== * ======================================================================
*/ */
cfg = ctx->arc; cfg = ctx->arc;
seg_heap = (uchar_t *)(ctx->g_blocks) - cfg->segment_sz_bytes; seg_heap = (uchar_t *)(ctx->g_blocks) - cfg->segment_sz * cfg->chunk_cksum_sz;
ary_sz = cfg->segment_sz * sizeof (global_blockentry_t **); ary_sz = cfg->segment_sz * sizeof (global_blockentry_t **);
htab = (global_blockentry_t **)(seg_heap - ary_sz); htab = (global_blockentry_t **)(seg_heap - ary_sz);
seg_blocks = (global_blockentry_t *)(seg_heap - ary_sz - \
cfg->segment_sz * sizeof (global_blockentry_t));
for (i=0; i<blknum;) { for (i=0; i<blknum;) {
uint64_t crc; uint64_t crc;
length = 0; length = 0;
@ -808,12 +809,12 @@ process_blocks:
blks = cfg->segment_sz; blks = cfg->segment_sz;
if (blks > blknum-i) blks = blknum-i; if (blks > blknum-i) blks = blknum-i;
len = 0; len = 0;
length = 0;
tgt = seg_heap;
for (j=0; j<blks; j++) { for (j=0; j<blks; j++) {
len += ctx->g_blocks[j+i].length; memcpy(tgt, ctx->g_blocks[j+i].cksum, cfg->chunk_cksum_sz);
if (len > cfg->segment_sz_bytes) { length += cfg->chunk_cksum_sz;
break; tgt += cfg->chunk_cksum_sz;
}
length = len;
} }
blks = j+i; blks = j+i;
@ -821,13 +822,10 @@ process_blocks:
* Compute the cumulative similarity minhashes. * Compute the cumulative similarity minhashes.
*/ */
sim_ck = ctx->similarity_cksums; sim_ck = ctx->similarity_cksums;
src = buf1 + ctx->g_blocks[i].offset;
tgt = seg_heap;
memcpy(tgt, src, length);
crc = 0; crc = 0;
increment = (length / cfg->intervals) / cfg->sub_intervals; increment = (length / cfg->intervals) / cfg->sub_intervals;
len = increment; len = increment;
for (j=0; j<cfg->sub_intervals; j++) { for (j = 0; j<cfg->sub_intervals; j++) {
reset_heap(&heap, len/8); reset_heap(&heap, len/8);
ksmallest((int64_t *)seg_heap, length/8, &heap); ksmallest((int64_t *)seg_heap, length/8, &heap);
crc = lzma_crc64(seg_heap + len - increment, increment, crc); crc = lzma_crc64(seg_heap + len - increment, increment, crc);
@ -836,7 +834,6 @@ process_blocks:
sim_ck += cfg->similarity_cksum_sz; sim_ck += cfg->similarity_cksum_sz;
} }
len -= increment;
increment = length / cfg->intervals; increment = length / cfg->intervals;
len = increment * 2; len = increment * 2;
for (j=0; j<cfg->intervals-1; j++) { for (j=0; j<cfg->intervals-1; j++) {
@ -852,7 +849,11 @@ process_blocks:
* Begin shared index access and write segment metadata to cache * Begin shared index access and write segment metadata to cache
* first. * first.
*/ */
if (i == 0) sem_wait(ctx->index_sem); if (i == 0) {
DEBUG_STAT_EN(w1 = get_wtime_millis());
sem_wait(ctx->index_sem);
DEBUG_STAT_EN(w2 = get_wtime_millis());
}
sim_ck -= cfg->similarity_cksum_sz; sim_ck -= cfg->similarity_cksum_sz;
seg_offset = db_segcache_pos(cfg, ctx->id); seg_offset = db_segcache_pos(cfg, ctx->id);
src = (uchar_t *)&(ctx->g_blocks[i]); src = (uchar_t *)&(ctx->g_blocks[i]);
@ -860,6 +861,54 @@ process_blocks:
db_segcache_write(cfg, ctx->id, src, len, blks, db_segcache_write(cfg, ctx->id, src, len, blks,
ctx->file_offset + ctx->g_blocks[i].offset); ctx->file_offset + ctx->g_blocks[i].offset);
/*
* Insert current segment blocks into local hashtable and do partial
* in-segment deduplication.
*/
be = NULL;
memset(htab, 0, ary_sz);
for (k=i; k<blks; k++) {
uint32_t hent;
hent = XXH32(ctx->g_blocks[k].cksum, cfg->chunk_cksum_sz, 0);
hent ^= (hent / cfg->chunk_cksum_sz);
hent = hent % cfg->segment_sz;
if (htab[hent] == NULL) {
htab[hent] = &(ctx->g_blocks[k]);
ctx->g_blocks[k].offset += ctx->file_offset;
ctx->g_blocks[k].next = NULL;
be = NULL;
} else {
be = htab[hent];
do {
if (memcmp(ctx->g_blocks[k].cksum,
be->cksum, cfg->chunk_cksum_sz) == 0 &&
ctx->g_blocks[k].length == be->length) {
global_blockentry_t *en;
/*
* Block match in index was found. Update g_blocks
* array.
*/
en = &(ctx->g_blocks[k]);
en->length = (en->length | RABIN_INDEX_FLAG) &
CLEAR_SIMILARITY_FLAG;
en->offset = be->offset;
break;
}
if (be->next) {
be = be->next;
} else {
be->next = &(ctx->g_blocks[k]);
be->next->offset += ctx->file_offset;
be->next->next = NULL;
break;
}
} while(1);
}
}
/* /*
* Now lookup the similarity minhashes starting at the highest * Now lookup the similarity minhashes starting at the highest
* significance level. * significance level.
@ -873,18 +922,18 @@ process_blocks:
* Match found. Load segment metadata from disk and perform * Match found. Load segment metadata from disk and perform
* identity deduplication with the segment chunks. * identity deduplication with the segment chunks.
*/ */
memset(htab, 0, ary_sz);
offset = he->item_offset; offset = he->item_offset;
if (db_segcache_map(cfg, ctx->id, &o_blks, &offset, if (db_segcache_map(cfg, ctx->id, &o_blks, &offset,
(uchar_t *)seg_blocks) == -1) { (uchar_t **)&seg_blocks) == -1) {
fprintf(stderr, "Segment cache mmap failed.\n"); fprintf(stderr, "Segment cache mmap failed.\n");
ctx->valid = 0; ctx->valid = 0;
return (0); return (0);
} }
/* /*
* First insert all the unique blocks from the mapped segment * Now lookup loaded segment blocks in hashtable. If match is
* blocks(chunks) array into the hashtable. * found then the hashtable entry is updated to point to the
* loaded segment block.
*/ */
for (k=0; k<o_blks; k++) { for (k=0; k<o_blks; k++) {
uint32_t hent; uint32_t hent;
@ -892,90 +941,27 @@ process_blocks:
hent ^= (hent / cfg->chunk_cksum_sz); hent ^= (hent / cfg->chunk_cksum_sz);
hent = hent % cfg->segment_sz; hent = hent % cfg->segment_sz;
if (htab[hent] == NULL) { if (htab[hent] != NULL) {
htab[hent] = &(seg_blocks[k]);
seg_blocks[k].offset += offset;
seg_blocks[k].next = NULL;
} else {
be = htab[hent]; be = htab[hent];
do { do {
if (be->length & RABIN_INDEX_FLAG)
goto next_ent;
if (memcmp(seg_blocks[k].cksum, if (memcmp(seg_blocks[k].cksum,
be->cksum, cfg->chunk_cksum_sz) == 0 && be->cksum, cfg->chunk_cksum_sz) == 0 &&
seg_blocks[k].length == be->length) { seg_blocks[k].length == be->length) {
be = NULL; be->length = (be->length |
RABIN_INDEX_FLAG) &
CLEAR_SIMILARITY_FLAG;
be->offset = seg_blocks[k].offset +
offset;
break; break;
} }
next_ent:
if (be->next) if (be->next)
be = be->next; be = be->next;
else else
break; break;
} while(1); } while(1);
/*
* be will be non-NULL if no match was found.
* It will the last bucket in the hash slot.
*/
if (be) {
be->next = &(seg_blocks[k]);
seg_blocks[k].offset += offset;
seg_blocks[k].next = NULL;
}
}
}
/*
* Now lookup current segment blocks(chunks) in the hashtable and
* perform the actual deduplication.
*/
be = NULL;
for (k=i; k<blks; k++) {
uint32_t hent;
if (ctx->g_blocks[k].length & RABIN_INDEX_FLAG) continue;
hent = XXH32(ctx->g_blocks[k].cksum, cfg->chunk_cksum_sz, 0);
hent ^= (hent / cfg->chunk_cksum_sz);
hent = hent % cfg->segment_sz;
if (htab[hent] == NULL) {
htab[hent] = &(ctx->g_blocks[k]);
ctx->g_blocks[k].offset += ctx->file_offset;
ctx->g_blocks[k].next = NULL;
be = NULL;
} else {
be = htab[hent];
do {
if (memcmp(ctx->g_blocks[k].cksum,
be->cksum, cfg->chunk_cksum_sz) == 0 &&
ctx->g_blocks[k].length == be->length) {
break;
}
if (be->next) {
be = be->next;
} else {
be->next = &(ctx->g_blocks[k]);
be->next->offset +=
ctx->file_offset;
be->next->next = NULL;
be = NULL;
break;
}
} while(1);
}
/*
* be will be non-NULL if match was found. It will
* point to the matching bucket.
*/
if (be) {
global_blockentry_t *en;
/*
* Block match in index was found. Update g_blocks
* array.
*/
en = &(ctx->g_blocks[k]);
en->length = (en->length | RABIN_INDEX_FLAG) &
CLEAR_SIMILARITY_FLAG;
en->offset = be->offset;
} }
} }
break; break;
@ -1058,7 +1044,7 @@ process_blocks:
if (matchlen < dedupe_index_sz) { if (matchlen < dedupe_index_sz) {
DEBUG_STAT_EN(en = get_wtime_millis()); DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n", DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n",
get_mb_s(*size, strt, en_1), get_mb_s(*size, strt, en))); get_mb_s(*size, strt, en_1), get_mb_s(*size, strt, en - (w2 - w1))));
DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.\n")); DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.\n"));
ctx->valid = 0; ctx->valid = 0;
return (0); return (0);

View file

@ -137,6 +137,7 @@
#define SIMILAR_EXACT 1 #define SIMILAR_EXACT 1
#define SIMILAR_PARTIAL 2 #define SIMILAR_PARTIAL 2
#define SIMILAR_REF 3 #define SIMILAR_REF 3
#define GLOBAL_SIM_CKSUM CKSUM_CRC64
/* /*
* TYpes of delta operations. * TYpes of delta operations.