diff --git a/main.c b/main.c
index ef25835..f39a566 100644
--- a/main.c
+++ b/main.c
@@ -677,7 +677,7 @@ start_decompress(const char *filename, const char *to_filename)
     int uncompfd = -1, err, np, bail;
     int nprocs = 1, thread = 0, level;
     unsigned int i;
-    short version, flags;
+    unsigned short version, flags;
     int64_t chunksize, compressed_chunksize;
     struct cmp_data **dary, *tdat;
     pthread_t writer_thr;
@@ -1644,7 +1644,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
     char to_filename[MAXPATHLEN];
     uint64_t compressed_chunksize, n_chunksize, file_offset;
     int64_t rbytes, rabin_count;
-    short version, flags;
+    unsigned short version, flags;
     struct stat sbuf;
     int compfd = -1, uncompfd = -1, err;
     int thread, bail, single_chunk;
@@ -1655,21 +1655,9 @@ start_compress(const char *filename, uint64_t chunksize, int level)
     dedupe_context_t *rctx;
     algo_props_t props;
 
-    /*
-     * Compressed buffer size must include zlib/dedup scratch space and
-     * chunk header space.
-     * See http://www.zlib.net/manual.html#compress2
-     *
-     * We do this unconditionally whether user mentioned zlib or not
-     * to keep it simple. While zlib scratch space is only needed at
-     * runtime, chunk header is stored in the file.
-     *
-     * See start_decompress() routine for details of chunk header.
-     * We also keep extra 8-byte space for the last chunk's size.
-     */
-    compressed_chunksize = chunksize + CHUNK_HDR_SZ + zlib_buf_extra(chunksize);
     init_algo_props(&props);
     props.cksum = cksum;
+    props.buf_extra = 0;
     cread_buf = NULL;
 
     if (_props_func) {
@@ -1757,9 +1745,6 @@ start_compress(const char *filename, uint64_t chunksize, int level)
     thread = 0;
     single_chunk = 0;
     rctx = NULL;
-    slab_cache_add(chunksize);
-    slab_cache_add(compressed_chunksize);
-    slab_cache_add(sizeof (struct cmp_data));
 
     nprocs = sysconf(_SC_NPROCESSORS_ONLN);
     if (nthreads > 0 && nthreads < nprocs)
@@ -1798,6 +1783,18 @@ start_compress(const char *filename, uint64_t chunksize, int level)
         single_chunk = 1;
         props.is_single_chunk = 1;
         flags |= FLAG_SINGLE_CHUNK;
+
+        /*
+         * Switch to simple Deduplication if global is enabled.
+         */
+        if (enable_rabin_global) {
+            unsigned short flg;
+            enable_rabin_scan = 1;
+            enable_rabin_global = 0;
+            dedupe_flag = RABIN_DEDUPE_SEGMENTED;
+            flg = FLAG_DEDUP_FIXED;
+            flags &= ~flg;
+        }
     } else {
         if (nthreads == 0 || nthreads > sbuf.st_size / chunksize) {
             nthreads = sbuf.st_size / chunksize;
@@ -1870,6 +1867,38 @@ start_compress(const char *filename, uint64_t chunksize, int level)
         strcpy(tmpdir, tmp);
     }
 
+    /*
+     * Compressed buffer size must include zlib/dedup scratch space and
+     * chunk header space.
+     * See http://www.zlib.net/manual.html#compress2
+     *
+     * We do this unconditionally whether user mentioned zlib or not
+     * to keep it simple. While zlib scratch space is only needed at
+     * runtime, chunk header is stored in the file.
+     *
+     * See start_decompress() routine for details of chunk header.
+     * We also keep extra 8-byte space for the last chunk's size.
+     */
+    compressed_chunksize = chunksize + CHUNK_HDR_SZ + zlib_buf_extra(chunksize);
+    if (chunksize + props.buf_extra > compressed_chunksize) {
+        compressed_chunksize += (chunksize + props.buf_extra -
+            compressed_chunksize);
+    }
+
+    if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
+        /* Additional scratch space for dedup arrays. */
+        if (chunksize + dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode)
+            > compressed_chunksize) {
+            compressed_chunksize += (chunksize +
+                dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode)) -
+                compressed_chunksize;
+        }
+    }
+
+    slab_cache_add(chunksize);
+    slab_cache_add(compressed_chunksize);
+    slab_cache_add(sizeof (struct cmp_data));
+
     if (encrypt_type)
         flags |= encrypt_type;
 
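Note on the main.c change above: the buffer sizing now runs after `init_algo_props()` and the dedupe-mode fixups, so `props.buf_extra` and the dedupe scratch requirement are known before the slab caches are registered. Below is a minimal standalone sketch of the resulting sizing rule; `hdr_sz`, `buf_extra` and `dedupe_extra` are hypothetical stand-ins for `CHUNK_HDR_SZ`, `props.buf_extra` and `dedupe_buf_extra()`, and zlib's real `compressBound()` plays the role of `chunksize + zlib_buf_extra(chunksize)`:

```c
/*
 * Sketch of the worst-case chunk buffer sizing; the parameter names are
 * illustrative stand-ins, only compressBound() is the actual zlib API.
 */
#include <stdint.h>
#include <zlib.h>

static uint64_t
worst_case_buf(uint64_t chunksize, uint64_t hdr_sz, uint64_t buf_extra,
    uint64_t dedupe_extra)
{
	/* zlib worst-case expansion plus the on-disk chunk header. */
	uint64_t sz = (uint64_t)compressBound((uLong)chunksize) + hdr_sz;

	/* Grow to cover algorithm-specific scratch space, if larger. */
	if (chunksize + buf_extra > sz)
		sz = chunksize + buf_extra;

	/* Grow again to cover the dedupe index arrays, if larger still. */
	if (chunksize + dedupe_extra > sz)
		sz = chunksize + dedupe_extra;
	return (sz);
}
```

Registering the slab caches only after this final value is settled (as the moved `slab_cache_add()` calls now do) avoids caching a buffer size that a later adjustment would outgrow.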
diff --git a/rabin/global/db.c b/rabin/global/db.c
index 11d2ac5..63759b8 100644
--- a/rabin/global/db.c
+++ b/rabin/global/db.c
@@ -88,17 +88,18 @@ static cleanup_indx(index_t *indx)
     }
 }
 
-#define MEM_PER_UNIT ( (hash_entry_size + sizeof (hash_entry_t *) + \
+#define MEM_PER_UNIT(ent_sz) ( (ent_sz + sizeof (hash_entry_t *) + \
     (sizeof (hash_entry_t *)) / 2) + sizeof (hash_entry_t **) )
+#define MEM_REQD(hslots, ent_sz) (hslots * MEM_PER_UNIT(ent_sz))
+#define SLOTS_FOR_MEM(memlimit, ent_sz) (memlimit / MEM_PER_UNIT(ent_sz) - 5)
 
-archive_config_t *
-init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz,
-        int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
-        size_t file_sz, size_t memlimit, int nthreads)
+int
+setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz,
+        int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
+        size_t file_sz, uint32_t *hash_slots, int *hash_entry_size,
+        uint32_t *intervals, uint64_t *memreqd, size_t memlimit)
 {
-    archive_config_t *cfg;
-    int rv;
-    float diff;
+    int rv, set_user;
 
     /*
      * file_sz = 0 and pct_interval = 0 means we are in pipe mode and want a simple
@@ -106,110 +107,146 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
      * for the simple index.
      *
      * If file_sz != 0 but pct_interval = 0 then we need to create a simple index
-     * sized for the given file.
+     * sized for the given file. If the available memory is not sufficient for a full
+     * index and the required index size exceeds 1.25x of available memory then
+     * switch to a segmented index.
      *
-     * If file_sz = 0 and pct_interval = 100 then we are in pipe mode and want a segmented
+     * If file_sz != 0 and pct_interval != 0 then we explicitly want to create a segmented
+     * index. This option is auto-selected to support the previous behavior.
+     *
+     * If file_sz = 0 and pct_interval != 0 then we are in pipe mode and want a segmented
      * index. This is typically for WAN deduplication of large data transfers.
      */
-    if (file_sz == 0 && pct_interval == 0)
-        pct_interval = 100;
+    if (file_sz == 0 && *pct_interval == 0)
+        *pct_interval = 100;
+    set_user = 0;
 
-    cfg = calloc(1, sizeof (archive_config_t));
-    rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, user_chunk_sz, pct_interval);
+set_cfg:
+    rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, *user_chunk_sz, *pct_interval);
 
     if (cfg->dedupe_mode == MODE_SIMPLE) {
-        if (pct_interval != 100)
-            pct_interval = 0;
+        if (*pct_interval != 100)
+            *pct_interval = 0;
         cfg->pct_interval = 0;
     }
 
+    /*
+     * Adjust user_chunk_sz if this is the second try.
+     */
+    if (set_user) {
+        if (*user_chunk_sz < cfg->segment_sz_bytes) {
+            *user_chunk_sz = cfg->segment_sz_bytes;
+        } else {
+            *user_chunk_sz = (*user_chunk_sz / cfg->segment_sz_bytes) * cfg->segment_sz_bytes;
+        }
+    }
+
+    // Compute total hashtable entries first
+    *hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1;
+    if (*pct_interval == 0) {
+        *intervals = 1;
+        *hash_slots = file_sz / cfg->chunk_sz_bytes + 1;
+
+    } else if (*pct_interval == 100) {
+        *intervals = 1;
+        *hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size);
+        *pct_interval = 0;
+    } else {
+        *intervals = 100 / *pct_interval - 1;
+        *hash_slots = file_sz / cfg->segment_sz_bytes + 1;
+        *hash_slots *= *intervals;
+    }
+
+    // Compute memory required to hold all hash entries assuming worst case 50%
+    // occupancy.
+    *memreqd = MEM_REQD(*hash_slots, *hash_entry_size);
+
+    if (*memreqd > (memlimit + (memlimit >> 2)) && cfg->dedupe_mode == MODE_SIMPLE &&
+        *pct_interval == 0) {
+        *pct_interval = DEFAULT_PCT_INTERVAL;
+        set_user = 1;
+        goto set_cfg;
+    }
+    return (rv);
+}
+
+archive_config_t *
+init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz,
+        int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
+        size_t file_sz, size_t memlimit, int nthreads)
+{
+    archive_config_t *cfg;
+    int rv, orig_pct;
+    float diff;
+    uint32_t hash_slots, intervals, i;
+    uint64_t memreqd;
+    int hash_entry_size;
+    index_t *indx;
+
     if (path != NULL) {
         fprintf(stderr, "Disk based index not yet implemented.\n");
+        return (NULL);
+    }
+    orig_pct = pct_interval;
+    cfg = calloc(1, sizeof (archive_config_t));
+
+    diff = (float)pct_interval / 100.0;
+    rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim,
+        file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit);
+
+    // Reduce hash_slots to remain within memlimit
+    while (memreqd > memlimit) {
+        if (pct_interval == 0) {
+            hash_slots--;
+        } else {
+            hash_slots -= (hash_slots * diff);
+        }
+        memreqd = hash_slots * MEM_PER_UNIT(hash_entry_size);
+    }
+
+    // Now create as many hash tables as there are similarity match intervals
+    // each having hash_slots / intervals slots.
+    indx = calloc(1, sizeof (index_t));
+    if (!indx) {
         free(cfg);
         return (NULL);
-    } else {
-        uint32_t hash_slots, intervals, i;
-        uint64_t memreqd;
-        int hash_entry_size;
-        index_t *indx;
+    }
 
-        hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1;
+    indx->memlimit = memlimit - (hash_entry_size << 2);
+    indx->list = (htab_t *)calloc(intervals, sizeof (htab_t));
+    indx->hash_entry_size = hash_entry_size;
+    indx->intervals = intervals;
+    indx->hash_slots = hash_slots / intervals;
+    cfg->nthreads = nthreads;
 
-        // Compute total hashtable entries first
-        if (pct_interval == 0) {
-            intervals = 1;
-            hash_slots = file_sz / cfg->chunk_sz_bytes + 1;
-
-        } else if (pct_interval == 100) {
-            intervals = 1;
-            hash_slots = memlimit / MEM_PER_UNIT - 5;
-            pct_interval = 0;
-        } else {
-            intervals = 100 / pct_interval - 1;
-            hash_slots = file_sz / cfg->segment_sz_bytes + 1;
-            hash_slots *= intervals;
-        }
-
-        // Compute memory required to hold all hash entries assuming worst case 50%
-        // occupancy.
-        memreqd = hash_slots * MEM_PER_UNIT;
-        diff = (float)pct_interval / 100.0;
-
-        // Reduce hash_slots to remain within memlimit
-        while (memreqd > memlimit) {
-            if (pct_interval == 0) {
-                hash_slots--;
-            } else {
-                hash_slots -= (hash_slots * diff);
-            }
-            memreqd = hash_slots * MEM_PER_UNIT;
-        }
-
-        // Now create as many hash tables as there are similarity match intervals
-        // each having hash_slots / intervals slots.
-        indx = calloc(1, sizeof (index_t));
-        if (!indx) {
+    for (i = 0; i < intervals; i++) {
+        indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals,
+                sizeof (hash_entry_t *));
+        if (!(indx->list[i].tab)) {
+            cleanup_indx(indx);
             free(cfg);
             return (NULL);
         }
-
-        indx->memlimit = memlimit - (hash_entry_size << 2);
-        indx->list = (htab_t *)calloc(intervals, sizeof (htab_t));
-        indx->hash_entry_size = hash_entry_size;
-        indx->intervals = intervals;
-        indx->hash_slots = hash_slots / intervals;
-        cfg->nthreads = nthreads;
-
-        for (i = 0; i < intervals; i++) {
-            indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals,
-                    sizeof (hash_entry_t *));
-            if (!(indx->list[i].tab)) {
-                cleanup_indx(indx);
-                free(cfg);
-                return (NULL);
-            }
-            indx->memused += ((hash_slots / intervals) * (sizeof (hash_entry_t *)));
-        }
-
-        if (pct_interval > 0) {
-            strcpy(cfg->rootdir, tmppath);
-            strcat(cfg->rootdir, "/.segXXXXXX");
-            cfg->seg_fd_w = mkstemp(cfg->rootdir);
-            cfg->seg_fd_r = (int *)malloc(sizeof (int) * nthreads);
-            if (cfg->seg_fd_w == -1 || cfg->seg_fd_r == NULL) {
-                cleanup_indx(indx);
-                if (cfg->seg_fd_r)
-                    free(cfg->seg_fd_r);
-                free(cfg);
-                return (NULL);
-            }
-
-            for (i = 0; i < nthreads; i++) {
-                cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY);
-            }
-        }
-        cfg->dbdata = indx;
+        indx->memused += ((hash_slots / intervals) * (sizeof (hash_entry_t *)));
     }
+
+    if (pct_interval > 0) {
+        strcpy(cfg->rootdir, tmppath);
+        strcat(cfg->rootdir, "/.segXXXXXX");
+        cfg->seg_fd_w = mkstemp(cfg->rootdir);
+        cfg->seg_fd_r = (int *)malloc(sizeof (int) * nthreads);
+        if (cfg->seg_fd_w == -1 || cfg->seg_fd_r == NULL) {
+            cleanup_indx(indx);
+            if (cfg->seg_fd_r)
+                free(cfg->seg_fd_r);
+            free(cfg);
+            return (NULL);
+        }
+        for (i = 0; i < nthreads; i++) {
+            cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY);
+        }
+    }
+    cfg->dbdata = indx;
     return (cfg);
 }
diff --git a/rabin/global/db.h b/rabin/global/db.h
index 58285fb..6d55579 100644
--- a/rabin/global/db.h
+++ b/rabin/global/db.h
@@ -42,6 +42,10 @@ typedef struct _hash_entry {
 } hash_entry_t;
 
 archive_config_t *init_global_db(char *configfile);
+int setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz,
+        int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
+        size_t file_sz, uint32_t *hash_slots, int *hash_entry_size,
+        uint32_t *intervals, uint64_t *memreqd, size_t memlimit);
 archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize,
         uint64_t user_chunk_sz, int pct_interval, const char *algo, cksum_t ck,
         cksum_t ck_sim, size_t file_sz, size_t memlimit,
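For reference, the sizing math now centralized in `setup_db_config_s()` can be exercised standalone. The sketch below mirrors the `MEM_PER_UNIT`/`MEM_REQD` macros and the 1.25x overshoot test that triggers the switch to a segmented index; the file size, chunk size, entry size and memory limit are made-up example numbers, and `sizeof (void *)` stands in for the `hash_entry_t` pointer sizes:

```c
/*
 * Standalone sketch of the index sizing math; all numbers below are
 * illustrative assumptions, not pcompress defaults.
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define MEM_PER_UNIT(ent_sz) (((ent_sz) + sizeof (void *) + \
	sizeof (void *) / 2) + sizeof (void **))
#define MEM_REQD(hslots, ent_sz)	((hslots) * MEM_PER_UNIT(ent_sz))

int
main(void)
{
	uint64_t file_sz = 64ULL * 1024 * 1024 * 1024;	/* 64GB of input */
	uint32_t chunk_sz = 4096;			/* average dedupe chunk */
	uint64_t ent_sz = 64;				/* assumed hash entry size */
	uint64_t memlimit = 512ULL * 1024 * 1024;	/* 512MB index budget */

	/* A simple (full) index wants one slot per expected chunk. */
	uint64_t hash_slots = file_sz / chunk_sz + 1;
	uint64_t memreqd = MEM_REQD(hash_slots, ent_sz);

	printf("full index needs %" PRIu64 " MB\n", memreqd >> 20);

	/* The 1.25x overshoot test that flips to a segmented index. */
	if (memreqd > memlimit + (memlimit >> 2))
		printf("switch to segmented index (DEFAULT_PCT_INTERVAL)\n");
	return (0);
}
```

With these example numbers the full index needs roughly 1.3GB against a 640MB (1.25x) ceiling, so the second pass through `set_cfg` would select a segmented index and round `user_chunk_sz` to a multiple of the segment size.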
diff --git a/rabin/global/dedupe_config.h b/rabin/global/dedupe_config.h
index 996e1f8..d75cb55 100644
--- a/rabin/global/dedupe_config.h
+++ b/rabin/global/dedupe_config.h
@@ -37,6 +37,7 @@ extern "C" {
 #define DEFAULT_CHUNK_CKSUM	CKSUM_SHA256
 #define DEFAULT_SIMILARITY_CKSUM	CKSUM_BLAKE256
 #define DEFAULT_COMPRESS	COMPRESS_LZ4
+#define DEFAULT_PCT_INTERVAL	2
 #define CONTAINER_ITEMS	2048
 #define MIN_CK	1
 #define MAX_CK	5
@@ -77,6 +78,7 @@ typedef struct {
     int seg_fd_w;
     int *seg_fd_r; // One read-only fd per thread for mapping in portions of the
                    // segment metadata cache.
+    int valid;
     void *dbdata;
 } archive_config_t;
 
diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c
index 33e922e..2fcefe6 100755
--- a/rabin/rabin_dedup.c
+++ b/rabin/rabin_dedup.c
@@ -126,6 +126,27 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta
     return ((chunksize / dedupe_min_blksz(rab_blk_sz)) * sizeof (uint32_t));
 }
 
+/*
+ * Helper function to let the caller size the user-specific compression chunk/segment
+ * to align with deduplication requirements.
+ */
+int
+global_dedupe_chkmem(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval,
+        const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
+        size_t memlimit, int nthreads)
+{
+    uint64_t memreqd;
+    archive_config_t cfg;
+    int rv, pct_i, hash_entry_size;
+    uint32_t intervals, hash_slots;
+
+    rv = 0;
+    pct_i = pct_interval;
+    rv = setup_db_config_s(&cfg, chunksize, user_chunk_sz, &pct_i, algo, ck, ck_sim,
+        file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit);
+    return (rv);
+}
+
 /*
  * Initialize the algorithm with the default params.
  */
@@ -181,12 +202,26 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
      */
     if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) {
         my_sysinfo msys_info;
+        char *val;
 
         /*
          * Get available free memory.
          */
         get_sysinfo(&msys_info);
+        if ((val = getenv("PCOMPRESS_INDEX_MEM")) != NULL) {
+            uint64_t mem;
+
+            /*
+             * Externally specified index limit in MB.
+             */
+            mem = strtoull(val, NULL, 0);
+            mem *= (1024 * 1024);
+            if (mem > (1024 * 1024) && mem < msys_info.freeram) {
+                msys_info.freeram = mem;
+            }
+        }
+
         /*
          * Use a maximum of approx 75% of free RAM for the index.
          */
@@ -773,6 +808,12 @@ process_blocks:
             }
             pos1 = tgt - ctx->cbuf;
             blknum |= GLOBAL_FLAG;
+
+        } else {
+            /*
+             * Segmented similarity-based dedupe.
+             */
+        }
         goto dedupe_done;
     }
diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h
index c84784a..9e7fadc 100644
--- a/rabin/rabin_dedup.h
+++ b/rabin/rabin_dedup.h
@@ -127,7 +127,6 @@
 #define RABIN_DEDUPE_SEGMENTED	0
 #define RABIN_DEDUPE_FIXED	1
 #define RABIN_DEDUPE_FILE_GLOBAL	2
-#define DEFAULT_PCT_INTERVAL	2
 
 // Mask to extract value from a rabin index entry
 #define RABIN_INDEX_VALUE	(0x3FFFFFFFUL)
@@ -202,12 +201,15 @@ extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf,
     uint64_t *size, uint64_t offset, uint64_t *rabin_pos, int mt);
 extern void dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size);
 extern void parse_dedupe_hdr(uchar_t *buf, unsigned int *blknum, uint64_t *dedupe_index_sz,
-    uint64_t *dedupe_data_sz, uint64_t *rabin_index_sz_cmp,
-    uint64_t *dedupe_data_sz_cmp, uint64_t *deduped_size);
+        uint64_t *dedupe_data_sz, uint64_t *rabin_index_sz_cmp,
+        uint64_t *dedupe_data_sz_cmp, uint64_t *deduped_size);
 extern void update_dedupe_hdr(uchar_t *buf, uint64_t dedupe_index_sz_cmp,
-    uint64_t dedupe_data_sz_cmp);
+        uint64_t dedupe_data_sz_cmp);
 extern void reset_dedupe_context(dedupe_context_t *ctx);
 extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta_flag);
+extern int global_dedupe_chkmem(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval,
+        const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
+        size_t memlimit, int nthreads);
 
 #endif /* _RABIN_POLY_H_ */
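The `PCOMPRESS_INDEX_MEM` handling added to `create_dedupe_context()` above amounts to a small clamp on the free-RAM figure before the 75% rule is applied. A sketch under the assumption that `freeram` is the `my_sysinfo` field used by that rule; `apply_index_mem_env()` is an illustrative helper, not a function in the source tree:

```c
/*
 * Illustrative helper (not in the source tree) mirroring the
 * PCOMPRESS_INDEX_MEM clamp in create_dedupe_context().
 */
#include <stdint.h>
#include <stdlib.h>

static uint64_t
apply_index_mem_env(uint64_t freeram)
{
	char *val;

	if ((val = getenv("PCOMPRESS_INDEX_MEM")) != NULL) {
		/* The variable is an index size limit in MB. */
		uint64_t mem = strtoull(val, NULL, 0) * (1024 * 1024);

		/* Honor it only if above 1MB and below actual free RAM. */
		if (mem > (1024 * 1024) && mem < freeram)
			freeram = mem;
	}
	return (freeram);
}
```

With this in place, an invocation along the lines of `PCOMPRESS_INDEX_MEM=512 pcompress ...` (command line illustrative) would cap the global dedupe index budget at roughly 512MB, and the existing 75%-of-free-RAM rule then applies within that reduced figure.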