Implement global dedupe in pipe mode.

Update hash index calculations to use up to 75% of available memory when the file size is not known.
Use little-endian nonce format for Salsa20.
This commit is contained in:
Moinak Ghosh 2013-03-29 15:18:25 +05:30
parent 19b304f30c
commit c357452079
5 changed files with 74 additions and 36 deletions

View file

@ -130,7 +130,7 @@ salsa20_init(salsa20_ctx_t *ctx, uchar_t *salt, int saltlen, uchar_t *pwd, int p
n = (uint64_t *)IV; n = (uint64_t *)IV;
n1 = (uint64_t *)(ctx->nonce); n1 = (uint64_t *)(ctx->nonce);
for (i = 0; i < XSALSA20_CRYPTO_NONCEBYTES/8; i++) { for (i = 0; i < XSALSA20_CRYPTO_NONCEBYTES/8; i++) {
*n1 = ntohll(*n); *n1 = LE64(*n);
n++; n++;
n1++; n1++;
} }

56
main.c
View file

@ -693,13 +693,22 @@ start_decompress(const char *filename, const char *to_filename)
* Open files and do sanity checks. * Open files and do sanity checks.
*/ */
if (!pipe_mode) { if (!pipe_mode) {
if ((compfd = open(filename, O_RDONLY, 0)) == -1) if (filename == NULL) {
err_exit(1, "Cannot open: %s", filename); compfd = fileno(stdin);
if (compfd == -1) {
perror("fileno ");
UNCOMP_BAIL;
}
sbuf.st_size = 0;
} else {
if ((compfd = open(filename, O_RDONLY, 0)) == -1)
err_exit(1, "Cannot open: %s", filename);
if (fstat(compfd, &sbuf) == -1) if (fstat(compfd, &sbuf) == -1)
err_exit(1, "Cannot stat: %s", filename); err_exit(1, "Cannot stat: %s", filename);
if (sbuf.st_size == 0) if (sbuf.st_size == 0)
return (1); return (1);
}
if ((uncompfd = open(to_filename, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR | S_IWUSR)) == -1) { if ((uncompfd = open(to_filename, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR | S_IWUSR)) == -1) {
close(compfd); close(compfd);
@ -726,7 +735,10 @@ start_decompress(const char *filename, const char *to_filename)
UNCOMP_BAIL; UNCOMP_BAIL;
} }
if (init_algo(algorithm, 0) != 0) { if (init_algo(algorithm, 0) != 0) {
fprintf(stderr, "%s is not a pcompressed file.\n", filename); if (pipe_mode || filename == NULL)
fprintf(stderr, "Input stream is not pcompressed.\n");
else
fprintf(stderr, "%s is not a pcompressed file.\n", filename);
UNCOMP_BAIL; UNCOMP_BAIL;
} }
algo = algorithm; algo = algorithm;
@ -1279,9 +1291,11 @@ uncomp_done:
/* /*
* Ownership and mode of target should be same as original. * Ownership and mode of target should be same as original.
*/ */
fchmod(uncompfd, sbuf.st_mode); if (filename != NULL) {
if (fchown(uncompfd, sbuf.st_uid, sbuf.st_gid) == -1) fchmod(uncompfd, sbuf.st_mode);
perror("Chown "); if (fchown(uncompfd, sbuf.st_uid, sbuf.st_gid) == -1)
perror("Chown ");
}
if (dary != NULL) { if (dary != NULL) {
for (i = 0; i < nprocs; i++) { for (i = 0; i < nprocs; i++) {
if (!dary[i]) continue; if (!dary[i]) continue;
@ -1299,7 +1313,7 @@ uncomp_done:
slab_free(NULL, dary); slab_free(NULL, dary);
} }
if (!pipe_mode) { if (!pipe_mode) {
if (compfd != -1) close(compfd); if (filename && compfd != -1) close(compfd);
if (uncompfd != -1) close(uncompfd); if (uncompfd != -1) close(uncompfd);
} }
@ -1667,14 +1681,12 @@ start_compress(const char *filename, uint64_t chunksize, int level)
} }
flags = 0; flags = 0;
sbuf.st_size = 0;
dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
if (enable_rabin_global) { if (enable_rabin_global) {
flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED); flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED);
dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL; dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL;
if (pipe_mode) {
return (1);
}
} else if (enable_rabin_scan) { } else if (enable_rabin_scan) {
flags |= FLAG_DEDUP; flags |= FLAG_DEDUP;
dedupe_flag = RABIN_DEDUPE_SEGMENTED; dedupe_flag = RABIN_DEDUPE_SEGMENTED;
@ -2572,11 +2584,6 @@ main(int argc, char *argv[])
enable_rabin_split = 1; enable_rabin_split = 1;
} }
if (enable_rabin_global && pipe_mode) {
fprintf(stderr, "Global Deduplication is not supported in pipe mode.\n");
exit(1);
}
if (enable_rabin_global && enable_delta_encode) { if (enable_rabin_global && enable_delta_encode) {
fprintf(stderr, "Global Deduplication does not support Delta Compression.\n"); fprintf(stderr, "Global Deduplication does not support Delta Compression.\n");
exit(1); exit(1);
@ -2605,8 +2612,15 @@ main(int argc, char *argv[])
} }
} else if (num_rem == 2) { } else if (num_rem == 2) {
if (do_uncompress) { if (do_uncompress) {
if ((filename = realpath(argv[optind], NULL)) == NULL) /*
err_exit(1, "%s", argv[optind]); * While decompressing, input can be stdin and output a physical file.
*/
if (*(argv[optind]) == '-') {
filename = NULL;
} else {
if ((filename = realpath(argv[optind], NULL)) == NULL)
err_exit(1, "%s", argv[optind]);
}
optind++; optind++;
if ((to_filename = realpath(argv[optind], NULL)) != NULL) { if ((to_filename = realpath(argv[optind], NULL)) != NULL) {
free(filename); free(filename);

View file

@ -88,6 +88,9 @@ static cleanup_indx(index_t *indx)
} }
} }
/*
 * Memory charged to a single hash slot when sizing the index: one hash entry
 * of hash_entry_size bytes, one hash_entry_t pointer plus half a pointer more,
 * and one hash_entry_t ** element. Multiplying by the slot count gives the
 * total memory requirement (memreqd); dividing a memory limit by it gives the
 * number of slots that fit.
 *
 * NOTE: this expands to a reference to hash_entry_size, so a variable of that
 * name must be in scope and already assigned at every point of use.
 */
#define MEM_PER_UNIT ( (hash_entry_size + sizeof (hash_entry_t *) + \
(sizeof (hash_entry_t *)) / 2) + sizeof (hash_entry_t **) )
archive_config_t * archive_config_t *
init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz, init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz,
int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
@ -97,11 +100,26 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
int rv; int rv;
float diff; float diff;
/*
* file_sz = 0 and pct_interval = 0 means we are in pipe mode and want a simple
* index. Set pct_interval to 100 to indicate that we need to use all of memlimit
* for the simple index.
*
* If file_sz != 0 but pct_interval = 0 then we need to create a simple index
* sized for the given file.
*
* If file_sz = 0 and pct_interval = 100 then we are in pipe mode and want a segmented
* index. This is typically for WAN deduplication of large data transfers.
*/
if (file_sz == 0 && pct_interval == 0)
pct_interval = 100;
cfg = calloc(1, sizeof (archive_config_t)); cfg = calloc(1, sizeof (archive_config_t));
rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, user_chunk_sz, pct_interval); rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, user_chunk_sz, pct_interval);
if (cfg->dedupe_mode == MODE_SIMPLE) { if (cfg->dedupe_mode == MODE_SIMPLE) {
pct_interval = 0; if (pct_interval != 100)
pct_interval = 0;
cfg->pct_interval = 0; cfg->pct_interval = 0;
} }
@ -115,30 +133,36 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
int hash_entry_size; int hash_entry_size;
index_t *indx; index_t *indx;
hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1;
// Compute total hashtable entries first // Compute total hashtable entries first
if (pct_interval == 0) { if (pct_interval == 0) {
intervals = 1; intervals = 1;
hash_slots = file_sz / cfg->chunk_sz_bytes + 1; hash_slots = file_sz / cfg->chunk_sz_bytes + 1;
} else if (pct_interval == 100) {
intervals = 1;
hash_slots = memlimit / MEM_PER_UNIT - 5;
pct_interval = 0;
} else { } else {
intervals = 100 / pct_interval - 1; intervals = 100 / pct_interval - 1;
hash_slots = file_sz / cfg->segment_sz_bytes + 1; hash_slots = file_sz / cfg->segment_sz_bytes + 1;
hash_slots *= intervals; hash_slots *= intervals;
} }
hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1;
// Compute memory required to hold all hash entries assuming worst case 50% // Compute memory required to hold all hash entries assuming worst case 50%
// occupancy. // occupancy.
memreqd = hash_slots * (hash_entry_size + sizeof (hash_entry_t *) + memreqd = hash_slots * MEM_PER_UNIT;
(sizeof (hash_entry_t *)) / 2);
memreqd += hash_slots * sizeof (hash_entry_t **);
diff = (float)pct_interval / 100.0; diff = (float)pct_interval / 100.0;
// Reduce hash_slots to remain within memlimit // Reduce hash_slots to remain within memlimit
while (memreqd > memlimit) { while (memreqd > memlimit) {
hash_slots -= (hash_slots * diff); if (pct_interval == 0) {
memreqd = hash_slots * (hash_entry_size + sizeof (hash_entry_t *) + hash_slots--;
(sizeof (hash_entry_t *)) / 2); } else {
memreqd += hash_slots * sizeof (hash_entry_t **); hash_slots -= (hash_slots * diff);
}
memreqd = hash_slots * MEM_PER_UNIT;
} }
// Now create as many hash tables as there are similarity match intervals // Now create as many hash tables as there are similarity match intervals
@ -251,7 +275,7 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
} }
} }
if (do_insert) { if (do_insert) {
if (indx->memused + indx->hash_entry_size >= indx->memlimit) { if (indx->memused + indx->hash_entry_size >= indx->memlimit && htab[htab_entry] != NULL) {
ent = htab[htab_entry]; ent = htab[htab_entry];
htab[htab_entry] = htab[htab_entry]->next; htab[htab_entry] = htab[htab_entry]->next;
} else { } else {

View file

@ -360,7 +360,7 @@ set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim
cfg->archive_sz = file_sz; cfg->archive_sz = file_sz;
cfg->dedupe_mode = MODE_SIMILARITY; cfg->dedupe_mode = MODE_SIMILARITY;
if (cfg->archive_sz <= SIXTEEN_GB || pct_interval == 0) { if (cfg->archive_sz <= SIXTEEN_GB || pct_interval == 0 || pct_interval == 100) {
cfg->dedupe_mode = MODE_SIMPLE; cfg->dedupe_mode = MODE_SIMPLE;
cfg->segment_sz_bytes = user_chunk_sz; cfg->segment_sz_bytes = user_chunk_sz;

View file

@ -188,9 +188,9 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
get_sysinfo(&msys_info); get_sysinfo(&msys_info);
/* /*
* Use a maximum of approx 62% of free RAM for the index. * Use a maximum of approx 75% of free RAM for the index.
*/ */
msys_info.freeram = (msys_info.freeram >> 1) + (msys_info.freeram >> 3); msys_info.freeram = (msys_info.freeram >> 1) + (msys_info.freeram >> 2);
arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, 0, arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, 0,
algo, props->cksum, props->cksum, file_size, algo, props->cksum, props->cksum, file_size,
msys_info.freeram, props->nthreads); msys_info.freeram, props->nthreads);
@ -1107,7 +1107,7 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size)
* RAM. Just mem-copy it. * RAM. Just mem-copy it.
* Otherwise it will be in the current output file. We mmap() the relevant * Otherwise it will be in the current output file. We mmap() the relevant
* region and copy it. The way deduplication is done it is guaranteed that * region and copy it. The way deduplication is done it is guaranteed that
* all duplicate reference will be backward references so this approach works. * all duplicate references will be backward references so this approach works.
* *
* However this approach precludes pipe-mode streamed decompression since * However this approach precludes pipe-mode streamed decompression since
* it requires random access to the output file. * it requires random access to the output file.