From 50251107de732bd2000dbd57236212f3909d1d61 Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Tue, 9 Apr 2013 22:23:51 +0530 Subject: [PATCH] Work in progress changes for Segmented Global Deduplication. --- Makefile.in | 4 +- main.c | 8 ++ rabin/global/dedupe_config.c | 2 +- rabin/global/{db.c => index.c} | 11 +- rabin/global/{db.h => index.h} | 4 +- rabin/rabin_dedup.c | 189 +++++++++++++++++++-------------- rabin/rabin_dedup.h | 10 +- utils/utils.c | 21 +++- utils/utils.h | 2 +- 9 files changed, 157 insertions(+), 94 deletions(-) rename rabin/global/{db.c => index.c} (97%) rename rabin/global/{db.h => index.h} (98%) diff --git a/Makefile.in b/Makefile.in index 0e593e7..e51ba12 100644 --- a/Makefile.in +++ b/Makefile.in @@ -84,8 +84,8 @@ BZLIB_HDRS = $(MAINHDRS) BZLIB_OBJS = $(BZLIB_SRCS:.c=.o) BZLIB_CPPFLAGS = @LIBBZ2_INC@ -RABINSRCS = rabin/rabin_dedup.c rabin/global/db.c rabin/global/dedupe_config.c -RABINHDRS = rabin/rabin_dedup.h utils/utils.h rabin/global/db.h rabin/global/dedupe_config.h +RABINSRCS = rabin/rabin_dedup.c rabin/global/index.c rabin/global/dedupe_config.c +RABINHDRS = rabin/rabin_dedup.h utils/utils.h rabin/global/index.h rabin/global/dedupe_config.h RABINOBJS = $(RABINSRCS:.c=.o) BSDIFFSRCS = bsdiff/bsdiff.c bsdiff/bspatch.c bsdiff/rle_encoder.c diff --git a/main.c b/main.c index f39a566..f64a193 100644 --- a/main.c +++ b/main.c @@ -1908,6 +1908,14 @@ start_compress(const char *filename, uint64_t chunksize, int level) nprocs = nthreads; fprintf(stderr, "\n"); + if (enable_rabin_global && !pipe_mode) { + my_sysinfo msys_info; + + get_sys_limits(&msys_info); + global_dedupe_bufadjust(rab_blk_size, &chunksize, 0, algo, cksum, + CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads); + } + dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *)); if ((enable_rabin_scan || enable_fixed_scan)) cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize); diff --git a/rabin/global/dedupe_config.c b/rabin/global/dedupe_config.c index 688b9a3..5959cbd 100644 --- a/rabin/global/dedupe_config.c +++ b/rabin/global/dedupe_config.c @@ -36,7 +36,7 @@ #include #include "dedupe_config.h" -#include "db.h" +#include "index.h" static compress_algo_t get_compress_level(compress_algo_t algo) diff --git a/rabin/global/db.c b/rabin/global/index.c similarity index 97% rename from rabin/global/db.c rename to rabin/global/index.c index 63759b8..3333694 100644 --- a/rabin/global/db.c +++ b/rabin/global/index.c @@ -36,7 +36,7 @@ #include #include -#include "db.h" +#include "index.h" /* * Hashtable structures for in-memory index. @@ -117,9 +117,12 @@ setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chun * If file_sz = 0 and pct_interval != 0 then we are in pipe mode and want a segmented * index. This is typically for WAN deduplication of large data transfers. */ + if (pct_interval != 0) + set_user = 0; + else + set_user = 1; if (file_sz == 0 && *pct_interval == 0) *pct_interval = 100; - set_user = 0; set_cfg: rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, *user_chunk_sz, *pct_interval); @@ -131,11 +134,11 @@ set_cfg: } /* - * Adjust user_chunk_sz if this is the second try. + * Adjust user_chunk_sz if indicated. */ if (set_user) { if (*user_chunk_sz < cfg->segment_sz_bytes) { - *user_chunk_sz = cfg->segment_sz_bytes; + *user_chunk_sz = cfg->segment_sz_bytes + (cfg->segment_sz_bytes >> 1); } else { *user_chunk_sz = (*user_chunk_sz / cfg->segment_sz_bytes) * cfg->segment_sz_bytes; } diff --git a/rabin/global/db.h b/rabin/global/index.h similarity index 98% rename from rabin/global/db.h rename to rabin/global/index.h index 6d55579..efc9355 100644 --- a/rabin/global/db.h +++ b/rabin/global/index.h @@ -22,8 +22,8 @@ * moinakg@belenix.org, http://moinakg.wordpress.com/ */ -#ifndef _DB_H -#define _DB_H +#ifndef _INDEX_H +#define _INDEX_H #include diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index 2fcefe6..56b399e 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -131,7 +131,7 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta * to align with deduplication requirements. */ int -global_dedupe_chkmem(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval, +global_dedupe_bufadjust(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, int nthreads) { @@ -202,30 +202,12 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s */ if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) { my_sysinfo msys_info; - char *val; /* - * Get available free memory. + * Get amount of memory to use. The freeram got here is adjusted amount. */ - get_sysinfo(&msys_info); + get_sys_limits(&msys_info); - if ((val = getenv("PCOMPRESS_INDEX_MEM")) != NULL) { - uint64_t mem; - - /* - * Externally specified index limit in MB. - */ - mem = strtoull(val, NULL, 0); - mem *= (1024 * 1024); - if (mem > (1024 * 1024) && mem < msys_info.freeram) { - msys_info.freeram = mem; - } - } - - /* - * Use a maximum of approx 75% of free RAM for the index. - */ - msys_info.freeram = (msys_info.freeram >> 1) + (msys_info.freeram >> 2); arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, 0, algo, props->cksum, props->cksum, file_size, msys_info.freeram, props->nthreads); @@ -270,6 +252,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s ctx->delta_flag = 0; ctx->deltac_min_distance = props->deltac_min_distance; ctx->pagesize = sysconf(_SC_PAGE_SIZE); + ctx->similarity_cksums = NULL; /* * Scale down similarity percentage based on avg block size unless user specified @@ -318,6 +301,17 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s return (NULL); } + if (arc && dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) { + ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, 90 / arc->pct_interval, 32); + if (!ctx->similarity_cksums) { + fprintf(stderr, + "Could not allocate dedupe context, out of memory\n"); + destroy_dedupe_context(ctx); + return (NULL); + } + ctx->similarity_count = 90 / arc->pct_interval; + } + ctx->lzma_data = NULL; ctx->level = 14; if (real_chunksize > 0) { @@ -371,6 +365,7 @@ destroy_dedupe_context(dedupe_context_t *ctx) } slab_free(NULL, ctx->blocks); } + if (ctx->similarity_cksums) slab_free(NULL, ctx->similarity_cksums); if (ctx->lzma_data) lzma_deinit(&(ctx->lzma_data)); slab_free(NULL, ctx); } @@ -670,36 +665,39 @@ process_blocks: * If global dedupe is enabled then process it here. */ if (ctx->arc) { - if (ctx->arc->dedupe_mode == MODE_SIMPLE) { - uchar_t *g_dedupe_idx, *tgt, *src; - /* - * First compute all the rabin chunk/block cryptographic hashes. - */ + uchar_t *g_dedupe_idx, *tgt, *src; + + /* + * First compute all the rabin chunk/block cryptographic hashes. + */ #if defined(_OPENMP) # pragma omp parallel for if (mt) #endif - for (i=0; ig_blocks[i].cksum, - ctx->arc->chunk_cksum_type, buf1+ctx->g_blocks[i].offset, - ctx->g_blocks[i].length, 0, 0); - } + for (i=0; ig_blocks[i].cksum, + ctx->arc->chunk_cksum_type, buf1+ctx->g_blocks[i].offset, + ctx->g_blocks[i].length, 0, 0); + } + /* + * Index table within this segment. + */ + g_dedupe_idx = ctx->cbuf + RABIN_HDR_SIZE; + dedupe_index_sz = 0; + + /* + * First entry in table is the original file offset where this + * data segment begins. + */ + *((uint64_t *)g_dedupe_idx) = LE64(ctx->file_offset); + g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); + dedupe_index_sz += 2; + matchlen = 0; + + if (ctx->arc->dedupe_mode == MODE_SIMPLE) { /* - * Index table within this segment. + * This code block implements Global Dedupe with simple in-memory index. */ - g_dedupe_idx = ctx->cbuf + RABIN_HDR_SIZE; - dedupe_index_sz = 0; - - /* - * First entry in table in the original file offset where this - * data segment begins. - */ - *((uint64_t *)g_dedupe_idx) = LE64(ctx->file_offset); - g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); - dedupe_index_sz += 2; - length = 0; - matchlen = 0; - /* * Now lookup blocks in index. First wait for our semaphore to be * signaled. If the previous thread in sequence is using the index @@ -707,6 +705,7 @@ process_blocks: * predictable serialization of index access in a sequence of * threads without locking. */ + length = 0; sem_wait(ctx->index_sem); for (i=0; ivalid = 0; - return (0); - } + } else { + uchar_t *seg_heap, *sim_ck; + archive_config_t *cfg; + uint32_t increment, len; /* - * Now copy the block data; + * This code block implements Segmented similarity based Dedupe with + * in-memory index. */ - for (i=0; iarc; + seg_heap = (uchar_t *)ctx->g_blocks - cfg->segment_sz_bytes; + for (i=0; isegment_sz) { + length = 0; - j = length & RABIN_INDEX_FLAG; - length = length & RABIN_INDEX_VALUE; - if (!j) { - memcpy(tgt, src, length); - tgt += length; - src += length; - } else { - src += length; - g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); - i += 2; + /* + * Compute length of current segment. + */ + for (j=i; jsegment_sz; j++) + length += ctx->g_blocks[i].length; + + /* + * Compute the cumulative similarity minhashes. + */ + sim_ck = ctx->similarity_cksums; + increment = length / ctx->similarity_count; + len = increment; + src = buf1 + ctx->g_blocks[i].offset; + tgt = seg_heap; + memcpy(tgt, src, length); + for (j=0; jsimilarity_count; j++) { + reset_heap(&heap, len/8); + ksmallest((int64_t *)seg_heap, length, &heap); + compute_checksum(sim_ck, CKSUM_BLAKE256, seg_heap, len, 0, 0); + len += increment; + sim_ck += 32; } } - pos1 = tgt - ctx->cbuf; - blknum |= GLOBAL_FLAG; - - } else { - /* - * Segmented similarity based Dedupe. - */ - } + + /* + * Deduplication reduction should at least be greater than block list metadata. + */ + if (matchlen < dedupe_index_sz) { + DEBUG_STAT_EN(en = get_wtime_millis()); + DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n", + get_mb_s(*size, strt, en_1), get_mb_s(*size, strt, en))); + DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.\n")); + ctx->valid = 0; + return (0); + } + + /* + * Now copy the block data; + */ + for (i=0; icbuf; + blknum |= GLOBAL_FLAG; goto dedupe_done; } diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index 9e7fadc..d109771 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -64,7 +64,7 @@ #define _RABIN_POLY_H_ #include "utils.h" -#include +#include #include #include #include @@ -188,6 +188,8 @@ typedef struct { archive_config_t *arc; sem_t *index_sem; sem_t *index_sem_next; + uchar_t *similarity_cksums; + int similarity_count; uint32_t pagesize; int out_fd; int id; @@ -208,8 +210,8 @@ extern void update_dedupe_hdr(uchar_t *buf, uint64_t dedupe_index_sz_cmp, extern void reset_dedupe_context(dedupe_context_t *ctx); extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta_flag); -extern int global_dedupe_chkmem(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval, - const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, - size_t memlimit, int nthreads); +extern int global_dedupe_bufadjust(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval, + const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, + size_t memlimit, int nthreads); #endif /* _RABIN_POLY_H_ */ diff --git a/utils/utils.c b/utils/utils.c index 5060ccc..dac3640 100644 --- a/utils/utils.c +++ b/utils/utils.c @@ -371,10 +371,11 @@ get_mb_s(uint64_t bytes, double strt, double en) } void -get_sysinfo(my_sysinfo *msys_info) +get_sys_limits(my_sysinfo *msys_info) { struct sysinfo sys_info; int rv; + char *val; rv = sysinfo(&sys_info); @@ -386,4 +387,22 @@ get_sysinfo(my_sysinfo *msys_info) msys_info->totalswap = sys_info.totalswap; msys_info->freeswap = sys_info.freeswap; msys_info->mem_unit = sys_info.mem_unit; + + if ((val = getenv("PCOMPRESS_INDEX_MEM")) != NULL) { + uint64_t mem; + + /* + * Externally specified index limit in MB. + */ + mem = strtoull(val, NULL, 0); + mem *= (1024 * 1024); + if (mem > (1024 * 1024) && mem < msys_info->freeram) { + msys_info->freeram = mem; + } + } + + /* + * Use a maximum of approx 75% of free RAM for the index. + */ + msys_info->freeram = (msys_info->freeram >> 1) + (msys_info->freeram >> 2); } diff --git a/utils/utils.h b/utils/utils.h index c2ec2a4..d121551 100644 --- a/utils/utils.h +++ b/utils/utils.h @@ -217,7 +217,7 @@ extern void set_threadcounts(algo_props_t *props, int *nthreads, int nprocs, extern uint64_t get_total_ram(); extern double get_wtime_millis(void); extern double get_mb_s(uint64_t bytes, double strt, double en); -extern void get_sysinfo(my_sysinfo *msys_info); +extern void get_sys_limits(my_sysinfo *msys_info); extern void init_algo_props(algo_props_t *props); extern void init_pcompress();