From 8ae571124d683d570d2ecefdb7d84c1a0ac0f7ce Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Thu, 18 Apr 2013 21:26:24 +0530 Subject: [PATCH] Complete implementation for Segmented Global Deduplication. --- main.c | 62 ++++---- rabin/global/dedupe_config.c | 18 ++- rabin/global/dedupe_config.h | 18 ++- rabin/global/index.c | 75 ++++----- rabin/global/index.h | 4 +- rabin/rabin_dedup.c | 289 ++++++++++++++++++++++++++++------- rabin/rabin_dedup.h | 11 +- utils/utils.c | 12 +- 8 files changed, 332 insertions(+), 157 deletions(-) diff --git a/main.c b/main.c index f64a193..3d27ded 100644 --- a/main.c +++ b/main.c @@ -1659,37 +1659,9 @@ start_compress(const char *filename, uint64_t chunksize, int level) props.cksum = cksum; props.buf_extra = 0; cread_buf = NULL; - - if (_props_func) { - _props_func(&props, level, chunksize); - if (chunksize + props.buf_extra > compressed_chunksize) { - compressed_chunksize += (chunksize + props.buf_extra - - compressed_chunksize); - } - } - flags = 0; sbuf.st_size = 0; dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler - if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { - if (enable_rabin_global) { - flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED); - dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL; - } else if (enable_rabin_scan) { - flags |= FLAG_DEDUP; - dedupe_flag = RABIN_DEDUPE_SEGMENTED; - } else { - flags |= FLAG_DEDUP_FIXED; - dedupe_flag = RABIN_DEDUPE_FIXED; - } - /* Additional scratch space for dedup arrays. */ - if (chunksize + dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode) - > compressed_chunksize) { - compressed_chunksize += (chunksize + - dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode)) - - compressed_chunksize; - } - } if (encrypt_type) { uchar_t pw[MAX_PW_LEN]; @@ -1867,6 +1839,14 @@ start_compress(const char *filename, uint64_t chunksize, int level) strcpy(tmpdir, tmp); } + if (enable_rabin_global && !pipe_mode) { + my_sysinfo msys_info; + + get_sys_limits(&msys_info); + global_dedupe_bufadjust(rab_blk_size, &chunksize, 0, algo, cksum, + CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads); + } + /* * Compressed buffer size must include zlib/dedup scratch space and * chunk header space. @@ -1885,7 +1865,25 @@ start_compress(const char *filename, uint64_t chunksize, int level) compressed_chunksize); } + if (_props_func) { + _props_func(&props, level, chunksize); + if (chunksize + props.buf_extra > compressed_chunksize) { + compressed_chunksize += (chunksize + props.buf_extra - + compressed_chunksize); + } + } + if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { + if (enable_rabin_global) { + flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED); + dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL; + } else if (enable_rabin_scan) { + flags |= FLAG_DEDUP; + dedupe_flag = RABIN_DEDUPE_SEGMENTED; + } else { + flags |= FLAG_DEDUP_FIXED; + dedupe_flag = RABIN_DEDUPE_FIXED; + } /* Additional scratch space for dedup arrays. */ if (chunksize + dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode) > compressed_chunksize) { @@ -1908,14 +1906,6 @@ start_compress(const char *filename, uint64_t chunksize, int level) nprocs = nthreads; fprintf(stderr, "\n"); - if (enable_rabin_global && !pipe_mode) { - my_sysinfo msys_info; - - get_sys_limits(&msys_info); - global_dedupe_bufadjust(rab_blk_size, &chunksize, 0, algo, cksum, - CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads); - } - dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *)); if ((enable_rabin_scan || enable_fixed_scan)) cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize); diff --git a/rabin/global/dedupe_config.c b/rabin/global/dedupe_config.c index 5959cbd..59d3b4f 100644 --- a/rabin/global/dedupe_config.c +++ b/rabin/global/dedupe_config.c @@ -108,7 +108,10 @@ get_compress_str(compress_algo_t algo) static cksum_t get_cksum_type(char *cksum_name) { - if (strcmp(cksum_name, "SHA256") == 0) { + if (strcmp(cksum_name, "CRC64") == 0) { + return (CKSUM_CRC64); + + } else if (strcmp(cksum_name, "SHA256") == 0) { return (CKSUM_SHA256); } else if (strcmp(cksum_name, "SHA512") == 0) { @@ -132,7 +135,10 @@ get_cksum_type(char *cksum_name) static char * get_cksum_str(cksum_t ck) { - if (ck == CKSUM_SHA256) { + if (ck == CKSUM_CRC64) { + return ("CRC64"); + + } else if (ck == CKSUM_SHA256) { return ("SHA256"); } else if (ck == CKSUM_SHA512) { @@ -156,7 +162,10 @@ get_cksum_str(cksum_t ck) static int get_cksum_sz(cksum_t ck) { - if (ck == CKSUM_SHA256 || ck == CKSUM_BLAKE256 || ck == CKSUM_KECCAK256) { + if (ck == CKSUM_CRC64) { + return (8); + + } else if (ck == CKSUM_SHA256 || ck == CKSUM_BLAKE256 || ck == CKSUM_KECCAK256) { return (32); } else if (ck == CKSUM_SHA512 || ck == CKSUM_BLAKE512 || ck == CKSUM_KECCAK512) { @@ -360,9 +369,10 @@ set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim cfg->archive_sz = file_sz; cfg->dedupe_mode = MODE_SIMILARITY; - if (cfg->archive_sz <= SIXTEEN_GB || pct_interval == 0 || pct_interval == 100) { + if (cfg->archive_sz <= SIXTEEN_GB && (pct_interval == 0 || pct_interval == 100)) { cfg->dedupe_mode = MODE_SIMPLE; cfg->segment_sz_bytes = user_chunk_sz; + cfg->similarity_cksum_sz = cfg->chunk_cksum_sz; } else if (cfg->archive_sz < ONE_TB) { cfg->segment_sz_bytes = FOUR_MB; diff --git a/rabin/global/dedupe_config.h b/rabin/global/dedupe_config.h index bd3e153..b4a722d 100644 --- a/rabin/global/dedupe_config.h +++ b/rabin/global/dedupe_config.h @@ -37,7 +37,7 @@ extern "C" { #define DEFAULT_CHUNK_CKSUM CKSUM_SHA256 #define DEFAULT_SIMILARITY_CKSUM CKSUM_BLAKE256 #define DEFAULT_COMPRESS COMPRESS_LZ4 -#define DEFAULT_PCT_INTERVAL 2 +#define DEFAULT_PCT_INTERVAL 5 #define CONTAINER_ITEMS 2048 #define MIN_CK 1 #define MAX_CK 5 @@ -71,7 +71,7 @@ typedef struct { int pct_interval; // Similarity based match intervals in %age. // The items below are computed given the above // components. - int intervals; + int intervals, sub_intervals; dedupe_mode_t dedupe_mode; uint32_t chunk_sz_bytes; // Average chunk size @@ -83,6 +83,7 @@ typedef struct { int num_containers; // Number of containers in a directory int nthreads; // Number of threads processing data segments in parallel int seg_fd_w; + uint64_t segcache_pos; uint32_t pagesize; struct seg_map_fd *seg_fd_r; // One read-only fd per thread for mapping in portions of the // segment metadata cache. @@ -90,11 +91,14 @@ typedef struct { void *dbdata; } archive_config_t; -typedef struct _segment_entry { - uint64_t chunk_offset; - uint32_t chunk_length; - uchar_t *chunk_cksum; -} segment_entry_t; +#pragma pack(1) +typedef struct global_blockentry { + uint32_t length; + uint64_t offset; + struct global_blockentry *next; // Reqd when part of a hashtable + uchar_t cksum[CKSUM_MAX_BYTES]; +} global_blockentry_t; +#pragma pack() int read_config(char *configfile, archive_config_t *cfg); int write_config(char *configfile, archive_config_t *cfg); diff --git a/rabin/global/index.c b/rabin/global/index.c index be62a41..078a210 100644 --- a/rabin/global/index.c +++ b/rabin/global/index.c @@ -108,7 +108,7 @@ int setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz, int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, uint32_t *hash_slots, int *hash_entry_size, - uint32_t *intervals, uint64_t *memreqd, size_t memlimit) + uint64_t *memreqd, size_t memlimit) { int rv, set_user; @@ -158,17 +158,20 @@ set_cfg: // Compute total hashtable entries first *hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1; if (*pct_interval == 0) { - *intervals = 1; + cfg->intervals = 1; + cfg->sub_intervals = 0; *hash_slots = file_sz / cfg->chunk_sz_bytes + 1; } else if (*pct_interval == 100) { - *intervals = 1; + cfg->intervals = 1; + cfg->sub_intervals = 0; *hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size); *pct_interval = 0; } else { - *intervals = 90 / *pct_interval - 1; + cfg->intervals = 100 / *pct_interval; + cfg->sub_intervals = cfg->segment_sz / cfg->intervals; *hash_slots = file_sz / cfg->segment_sz_bytes + 1; - *hash_slots *= *intervals; + *hash_slots *= (cfg->intervals + cfg->sub_intervals); } // Compute memory required to hold all hash entries assuming worst case 50% @@ -191,7 +194,6 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch { archive_config_t *cfg; int rv, orig_pct; - float diff; uint32_t hash_slots, intervals, i; uint64_t memreqd; int hash_entry_size; @@ -204,17 +206,12 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch orig_pct = pct_interval; cfg = calloc(1, sizeof (archive_config_t)); - diff = (float)pct_interval / 90.0; rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim, - file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit); + file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit); // Reduce hash_slots to remain within memlimit while (memreqd > memlimit) { - if (pct_interval == 0) { - hash_slots--; - } else { - hash_slots -= (hash_slots * diff); - } + hash_slots--; memreqd = hash_slots * MEM_PER_UNIT(hash_entry_size); } @@ -228,23 +225,22 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch return (NULL); } + cfg->nthreads = nthreads; + intervals = cfg->intervals + cfg->sub_intervals; indx->memlimit = memlimit - (hash_entry_size << 2); indx->list = (htab_t *)calloc(intervals, sizeof (htab_t)); indx->hash_entry_size = hash_entry_size; indx->intervals = intervals; indx->hash_slots = hash_slots / intervals; - cfg->nthreads = nthreads; - cfg->intervals = intervals; for (i = 0; i < intervals; i++) { - indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals, - sizeof (hash_entry_t *)); + indx->list[i].tab = (hash_entry_t **)calloc(indx->hash_slots, sizeof (hash_entry_t *)); if (!(indx->list[i].tab)) { cleanup_indx(indx); free(cfg); return (NULL); } - indx->memused += ((hash_slots / intervals) * (sizeof (hash_entry_t *))); + indx->memused += ((indx->hash_slots) * (sizeof (hash_entry_t *))); } if (pct_interval > 0) { @@ -264,6 +260,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch cfg->seg_fd_r[i].mapping = NULL; } } + cfg->segcache_pos = 0; cfg->dbdata = indx; return (cfg); } @@ -277,23 +274,25 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch * Add new segment block list array into the metadata cache. Once added the entry is * not removed till the program exits. */ +#define SEGCACHE_HDR_SZ 12 int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset) { int64_t w; - uchar_t *hdr[16]; + uchar_t hdr[SEGCACHE_HDR_SZ]; - *((uint32_t *)hdr) = len; - *((uint32_t *)(hdr + 4)) = blknum; - *((uint32_t *)(hdr + 8)) = file_offset; + *((uint32_t *)(hdr)) = blknum; + *((uint64_t *)(hdr + 4)) = file_offset; w = Write(cfg->seg_fd_w, hdr, sizeof (hdr)); if (w < sizeof (hdr)) return (-1); + cfg->segcache_pos += w; w = Write(cfg->seg_fd_w, buf, len); if (w < len) return (-1); + cfg->segcache_pos += w; return (0); } @@ -304,20 +303,19 @@ db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, ui int db_segcache_pos(archive_config_t *cfg, int tid) { - return (lseek(cfg->seg_fd_w, 0, SEEK_CUR)); + return (cfg->segcache_pos); } /* * Mmap the requested segment metadata array. */ int -db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks) +db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t *blocks) { - uchar_t *mapbuf; - uchar_t *hdr[16]; - int64_t r; + uchar_t *mapbuf, *hdr; int fd; uint32_t len, adj; + uint64_t pos; /* * Ensure previous mapping is removed. @@ -328,22 +326,25 @@ db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offs return (-1); /* - * Read header first so that we know how much to map. + * Mmap hdr and blocks. We assume max # of rabin block entries and mmap (unless remaining + * file length is less). The header contains actual number of block entries so mmap-ing + * extra has no consequence other than address space usage. */ - r = Read(fd, hdr, sizeof (hdr)); - if (r < sizeof (hdr)) - return (-1); + len = cfg->segment_sz * sizeof (global_blockentry_t) + SEGCACHE_HDR_SZ; + pos = cfg->segcache_pos; + if (pos - *offset < len) + len = pos - *offset; - *offset += sizeof (hdr); - len = *((uint32_t *)hdr); adj = *offset % cfg->pagesize; - *blknum = *((uint32_t *)(hdr + 4)); mapbuf = mmap(NULL, len + adj, PROT_READ, MAP_SHARED, fd, *offset - adj); - if (mapbuf == MAP_FAILED) return (-1); - *offset = *((uint32_t *)(hdr + 8)); - *blocks = mapbuf + adj; + + hdr = mapbuf + adj; + *blknum = *((uint32_t *)(hdr)); + *offset = *((uint64_t *)(hdr + 4)); + memcpy(blocks, hdr + SEGCACHE_HDR_SZ, *blknum * sizeof (global_blockentry_t)); + cfg->seg_fd_r[tid].mapping = mapbuf; cfg->seg_fd_r[tid].len = len + adj; diff --git a/rabin/global/index.h b/rabin/global/index.h index 83c4862..0e4aeb6 100644 --- a/rabin/global/index.h +++ b/rabin/global/index.h @@ -45,7 +45,7 @@ archive_config_t *init_global_db(char *configfile); int setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz, int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, uint32_t *hash_slots, int *hash_entry_size, - uint32_t *intervals, uint64_t *memreqd, size_t memlimit); + uint64_t *memreqd, size_t memlimit); archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz, int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, @@ -56,7 +56,7 @@ void destroy_global_db_s(archive_config_t *cfg); int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset); int db_segcache_pos(archive_config_t *cfg, int tid); -int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks); +int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t *blocks); int db_segcache_unmap(archive_config_t *cfg, int tid); #ifdef __cplusplus diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index 0ccccbe..891fc1a 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -103,6 +103,7 @@ extern int bsdiff(u_char *oldbuf, bsize_t oldsize, u_char *newbuf, bsize_t newsi extern bsize_t get_bsdiff_sz(u_char *pbuf); extern int bspatch(u_char *pbuf, u_char *oldbuf, bsize_t oldsize, u_char *newbuf, bsize_t *_newsize); +extern uint64_t lzma_crc64(const uint8_t *buf, size_t size, uint64_t crc); static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER; uint64_t ir[256], out[256]; @@ -133,19 +134,19 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta * to align with deduplication requirements. */ int -global_dedupe_bufadjust(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval, +global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, int nthreads) { uint64_t memreqd; archive_config_t cfg; int rv, pct_i, hash_entry_size; - uint32_t intervals, hash_slots; + uint32_t hash_slots; rv = 0; pct_i = pct_interval; - rv = setup_db_config_s(&cfg, chunksize, user_chunk_sz, &pct_i, algo, ck, ck_sim, - file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit); + rv = setup_db_config_s(&cfg, rab_blk_sz, user_chunk_sz, &pct_i, algo, ck, ck_sim, + file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit); return (rv); } @@ -210,8 +211,8 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s */ get_sys_limits(&msys_info); - arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, 0, - algo, props->cksum, props->cksum, file_size, + arc = init_global_db_s(NULL, "/tmp", rab_blk_sz, chunksize, 0, + algo, props->cksum, CKSUM_CRC64, file_size, msys_info.freeram, props->nthreads); if (arc == NULL) { pthread_mutex_unlock(&init_lock); @@ -307,7 +308,8 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s } if (arc && dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) { - ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, arc->intervals, 32); + ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, arc->intervals + arc->sub_intervals, + arc->similarity_cksum_sz); if (!ctx->similarity_cksums) { fprintf(stderr, "Could not allocate dedupe context, out of memory\n"); @@ -436,21 +438,22 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of } if (rabin_pos == NULL) { - /* - * Initialize arrays for sketch computation. We re-use memory allocated - * for the compressed chunk temporarily. - */ - ary_sz = ctx->rabin_poly_max_block_size; - ctx_heap = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz); - /* * If global dedupe is active, the global blocks array uses temp space in * the target buffer. */ + ary_sz = 0; if (ctx->arc != NULL) { - ary_sz += (sizeof (global_blockentry_t) * (*size / ctx->rabin_poly_min_block_size + 1)); + ary_sz = (sizeof (global_blockentry_t) * (*size / ctx->rabin_poly_min_block_size + 1)); ctx->g_blocks = (global_blockentry_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz); } + + /* + * Initialize arrays for sketch computation. We re-use memory allocated + * for the compressed chunk temporarily. + */ + ary_sz += ctx->rabin_poly_max_block_size; + ctx_heap = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz); } #ifndef SSE_MODE memset(ctx->current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE); @@ -699,8 +702,9 @@ process_blocks: matchlen = 0; if (ctx->arc->dedupe_mode == MODE_SIMPLE) { - /* + /*====================================================================== * This code block implements Global Dedupe with simple in-memory index. + *====================================================================== */ /* * Now lookup blocks in index. First wait for our semaphore to be @@ -780,60 +784,89 @@ process_blocks: } else { uchar_t *seg_heap, *sim_ck; archive_config_t *cfg; - uint32_t increment, len, mapped_blks; + uint32_t increment, len, blks, o_blks, k; global_blockentry_t *seg_blocks; uint64_t seg_offset, offset; - int written; - global_blockentry_t **htab; + global_blockentry_t **htab, *be; - /* + /*====================================================================== * This code block implements Segmented similarity based Dedupe with * in-memory index. + * ====================================================================== */ cfg = ctx->arc; - seg_heap = (uchar_t *)ctx->g_blocks - cfg->segment_sz_bytes; + seg_heap = (uchar_t *)(ctx->g_blocks) - cfg->segment_sz_bytes; ary_sz = cfg->segment_sz * sizeof (global_blockentry_t **); htab = (global_blockentry_t **)(seg_heap - ary_sz); - for (i=0; isegment_sz) { - blake2b_state S1, S2; + seg_blocks = (global_blockentry_t *)(seg_heap - ary_sz - \ + cfg->segment_sz * sizeof (global_blockentry_t)); + for (i=0; isegment_sz; j++) - length += ctx->g_blocks[i].length; + blks = cfg->segment_sz; + if (blks > blknum-i) blks = blknum-i; + len = 0; + for (j=0; jg_blocks[j+i].length; + if (len > cfg->segment_sz_bytes) { + break; + } + length = len; + } + blks = j+i; /* * Compute the cumulative similarity minhashes. */ sim_ck = ctx->similarity_cksums; - increment = length / cfg->intervals; - len = increment; src = buf1 + ctx->g_blocks[i].offset; tgt = seg_heap; memcpy(tgt, src, length); - bdsp.blake2b_init(&S1, 32); - for (j=0; jintervals; j++) { + crc = 0; + increment = (length / cfg->intervals) / cfg->sub_intervals; + len = increment; + for (j=0; jsub_intervals; j++) { reset_heap(&heap, len/8); - ksmallest((int64_t *)seg_heap, length, &heap); - bdsp.blake2b_update(&S1, seg_heap + len - increment, increment); - memcpy(&S2, &S1, sizeof (S1)); - bdsp.blake2b_final(&S2, sim_ck, 32); + ksmallest((int64_t *)seg_heap, length/8, &heap); + crc = lzma_crc64(seg_heap + len - increment, increment, crc); + *((uint64_t *)sim_ck) = crc; len += increment; - sim_ck += 32; + sim_ck += cfg->similarity_cksum_sz; } + len -= increment; + increment = length / cfg->intervals; + len = increment * 2; + for (j=0; jintervals-1; j++) { + reset_heap(&heap, len/8); + ksmallest((int64_t *)seg_heap, length/8, &heap); + crc = lzma_crc64(seg_heap + len - increment, increment, crc); + *((uint64_t *)sim_ck) = crc; + len += increment; + sim_ck += cfg->similarity_cksum_sz; + } + + /* + * Begin shared index access and write segment metadata to cache + * first. + */ + if (i == 0) sem_wait(ctx->index_sem); + sim_ck -= cfg->similarity_cksum_sz; + seg_offset = db_segcache_pos(cfg, ctx->id); + src = (uchar_t *)&(ctx->g_blocks[i]); + len = blks * sizeof (global_blockentry_t); + db_segcache_write(cfg, ctx->id, src, len, blks, + ctx->file_offset + ctx->g_blocks[i].offset); + /* * Now lookup the similarity minhashes starting at the highest * significance level. */ - sim_ck -= 32; - written = 0; - increment = cfg->intervals * cfg->pct_interval; - sem_wait(ctx->index_sem); - seg_offset = db_segcache_pos(cfg, ctx->id); - for (j=cfg->intervals; j > 0; j--) { + for (j=cfg->intervals + cfg->sub_intervals; j > 0; j--) { hash_entry_t *he; he = db_lookup_insert_s(cfg, sim_ck, j-1, seg_offset, 0, 1); @@ -844,35 +877,181 @@ process_blocks: */ memset(htab, 0, ary_sz); offset = he->item_offset; - if (db_segcache_map(cfg, ctx->id, &mapped_blks, &offset, - (uchar_t **)&seg_blocks) == -1) { + if (db_segcache_map(cfg, ctx->id, &o_blks, &offset, + (uchar_t *)seg_blocks) == -1) { fprintf(stderr, "Segment cache mmap failed.\n"); ctx->valid = 0; return (0); } - - if (increment > 70) { - j = cfg->intervals - j; + + /* + * First insert all the unique blocks from the mapped segment + * blocks(chunks) array into the hashtable. + */ + for (k=0; kchunk_cksum_sz, 0); + hent ^= (hent / cfg->chunk_cksum_sz); + hent = hent % cfg->segment_sz; + + if (htab[hent] == NULL) { + htab[hent] = &(seg_blocks[k]); + seg_blocks[k].offset += offset; + seg_blocks[k].next = NULL; + } else { + be = htab[hent]; + do { + if (memcmp(seg_blocks[k].cksum, + be->cksum, cfg->chunk_cksum_sz) == 0 && + seg_blocks[k].length == be->length) { + be = NULL; + break; + } + if (be->next) + be = be->next; + else + break; + } while(1); + + /* + * be will be non-NULL if no match was found. + * It will the last bucket in the hash slot. + */ + if (be) { + be->next = &(seg_blocks[k]); + seg_blocks[k].offset += offset; + seg_blocks[k].next = NULL; + } + } } - } else if (!written) { - if (blknum - i >= cfg->segment_sz) { - db_segcache_write(cfg, ctx->id, src, len, cfg->segment_sz, - ctx->file_offset); - } else { - db_segcache_write(cfg, ctx->id, src, len, blknum-i, - ctx->file_offset); + + /* + * Now lookup current segment blocks(chunks) in the hashtable and + * perform the actual deduplication. + */ + be = NULL; + for (k=i; kg_blocks[k].length & RABIN_INDEX_FLAG) continue; + hent = XXH32(ctx->g_blocks[k].cksum, cfg->chunk_cksum_sz, 0); + hent ^= (hent / cfg->chunk_cksum_sz); + hent = hent % cfg->segment_sz; + + if (htab[hent] == NULL) { + htab[hent] = &(ctx->g_blocks[k]); + ctx->g_blocks[k].offset += ctx->file_offset; + ctx->g_blocks[k].next = NULL; + be = NULL; + } else { + be = htab[hent]; + do { + if (memcmp(ctx->g_blocks[k].cksum, + be->cksum, cfg->chunk_cksum_sz) == 0 && + ctx->g_blocks[k].length == be->length) { + break; + } + if (be->next) { + be = be->next; + } else { + be->next = &(ctx->g_blocks[k]); + be->next->offset += + ctx->file_offset; + be->next->next = NULL; + be = NULL; + break; + } + } while(1); + } + + /* + * be will be non-NULL if match was found. It will + * point to the matching bucket. + */ + if (be) { + global_blockentry_t *en; + /* + * Block match in index was found. Update g_blocks + * array. + */ + en = &(ctx->g_blocks[k]); + en->length = (en->length | RABIN_INDEX_FLAG) & + CLEAR_SIMILARITY_FLAG; + en->offset = be->offset; + } } - written = 1; + break; } - increment -= cfg->pct_interval; - sim_ck -= 32; + sim_ck -= cfg->similarity_cksum_sz; } + i += blks; } /* * Signal the next thread in sequence to access the index. */ sem_post(ctx->index_sem_next); + + /*====================================================================== + * Finally scan the blocks array and update dedupe index. + *====================================================================== + */ + length = 0; + for (i=0; ig_blocks[i].length & RABIN_INDEX_FLAG)) { + /* + * Block match in index not found. + * Block was added to index. Merge this block. + */ + if (length + ctx->g_blocks[i].length > RABIN_MAX_BLOCK_SIZE) { + *((uint32_t *)g_dedupe_idx) = LE32(length); + g_dedupe_idx += RABIN_ENTRY_SIZE; + length = 0; + dedupe_index_sz++; + } + length += ctx->g_blocks[i].length; + } else { + /* + * Block match in index was found. + */ + if (length > 0) { + /* + * Write pending accumulated block length value. + */ + *((uint32_t *)g_dedupe_idx) = LE32(length); + g_dedupe_idx += RABIN_ENTRY_SIZE; + length = 0; + dedupe_index_sz++; + } + /* + * Add a reference entry to the dedupe array. + */ + *((uint32_t *)g_dedupe_idx) = LE32(ctx->g_blocks[i].length); + g_dedupe_idx += RABIN_ENTRY_SIZE; + *((uint64_t *)g_dedupe_idx) = LE64(ctx->g_blocks[i].offset); + g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); + matchlen += (ctx->g_blocks[i].length & RABIN_INDEX_VALUE); + dedupe_index_sz += 3; + } + } + + /* + * Write final pending block length value (if any). + */ + if (length > 0) { + *((uint32_t *)g_dedupe_idx) = LE32(length); + g_dedupe_idx += RABIN_ENTRY_SIZE; + length = 0; + dedupe_index_sz++; + } + + blknum = dedupe_index_sz; // Number of entries in block list + tgt = g_dedupe_idx; + g_dedupe_idx = ctx->cbuf + RABIN_HDR_SIZE; + dedupe_index_sz = tgt - g_dedupe_idx; + src = buf1; + g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); } /* diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index ae10534..9745f38 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -163,15 +163,6 @@ typedef struct rab_blockentry { struct rab_blockentry *next; } rabin_blockentry_t; -#pragma pack(1) -typedef struct global_blockentry { - uint32_t length; - uint64_t offset; - struct global_blockentry *next; // Reqd when part of a hashtable - uchar_t cksum[CKSUM_MAX_BYTES]; -} global_blockentry_t; -#pragma pack() - typedef struct { unsigned char *current_window_data; rabin_blockentry_t **blocks; @@ -212,7 +203,7 @@ extern void update_dedupe_hdr(uchar_t *buf, uint64_t dedupe_index_sz_cmp, extern void reset_dedupe_context(dedupe_context_t *ctx); extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta_flag); -extern int global_dedupe_bufadjust(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval, +extern int global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, int nthreads); diff --git a/utils/utils.c b/utils/utils.c index dac3640..074ed77 100644 --- a/utils/utils.c +++ b/utils/utils.c @@ -396,13 +396,13 @@ get_sys_limits(my_sysinfo *msys_info) */ mem = strtoull(val, NULL, 0); mem *= (1024 * 1024); - if (mem > (1024 * 1024) && mem < msys_info->freeram) { + if (mem >= (1024 * 1024) && mem < msys_info->freeram) { msys_info->freeram = mem; } + } else { + /* + * Use a maximum of approx 75% of free RAM for the index(if limit was not specified). + */ + msys_info->freeram = (msys_info->freeram >> 1) + (msys_info->freeram >> 2); } - - /* - * Use a maximum of approx 75% of free RAM for the index. - */ - msys_info->freeram = (msys_info->freeram >> 1) + (msys_info->freeram >> 2); }