diff --git a/README.md b/README.md index a8f9eef..f9c85cf 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ Usage ===== To compress a file: - pcompress -c [-l ] [-s ] + pcompress -c [-l ] [-s ] [-] Where can be the folowing: lzfx - Very fast and small algorithm based on LZF. lz4 - Ultra fast, high-throughput algorithm reaching RAM B/W at level1. @@ -67,8 +67,12 @@ Usage - This can be in bytes or can use the following suffixes: g - Gigabyte, m - Megabyte, k - Kilobyte. Larger chunks produce better compression at the cost of memory. + In case of Global Deduplication (see below) this chunk size is + just a hint and may get automatically adjusted. - Can be a number from 0 meaning minimum and 14 meaning maximum compression. + '-' - If '-' is given as the final argument then it specified that + compressed output should go to stdout. NOTE: The option "libbsc" uses Ilya Grebnov's block sorting compression library from http://libbsc.com/ . It is only available if pcompress in built with @@ -130,6 +134,42 @@ NOTE: The option "libbsc" uses Ilya Grebnov's block sorting compression library '-M' - Display memory allocator statistics '-C' - Display compression statistics + Global Deduplication: + '-G' - This flag enables Global Deduplication. This makes pcompress maintain an + in-memory index to lookup cryptographic block hashes for duplicates. Once + a duplicate is found it is replaced with a reference to the original block. + This allows detecting and eliminating duplicate blocks across the entire + dataset. In contrast using only '-D' or '-F' flags does deduplication only + within the chunk but uses less memory and is much faster than Global Dedupe. + + The '-G' flag can be combined with either '-D' or '-F' flags to indicate + rabin chunking or fixed chunking respectively. If these flags are not + specified then the default is to assume rabin chunking via '-D'. + All other Dedupe flags have the same meanings in this context. + + Delta Encoding is not supported with Global Deduplication at this time. The + in-memory hashtable index can use upto 75% of free RAM depending on the size + of the dataset. In Pipe mode the index will always use 75% of free RAM since + the dataset size is not known. This is the simple full chunk or block index + mode. If the available RAM is not enough to hold all block checksums then + older block entries are discarded automatically from the matching hash slots. + + If pipe mode is not used and the given dataset is a file then Pcompress + checks whether the index size will exceed three times of 75% of the available + free RAM. In such a case it automatically switches to a Segmented Deduplication + mode. Here data is first split into blocks as above. Then upto 2048 blocks are + grouped together to form a larger segment. The individual block hashes for a + segment are stored on a tempfile on disk. A few min-values hashes are then + computed from the block hashes of the segment which are then loaded into the + index. These hashes are used to detect segments that are approximately similar + to each other. Once found the block hashes of the matching segments are loaded + from the temp file and actual deduplication is performed. This allows the + in-memory index size to be approximately 0.0025% of the total dataset size and + requires very few disk reads for every 2048 blocks processed. + + In pipe mode Global Deduplication always uses a segmented similarity based + index. It allows efficient network transfer of large data. + Encryption flags: '-e ' Encrypt chunks using the given encryption algorithm. The algo parameter diff --git a/main.c b/main.c index 7e31d40..38a6361 100644 --- a/main.c +++ b/main.c @@ -82,7 +82,7 @@ static props_func_ptr _props_func; static int main_cancel; static int adapt_mode = 0; -static int pipe_mode = 0; +static int pipe_mode = 0, pipe_out = 0; static int nthreads = 0; static int hide_mem_stats = 1; static int hide_cmp_stats = 1; @@ -1116,7 +1116,7 @@ start_decompress(const char *filename, const char *to_filename) if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size, algo, &props, enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0, - NULL); + NULL, pipe_mode); if (tdat->rctx == NULL) { UNCOMP_BAIL; } @@ -1783,7 +1783,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) /* * Create a temporary file to hold compressed data which is renamed at * the end. The target file name is same as original file with the '.pz' - * extension appended. + * extension appended unless '-' was specified to output to stdout. */ strcpy(tmpfile1, filename); strcpy(tmpfile1, dirname(tmpfile1)); @@ -1794,13 +1794,23 @@ start_compress(const char *filename, uint64_t chunksize, int level) } else { strcpy(tmpdir, tmp); } - strcat(tmpfile1, "/.pcompXXXXXX"); - snprintf(to_filename, sizeof (to_filename), "%s" COMP_EXTN, filename); - if ((compfd = mkstemp(tmpfile1)) == -1) { - perror("mkstemp "); - COMP_BAIL; + + if (pipe_out) { + compfd = fileno(stdout); + if (compfd == -1) { + perror("fileno "); + COMP_BAIL; + } + f_name = NULL; + } else { + strcat(tmpfile1, "/.pcompXXXXXX"); + snprintf(to_filename, sizeof (to_filename), "%s" COMP_EXTN, filename); + if ((compfd = mkstemp(tmpfile1)) == -1) { + perror("mkstemp "); + COMP_BAIL; + } + f_name = tmpfile1; } - f_name = tmpfile1; signal(SIGINT, Int_Handler); signal(SIGTERM, Int_Handler); } else { @@ -1840,12 +1850,12 @@ start_compress(const char *filename, uint64_t chunksize, int level) strcpy(tmpdir, tmp); } - if (enable_rabin_global && !pipe_mode) { + if (enable_rabin_global) { my_sysinfo msys_info; get_sys_limits(&msys_info); global_dedupe_bufadjust(rab_blk_size, &chunksize, 0, algo, cksum, - CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads); + CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads, pipe_mode); } /* @@ -1952,7 +1962,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size, algo, &props, enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size, - tmpdir); + tmpdir, pipe_mode); if (tdat->rctx == NULL) { COMP_BAIL; } @@ -2095,7 +2105,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) file_offset = 0; if (enable_rabin_split) { rctx = create_dedupe_context(chunksize, 0, 0, algo, &props, enable_delta_encode, - enable_fixed_scan, VERSION, COMPRESS, 0, NULL); + enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pipe_mode); rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx); } else { rbytes = Read(uncompfd, cread_buf, chunksize); @@ -2239,7 +2249,7 @@ comp_done: } if (err) { - if (compfd != -1 && !pipe_mode) + if (compfd != -1 && !pipe_mode && !pipe_out) unlink(tmpfile1); if (filename) fprintf(stderr, "Error compressing file: %s\n", filename); @@ -2260,7 +2270,7 @@ comp_done: * Rename the temporary file to the actual compressed file * unless we are in a pipe. */ - if (!pipe_mode) { + if (!pipe_mode && !pipe_out) { /* * Ownership and mode of target should be same as original. */ @@ -2621,25 +2631,32 @@ main(int argc, char *argv[]) usage(); /* At least 1 filename needed. */ exit(1); - } else if (num_rem == 1) { + } else if (num_rem == 1 || num_rem == 2) { if (do_compress) { char apath[MAXPATHLEN]; if ((filename = realpath(argv[optind], NULL)) == NULL) err_exit(1, "%s", argv[optind]); + + if (num_rem == 2) { + optind++; + if (*(argv[optind]) == '-') { + to_filename = "-"; + pipe_out = 1; + } + to_filename = realpath(argv[optind], NULL); + } else { + strcpy(apath, filename); + strcat(apath, COMP_EXTN); + to_filename = realpath(apath, NULL); + } + /* Check if compressed file exists */ - strcpy(apath, filename); - strcat(apath, COMP_EXTN); - if ((to_filename = realpath(apath, NULL)) != NULL) { + if (to_filename != NULL) { free(filename); err_exit(0, "Compressed file %s exists\n", to_filename); } - } else { - usage(); - exit(1); - } - } else if (num_rem == 2) { - if (do_uncompress) { + } else if (do_uncompress && num_rem == 2) { /* * While decompressing, input can be stdin and output a physical file. */ diff --git a/rabin/global/dedupe_config.c b/rabin/global/dedupe_config.c index 69daac8..b0368c5 100644 --- a/rabin/global/dedupe_config.c +++ b/rabin/global/dedupe_config.c @@ -285,15 +285,13 @@ read_config(char *configfile, archive_config_t *cfg) cfg->chunk_sz_bytes = RAB_BLK_AVG_SZ(cfg->chunk_sz); cfg->directory_levels = 2; + segment_sz_bytes = EIGHT_MB; if (cfg->archive_sz < ONE_TB) { - segment_sz_bytes = FOUR_MB; cfg->directory_fanout = 128; } else if (cfg->archive_sz < ONE_PB) { - segment_sz_bytes = EIGHT_MB; cfg->directory_fanout = 256; } else { - segment_sz_bytes = EIGHT_MB; cfg->directory_fanout = 256; cfg->directory_levels = 3; } diff --git a/rabin/global/index.c b/rabin/global/index.c index c6ba12c..0f9c0df 100644 --- a/rabin/global/index.c +++ b/rabin/global/index.c @@ -115,23 +115,25 @@ setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chun /* * file_sz = 0 and pct_interval = 0 means we are in pipe mode and want a simple * index. Set pct_interval to 100 to indicate that we need to use all of memlimit - * for the simple index. + * for the simple index - This is UNUSED at present. * * If file_sz != 0 but pct_interval = 0 then we need to create a simple index * sized for the given file. If the available memory is not sufficient for a full - * index and required index size is 1.25x of availble mem then switch to a + * index and required index size is 3x of availble memory then switch to a * segmented index. * * If file_sz != 0 and pct_interval != 0 then we explicitly want to create a segmented - * index. This option is auto-selected to support the previous behavior. + * index. This is auto-selected to support the previous behavior. * * If file_sz = 0 and pct_interval != 0 then we are in pipe mode and want a segmented - * index. This is typically for WAN deduplication of large data transfers. + * index. This is auto-selected if Global Dedupe is used in pipe mode. */ - if (pct_interval != 0) - set_user = 0; - else + if (*pct_interval != 0) { set_user = 1; + } else { + set_user = 0; + } + if (file_sz == 0 && *pct_interval == 0) *pct_interval = 100; @@ -156,7 +158,7 @@ set_cfg: } // Compute total hashtable entries first - *hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1; + *hash_entry_size = sizeof (hash_entry_t) + cfg->chunk_cksum_sz - 1; if (*pct_interval == 0) { cfg->sub_intervals = 1; *hash_slots = file_sz / cfg->chunk_sz_bytes + 1; @@ -166,10 +168,19 @@ set_cfg: *hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size); *pct_interval = 0; } else { + *hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1; cfg->intervals = 100 / *pct_interval; cfg->sub_intervals = cfg->intervals; - *hash_slots = file_sz / cfg->segment_sz_bytes; - *hash_slots *= cfg->sub_intervals; + + /* + * If file size is zero we use entire memlimit for hash slots. + */ + if (file_sz == 0) { + *hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size); + } else { + *hash_slots = file_sz / cfg->segment_sz_bytes; + *hash_slots *= cfg->sub_intervals; + } } /* @@ -197,7 +208,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch size_t file_sz, size_t memlimit, int nthreads) { archive_config_t *cfg; - int rv, orig_pct; + int rv; uint32_t hash_slots, intervals, i; uint64_t memreqd; int hash_entry_size; @@ -207,7 +218,6 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch fprintf(stderr, "Disk based index not yet implemented.\n"); return (NULL); } - orig_pct = pct_interval; cfg = calloc(1, sizeof (archive_config_t)); rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim, diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index e1c8ebb..da119a8 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -75,7 +75,6 @@ #include #include #include -#include #include "rabin_dedup.h" #if defined(__USE_SSE_INTRIN__) && defined(__SSE4_1__) && RAB_POLYNOMIAL_WIN_SIZE == 16 @@ -109,8 +108,6 @@ static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER; uint64_t ir[256], out[256]; static int inited = 0; archive_config_t *arc = NULL; -static struct blake2_dispatch bdsp; -int seg = 0; static uint32_t dedupe_min_blksz(int rab_blk_sz) @@ -137,7 +134,7 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta int global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, - size_t memlimit, int nthreads) + size_t memlimit, int nthreads, int pipe_mode) { uint64_t memreqd; archive_config_t cfg; @@ -146,6 +143,9 @@ global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_in rv = 0; pct_i = pct_interval; + if (pipe_mode && pct_i == 0) + pct_i = DEFAULT_PCT_INTERVAL; + rv = setup_db_config_s(&cfg, rab_blk_sz, user_chunk_sz, &pct_i, algo, ck, ck_sim, file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit, "/tmp"); return (rv); @@ -157,7 +157,7 @@ global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_in dedupe_context_t * create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag, - int file_version, compress_op_t op, uint64_t file_size, char *tmppath) { + int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode) { dedupe_context_t *ctx; uint32_t i; @@ -206,20 +206,23 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s */ if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) { my_sysinfo msys_info; + int pct_interval; /* * Get amount of memory to use. The freeram got here is adjusted amount. */ get_sys_limits(&msys_info); + pct_interval = 0; + if (pipe_mode) + pct_interval = DEFAULT_PCT_INTERVAL; - arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, 0, + arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, pct_interval, algo, props->cksum, GLOBAL_SIM_CKSUM, file_size, msys_info.freeram, props->nthreads); if (arc == NULL) { pthread_mutex_unlock(&init_lock); return (NULL); } - blake2_module_init(&bdsp, &proc_info); } inited = 1; } diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index 9745f38..c9bcd13 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -190,7 +190,7 @@ typedef struct { extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag, - int file_version, compress_op_t op, uint64_t file_size, char *tmppath); + int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode); extern void destroy_dedupe_context(dedupe_context_t *ctx); extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf, uint64_t *size, uint64_t offset, uint64_t *rabin_pos, int mt); @@ -205,6 +205,6 @@ extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char int delta_flag); extern int global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, - size_t memlimit, int nthreads); + size_t memlimit, int nthreads, int pipe_mode); #endif /* _RABIN_POLY_H_ */