Capability to output data to stdout when compressing.

Always use segmented similarity based dedupe when using -G option in pipe mode.
Standardize on average 8MB segment size for segmented dedupe.
Fix hashtable sizing.
Some miscellaneous cleanups.
Update README with details of new features.
Moinak Ghosh 2013-04-24 23:03:58 +05:30
parent 6c5d8d9e18
commit 79a6e7f770
6 changed files with 118 additions and 50 deletions


@ -39,7 +39,7 @@ Usage
=====
To compress a file:
pcompress -c <algorithm> [-l <compress level>] [-s <chunk size>] <file>
pcompress -c <algorithm> [-l <compress level>] [-s <chunk size>] <file> [-]
Where <algorithm> can be the following:
lzfx - Very fast and small algorithm based on LZF.
lz4 - Ultra fast, high-throughput algorithm reaching RAM B/W at level1.
@ -67,8 +67,12 @@ Usage
<chunk_size> - This can be in bytes or can use the following suffixes:
g - Gigabyte, m - Megabyte, k - Kilobyte.
Larger chunks produce better compression at the cost of memory.
In case of Global Deduplication (see below) this chunk size is
just a hint and may get automatically adjusted.
<compress_level> - Can be a number from 0 meaning minimum and 14 meaning
maximum compression.
'-' - If '-' is given as the final argument then it specifies that
compressed output should go to stdout.
NOTE: The option "libbsc" uses Ilya Grebnov's block sorting compression library
from http://libbsc.com/ . It is only available if pcompress is built with
@ -130,6 +134,42 @@ NOTE: The option "libbsc" uses Ilya Grebnov's block sorting compression library
'-M' - Display memory allocator statistics
'-C' - Display compression statistics
Global Deduplication:
'-G' - This flag enables Global Deduplication. This makes pcompress maintain an
in-memory index to lookup cryptographic block hashes for duplicates. Once
a duplicate is found it is replaced with a reference to the original block.
This allows detecting and eliminating duplicate blocks across the entire
dataset. In contrast using only '-D' or '-F' flags does deduplication only
within the chunk but uses less memory and is much faster than Global Dedupe.
The '-G' flag can be combined with either '-D' or '-F' flags to indicate
rabin chunking or fixed chunking respectively. If these flags are not
specified then the default is to assume rabin chunking via '-D'.
All other Dedupe flags have the same meanings in this context.
Delta Encoding is not supported with Global Deduplication at this time. The
in-memory hashtable index can use upto 75% of free RAM depending on the size
of the dataset. In Pipe mode the index will always use 75% of free RAM since
the dataset size is not known. This is the simple full chunk or block index
mode. If the available RAM is not enough to hold all block checksums then
older block entries are discarded automatically from the matching hash slots.
If pipe mode is not used and the given dataset is a file then Pcompress
checks whether the index size will exceed three times of 75% of the available
free RAM. In such a case it automatically switches to a Segmented Deduplication
mode. Here data is first split into blocks as above. Then upto 2048 blocks are
grouped together to form a larger segment. The individual block hashes for a
segment are stored on a tempfile on disk. A few min-values hashes are then
computed from the block hashes of the segment which are then loaded into the
index. These hashes are used to detect segments that are approximately similar
to each other. Once found the block hashes of the matching segments are loaded
from the temp file and actual deduplication is performed. This allows the
in-memory index size to be approximately 0.0025% of the total dataset size and
requires very few disk reads for every 2048 blocks processed.
In pipe mode Global Deduplication always uses a segmented similarity based
index. It allows efficient network transfer of large data.
Encryption flags:
'-e <ALGO>'
Encrypt chunks using the given encryption algorithm. The algo parameter
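
The segmented mode described in the README text above reduces each segment of up to 2048 block hashes to a few min-values that are used to find approximately similar segments. The following is a minimal sketch of that idea, not the pcompress implementation: it assumes 64-bit block hashes (pcompress uses cryptographic block hashes), and the names segment_min_values and shared_min_values are hypothetical.

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define SEG_MAX_BLOCKS 2048	/* upper bound on blocks per segment, per the README */

/* qsort comparator for 64-bit hashes; avoids overflow from subtraction. */
static int
cmp_u64(const void *a, const void *b)
{
	uint64_t x = *(const uint64_t *)a, y = *(const uint64_t *)b;

	return ((x > y) - (x < y));
}

/*
 * Copy the k smallest block hashes of a segment into out[], in ascending
 * order. Returns the number of values written, or 0 if nblocks is 0 or
 * exceeds SEG_MAX_BLOCKS. (Hypothetical helper, for illustration only.)
 */
static size_t
segment_min_values(const uint64_t *block_hashes, size_t nblocks,
    uint64_t *out, size_t k)
{
	uint64_t tmp[SEG_MAX_BLOCKS];

	if (nblocks == 0 || nblocks > SEG_MAX_BLOCKS)
		return (0);
	memcpy(tmp, block_hashes, nblocks * sizeof (uint64_t));
	qsort(tmp, nblocks, sizeof (uint64_t), cmp_u64);
	if (k > nblocks)
		k = nblocks;
	memcpy(out, tmp, k * sizeof (uint64_t));
	return (k);
}

/*
 * Count values present in both ascending arrays. A nonzero count suggests
 * the two segments share blocks and are worth comparing in full.
 */
static size_t
shared_min_values(const uint64_t *a, size_t na, const uint64_t *b, size_t nb)
{
	size_t i = 0, j = 0, shared = 0;

	while (i < na && j < nb) {
		if (a[i] < b[j]) {
			i++;
		} else if (a[i] > b[j]) {
			j++;
		} else {
			shared++;
			i++;
			j++;
		}
	}
	return (shared);
}
```

Only the min-values need to stay in memory under this scheme. The full block hash list of a segment can live in the temp file and be read back when a candidate match is found.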

main.c

@ -82,7 +82,7 @@ static props_func_ptr _props_func;
static int main_cancel;
static int adapt_mode = 0;
static int pipe_mode = 0;
static int pipe_mode = 0, pipe_out = 0;
static int nthreads = 0;
static int hide_mem_stats = 1;
static int hide_cmp_stats = 1;
@ -1116,7 +1116,7 @@ start_decompress(const char *filename, const char *to_filename)
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size,
algo, &props, enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0,
NULL);
NULL, pipe_mode);
if (tdat->rctx == NULL) {
UNCOMP_BAIL;
}
@ -1783,7 +1783,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
/*
* Create a temporary file to hold compressed data which is renamed at
* the end. The target file name is same as original file with the '.pz'
* extension appended.
* extension appended unless '-' was specified to output to stdout.
*/
strcpy(tmpfile1, filename);
strcpy(tmpfile1, dirname(tmpfile1));
@ -1794,13 +1794,23 @@ start_compress(const char *filename, uint64_t chunksize, int level)
} else {
strcpy(tmpdir, tmp);
}
strcat(tmpfile1, "/.pcompXXXXXX");
snprintf(to_filename, sizeof (to_filename), "%s" COMP_EXTN, filename);
if ((compfd = mkstemp(tmpfile1)) == -1) {
perror("mkstemp ");
COMP_BAIL;
if (pipe_out) {
compfd = fileno(stdout);
if (compfd == -1) {
perror("fileno ");
COMP_BAIL;
}
f_name = NULL;
} else {
strcat(tmpfile1, "/.pcompXXXXXX");
snprintf(to_filename, sizeof (to_filename), "%s" COMP_EXTN, filename);
if ((compfd = mkstemp(tmpfile1)) == -1) {
perror("mkstemp ");
COMP_BAIL;
}
f_name = tmpfile1;
}
f_name = tmpfile1;
signal(SIGINT, Int_Handler);
signal(SIGTERM, Int_Handler);
} else {
@ -1840,12 +1850,12 @@ start_compress(const char *filename, uint64_t chunksize, int level)
strcpy(tmpdir, tmp);
}
if (enable_rabin_global && !pipe_mode) {
if (enable_rabin_global) {
my_sysinfo msys_info;
get_sys_limits(&msys_info);
global_dedupe_bufadjust(rab_blk_size, &chunksize, 0, algo, cksum,
CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads);
CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads, pipe_mode);
}
/*
@ -1952,7 +1962,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size,
algo, &props, enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size,
tmpdir);
tmpdir, pipe_mode);
if (tdat->rctx == NULL) {
COMP_BAIL;
}
@ -2095,7 +2105,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
file_offset = 0;
if (enable_rabin_split) {
rctx = create_dedupe_context(chunksize, 0, 0, algo, &props, enable_delta_encode,
enable_fixed_scan, VERSION, COMPRESS, 0, NULL);
enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pipe_mode);
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx);
} else {
rbytes = Read(uncompfd, cread_buf, chunksize);
@ -2239,7 +2249,7 @@ comp_done:
}
if (err) {
if (compfd != -1 && !pipe_mode)
if (compfd != -1 && !pipe_mode && !pipe_out)
unlink(tmpfile1);
if (filename)
fprintf(stderr, "Error compressing file: %s\n", filename);
@ -2260,7 +2270,7 @@ comp_done:
* Rename the temporary file to the actual compressed file
* unless we are in a pipe.
*/
if (!pipe_mode) {
if (!pipe_mode && !pipe_out) {
/*
* Ownership and mode of target should be same as original.
*/
@ -2621,25 +2631,32 @@ main(int argc, char *argv[])
usage(); /* At least 1 filename needed. */
exit(1);
} else if (num_rem == 1) {
} else if (num_rem == 1 || num_rem == 2) {
if (do_compress) {
char apath[MAXPATHLEN];
if ((filename = realpath(argv[optind], NULL)) == NULL)
err_exit(1, "%s", argv[optind]);
if (num_rem == 2) {
optind++;
if (*(argv[optind]) == '-') {
to_filename = "-";
pipe_out = 1;
}
to_filename = realpath(argv[optind], NULL);
} else {
strcpy(apath, filename);
strcat(apath, COMP_EXTN);
to_filename = realpath(apath, NULL);
}
/* Check if compressed file exists */
strcpy(apath, filename);
strcat(apath, COMP_EXTN);
if ((to_filename = realpath(apath, NULL)) != NULL) {
if (to_filename != NULL) {
free(filename);
err_exit(0, "Compressed file %s exists\n", to_filename);
}
} else {
usage();
exit(1);
}
} else if (num_rem == 2) {
if (do_uncompress) {
} else if (do_uncompress && num_rem == 2) {
/*
* While decompressing, input can be stdin and output a physical file.
*/
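
The main.c changes above pick the output descriptor in one of two ways: stdout when '-' is given, otherwise a temporary file that is renamed on success and unlinked on error. The sketch below isolates that decision under stated assumptions. open_comp_output and discard_comp_output are hypothetical helpers and do not exist in pcompress, and an empty template string stands in for the f_name = NULL case.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L	/* for fileno() and mkstemp() */
#endif
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/*
 * Return a writable descriptor for the compressed output.
 * to_stdout != 0: use stdout and clear tmpl to mark "no temp file".
 * to_stdout == 0: create a temp file from the mkstemp() template in tmpl.
 * Returns -1 on failure. (Hypothetical helper, for illustration only.)
 */
static int
open_comp_output(int to_stdout, char *tmpl)
{
	int fd;

	if (to_stdout) {
		fd = fileno(stdout);
		if (fd == -1)
			perror("fileno ");
		tmpl[0] = '\0';
		return (fd);
	}
	fd = mkstemp(tmpl);
	if (fd == -1)
		perror("mkstemp ");
	return (fd);
}

/*
 * Error-path cleanup: close and unlink a temp file, but leave stdout alone.
 * This mirrors the "!pipe_mode && !pipe_out" guard before unlink().
 */
static void
discard_comp_output(int fd, const char *tmpl)
{
	if (fd != -1 && tmpl[0] != '\0') {
		close(fd);
		unlink(tmpl);
	}
}
```

Keeping the "is this a temp file" decision in one place makes it harder to unlink or rename something that was never created, which is what the added pipe_out checks guard against.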


@ -285,15 +285,13 @@ read_config(char *configfile, archive_config_t *cfg)
cfg->chunk_sz_bytes = RAB_BLK_AVG_SZ(cfg->chunk_sz);
cfg->directory_levels = 2;
segment_sz_bytes = EIGHT_MB;
if (cfg->archive_sz < ONE_TB) {
segment_sz_bytes = FOUR_MB;
cfg->directory_fanout = 128;
} else if (cfg->archive_sz < ONE_PB) {
segment_sz_bytes = EIGHT_MB;
cfg->directory_fanout = 256;
} else {
segment_sz_bytes = EIGHT_MB;
cfg->directory_fanout = 256;
cfg->directory_levels = 3;
}


@ -115,23 +115,25 @@ setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chun
/*
* file_sz = 0 and pct_interval = 0 means we are in pipe mode and want a simple
* index. Set pct_interval to 100 to indicate that we need to use all of memlimit
* for the simple index.
* for the simple index - This is UNUSED at present.
*
* If file_sz != 0 but pct_interval = 0 then we need to create a simple index
* sized for the given file. If the available memory is not sufficient for a full
* index and required index size is 1.25x of available mem then switch to a
* index and required index size is 3x of available memory then switch to a
* segmented index.
*
* If file_sz != 0 and pct_interval != 0 then we explicitly want to create a segmented
* index. This option is auto-selected to support the previous behavior.
* index. This is auto-selected to support the previous behavior.
*
* If file_sz = 0 and pct_interval != 0 then we are in pipe mode and want a segmented
* index. This is typically for WAN deduplication of large data transfers.
* index. This is auto-selected if Global Dedupe is used in pipe mode.
*/
if (pct_interval != 0)
set_user = 0;
else
if (*pct_interval != 0) {
set_user = 1;
} else {
set_user = 0;
}
if (file_sz == 0 && *pct_interval == 0)
*pct_interval = 100;
@ -156,7 +158,7 @@ set_cfg:
}
// Compute total hashtable entries first
*hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1;
*hash_entry_size = sizeof (hash_entry_t) + cfg->chunk_cksum_sz - 1;
if (*pct_interval == 0) {
cfg->sub_intervals = 1;
*hash_slots = file_sz / cfg->chunk_sz_bytes + 1;
@ -166,10 +168,19 @@ set_cfg:
*hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size);
*pct_interval = 0;
} else {
*hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1;
cfg->intervals = 100 / *pct_interval;
cfg->sub_intervals = cfg->intervals;
*hash_slots = file_sz / cfg->segment_sz_bytes;
*hash_slots *= cfg->sub_intervals;
/*
* If file size is zero we use entire memlimit for hash slots.
*/
if (file_sz == 0) {
*hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size);
} else {
*hash_slots = file_sz / cfg->segment_sz_bytes;
*hash_slots *= cfg->sub_intervals;
}
}
/*
@ -197,7 +208,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
size_t file_sz, size_t memlimit, int nthreads)
{
archive_config_t *cfg;
int rv, orig_pct;
int rv;
uint32_t hash_slots, intervals, i;
uint64_t memreqd;
int hash_entry_size;
@ -207,7 +218,6 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
fprintf(stderr, "Disk based index not yet implemented.\n");
return (NULL);
}
orig_pct = pct_interval;
cfg = calloc(1, sizeof (archive_config_t));
rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim,
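
The hashtable sizing fix above distinguishes three cases: a simple index sized from the file, a segmented index sized from the file, and a segmented index in pipe mode where the file size is zero. The sketch below restates that selection as a standalone function under stated assumptions. compute_hash_slots is a hypothetical name, slots_for_mem is a guessed stand-in for the SLOTS_FOR_MEM macro (whose definition is not shown in this diff), and the fallback taken when a simple index does not fit in memory is omitted.

```c
#include <stdint.h>

/* Assumed stand-in for SLOTS_FOR_MEM: entries that fit in memlimit bytes. */
static uint64_t
slots_for_mem(uint64_t memlimit, uint64_t entry_size)
{
	return (memlimit / entry_size);
}

/*
 * Pick the number of hash slots. (Hypothetical helper, for illustration.)
 * pct_interval == 0: simple index, one slot per average-sized block.
 * pct_interval != 0: segmented index with 100 / pct_interval sub-intervals
 *                    per segment.
 */
static uint64_t
compute_hash_slots(uint64_t file_sz, uint64_t memlimit, int pct_interval,
    uint64_t chunk_sz_bytes, uint64_t segment_sz_bytes, uint64_t entry_size)
{
	uint64_t sub_intervals;

	if (pct_interval == 0)
		return (file_sz / chunk_sz_bytes + 1);

	/* Pipe mode: the dataset size is unknown, so use all of memlimit. */
	if (file_sz == 0)
		return (slots_for_mem(memlimit, entry_size));

	sub_intervals = (uint64_t)(100 / pct_interval);
	return (file_sz / segment_sz_bytes * sub_intervals);
}
```

With an 8MB segment size and a pct_interval of 5, for example, a 1GB file has 128 segments and 20 sub-intervals each, so this sketch yields 2560 slots.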


@ -75,7 +75,6 @@
#include <pthread.h>
#include <heapq.h>
#include <xxhash.h>
#include <blake2_digest.h>
#include "rabin_dedup.h"
#if defined(__USE_SSE_INTRIN__) && defined(__SSE4_1__) && RAB_POLYNOMIAL_WIN_SIZE == 16
@ -109,8 +108,6 @@ static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
uint64_t ir[256], out[256];
static int inited = 0;
archive_config_t *arc = NULL;
static struct blake2_dispatch bdsp;
int seg = 0;
static uint32_t
dedupe_min_blksz(int rab_blk_sz)
@ -137,7 +134,7 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta
int
global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_interval,
const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
size_t memlimit, int nthreads)
size_t memlimit, int nthreads, int pipe_mode)
{
uint64_t memreqd;
archive_config_t cfg;
@ -146,6 +143,9 @@ global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_in
rv = 0;
pct_i = pct_interval;
if (pipe_mode && pct_i == 0)
pct_i = DEFAULT_PCT_INTERVAL;
rv = setup_db_config_s(&cfg, rab_blk_sz, user_chunk_sz, &pct_i, algo, ck, ck_sim,
file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit, "/tmp");
return (rv);
@ -157,7 +157,7 @@ global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_in
dedupe_context_t *
create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz,
const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
int file_version, compress_op_t op, uint64_t file_size, char *tmppath) {
int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode) {
dedupe_context_t *ctx;
uint32_t i;
@ -206,20 +206,23 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
*/
if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) {
my_sysinfo msys_info;
int pct_interval;
/*
* Get amount of memory to use. The freeram got here is adjusted amount.
*/
get_sys_limits(&msys_info);
pct_interval = 0;
if (pipe_mode)
pct_interval = DEFAULT_PCT_INTERVAL;
arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, 0,
arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, pct_interval,
algo, props->cksum, GLOBAL_SIM_CKSUM, file_size,
msys_info.freeram, props->nthreads);
if (arc == NULL) {
pthread_mutex_unlock(&init_lock);
return (NULL);
}
blake2_module_init(&bdsp, &proc_info);
}
inited = 1;
}


@ -190,7 +190,7 @@ typedef struct {
extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize,
int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
int file_version, compress_op_t op, uint64_t file_size, char *tmppath);
int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode);
extern void destroy_dedupe_context(dedupe_context_t *ctx);
extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf,
uint64_t *size, uint64_t offset, uint64_t *rabin_pos, int mt);
@ -205,6 +205,6 @@ extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char
int delta_flag);
extern int global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_interval,
const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
size_t memlimit, int nthreads);
size_t memlimit, int nthreads, int pipe_mode);
#endif /* _RABIN_POLY_H_ */