Fix bugs and improve accuracy in Segmented Dedupe.

Fix segment hashlist size computation.
Remove unnecessary sync of segment hashlist file writes.
Pass correct number of threads to index creation routine.
Add more error checks.
Handle correct positioning of segment hashlist file offset on write error.
Add missing semaphore signaling at dedupe abort points with global dedupe.
Use closer min-values sampling for improved segmented dedupe accuracy.
Update proper checksum info in README.
This commit is contained in:
Moinak Ghosh 2013-04-30 19:35:18 +05:30
parent 074e265f70
commit b23b5789fb
6 changed files with 73 additions and 35 deletions

View file

@ -119,14 +119,17 @@ NOTE: The option "libbsc" uses Ilya Grebnov's block sorting compression library
datasets.
'-S' <cksum>
- Specify chunk checksum to use: CRC64, SKEIN256, SKEIN512, SHA256 and
SHA512. Default one is SKEIN256. The implementation actually uses SKEIN
512-256. This is 25% slower than simple CRC64 but is many times more
robust than CRC64 in detecting data integrity errors. SKEIN is a
finalist in the NIST SHA-3 standard selection process and is one of
the fastest in the group, especially on x86 platforms. BLAKE is faster
than SKEIN on a few platforms.
SKEIN 512-256 is about 60% faster than SHA 512-256 on x64 platforms.
- Specify chunk checksum to use:
CRC64 - Extremely Fast 64-bit CRC from LZMA SDK.
SHA256 - SHA512/256 version of Intel's optimized (SSE,AVX) SHA2 for x86.
SHA512 - SHA512 version of Intel's optimized (SSE,AVX) SHA2 for x86.
KECCAK256 - Official 256-bit NIST SHA3 optimized implementation.
KECCAK512 - Official 512-bit NIST SHA3 optimized implementation.
BLAKE256 - Very fast 256-bit BLAKE2, derived from the NIST SHA3
runner-up BLAKE.
BLAKE512 - Very fast 256-bit BLAKE2, derived from the NIST SHA3
runner-up BLAKE.
'-F' - Perform Fixed Block Deduplication. This is faster than fingerprinting
based content-aware deduplication in some cases. However this is mostly

11
main.c
View file

@ -1116,7 +1116,7 @@ start_decompress(const char *filename, const char *to_filename)
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size,
algo, &props, enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0,
NULL, pipe_mode);
NULL, pipe_mode, nprocs);
if (tdat->rctx == NULL) {
UNCOMP_BAIL;
}
@ -1962,12 +1962,13 @@ start_compress(const char *filename, uint64_t chunksize, int level)
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size,
algo, &props, enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size,
tmpdir, pipe_mode);
tmpdir, pipe_mode, nprocs);
if (tdat->rctx == NULL) {
COMP_BAIL;
}
tdat->rctx->index_sem = &(tdat->index_sem);
tdat->rctx->id = i;
} else {
tdat->rctx = NULL;
}
@ -1991,9 +1992,9 @@ start_compress(const char *filename, uint64_t chunksize, int level)
tdat = dary[i];
tdat->rctx->index_sem_next = &(dary[(i + 1) % nprocs]->index_sem);
}
// When doing global dedupe first thread does not wait to access the index.
sem_post(&(dary[0]->index_sem));
}
// When doing global dedupe first thread does not wait to access the index.
sem_post(&(dary[0]->index_sem));
w.dary = dary;
w.wfd = compfd;
@ -2105,7 +2106,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
file_offset = 0;
if (enable_rabin_split) {
rctx = create_dedupe_context(chunksize, 0, 0, algo, &props, enable_delta_encode,
enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pipe_mode);
enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pipe_mode, nprocs);
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx);
} else {
rbytes = Read(uncompfd, cread_buf, chunksize);

View file

@ -266,6 +266,8 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
* file is created to hold rabin block hash lists for each segment.
*/
if (pct_interval > 0) {
int errored;
strcpy(cfg->rootdir, tmppath);
strcat(cfg->rootdir, "/.segXXXXXX");
cfg->seg_fd_w = mkstemp(cfg->rootdir);
@ -277,11 +279,26 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
free(cfg);
return (NULL);
}
errored = 0;
for (i = 0; i < nthreads; i++) {
cfg->seg_fd_r[i].fd = open(cfg->rootdir, O_RDONLY);
if (cfg->seg_fd_r[i].fd == -1) {
perror(" ");
errored = 1;
break;
}
cfg->seg_fd_r[i].mapping = NULL;
}
if (errored) {
cleanup_indx(indx);
if (cfg->seg_fd_r)
free(cfg->seg_fd_r);
free(cfg);
return (NULL);
}
/*
* Remove tempfile entry from the filesystem metadata so that file gets
* automatically removed once process exits.
@ -314,27 +331,33 @@ db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, ui
*((uint64_t *)(hdr + 4)) = file_offset;
w = Write(cfg->seg_fd_w, hdr, sizeof (hdr));
if (w < sizeof (hdr))
if (w < sizeof (hdr)) {
/*
* On error restore file pointer to previous position so that
* all subsequent offsets will be properly computed.
*/
lseek(cfg->seg_fd_w, cfg->segcache_pos, SEEK_SET);
return (-1);
}
cfg->segcache_pos += w;
w = Write(cfg->seg_fd_w, buf, len);
if (w < len)
if (w < len) {
/*
* On error restore file pointer to previous position so that
* all subsequent offsets will be properly computed.
*/
lseek(cfg->seg_fd_w, cfg->segcache_pos, SEEK_SET);
return (-1);
}
cfg->segcache_pos += w;
return (0);
}
void
db_segcache_sync(archive_config_t *cfg)
{
fdatasync(cfg->seg_fd_w);
}
/*
* Get the current file pointer position of the metadata file. This indicates the
* position where the next entry will be added.
*/
int
uint64_t
db_segcache_pos(archive_config_t *cfg, int tid)
{
return (cfg->segcache_pos);
@ -369,8 +392,10 @@ db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offs
*/
db_segcache_unmap(cfg, tid);
fd = cfg->seg_fd_r[tid].fd;
if (lseek(fd, *offset, SEEK_SET) != *offset)
if (lseek(fd, *offset, SEEK_SET) != *offset) {
perror(" ");
return (-1);
}
/*
* Mmap hdr and blocks. We assume max # of rabin block entries and mmap (unless remaining
@ -383,8 +408,10 @@ db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offs
len = pos - *offset;
mapbuf = mmap(NULL, len + adj, PROT_READ, MAP_SHARED, fd, *offset - adj);
if (mapbuf == MAP_FAILED)
if (mapbuf == MAP_FAILED) {
perror(" ");
return (-1);
}
cfg->seg_fd_r[tid].cache_offset = *offset;
hdr = mapbuf + adj;

View file

@ -55,8 +55,7 @@ hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int
void destroy_global_db_s(archive_config_t *cfg);
int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset);
void db_segcache_sync(archive_config_t *cfg);
int db_segcache_pos(archive_config_t *cfg, int tid);
uint64_t db_segcache_pos(archive_config_t *cfg, int tid);
int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks);
int db_segcache_unmap(archive_config_t *cfg, int tid);

View file

@ -157,7 +157,8 @@ global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_in
dedupe_context_t *
create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz,
const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode) {
int file_version, compress_op_t op, uint64_t file_size, char *tmppath,
int pipe_mode, int nthreads) {
dedupe_context_t *ctx;
uint32_t i;
@ -218,7 +219,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, pct_interval,
algo, props->cksum, GLOBAL_SIM_CKSUM, file_size,
msys_info.freeram, props->nthreads);
msys_info.freeram, nthreads);
if (arc == NULL) {
pthread_mutex_unlock(&init_lock);
return (NULL);
@ -722,6 +723,10 @@ process_blocks:
DEBUG_STAT_EN(en_1 = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Original size: %" PRId64 ", blknum: %u\n", *size, blknum));
DEBUG_STAT_EN(fprintf(stderr, "Number of maxlen blocks: %u\n", max_count));
if (blknum <=2 && ctx->arc) {
sem_wait(ctx->index_sem);
sem_post(ctx->index_sem_next);
}
if (blknum > 2) {
uint64_t pos, matchlen, pos1 = 0;
int valid = 1;
@ -906,11 +911,11 @@ process_blocks:
sim_ck = ctx->similarity_cksums;
sub_i = cfg->sub_intervals;
tgt = seg_heap;
increment = cfg->chunk_cksum_sz;
increment = cfg->chunk_cksum_sz / 2;
if (increment * sub_i > length)
sub_i = length / increment;
for (j = 0; j<sub_i; j++) {
crc = lzma_crc64(tgt, increment/4, 0);
crc = lzma_crc64(tgt, increment/2, 0);
*((uint64_t *)sim_ck) = crc;
tgt += increment;
sim_ck += cfg->similarity_cksum_sz;
@ -927,9 +932,13 @@ process_blocks:
}
seg_offset = db_segcache_pos(cfg, ctx->id);
len = blks * sizeof (global_blockentry_t);
db_segcache_write(cfg, ctx->id, (uchar_t *)&(ctx->g_blocks[i]),
len, blks-i, ctx->file_offset);
len = (blks-i) * sizeof (global_blockentry_t);
if (db_segcache_write(cfg, ctx->id, (uchar_t *)&(ctx->g_blocks[i]),
len, blks-i, ctx->file_offset) == -1) {
sem_post(ctx->index_sem_next);
ctx->valid = 0;
return (0);
}
/*
* Now lookup all the similarity hashes. We sort the hashes first so that
@ -993,7 +1002,6 @@ process_blocks:
* Signal the next thread in sequence to access the index.
*/
sem_post(ctx->index_sem_next);
db_segcache_sync(cfg);
/*
* Now go through all the matching segments for all the current segments

View file

@ -190,7 +190,7 @@ typedef struct {
extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize,
int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode);
int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode, int nthreads);
extern void destroy_dedupe_context(dedupe_context_t *ctx);
extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf,
uint64_t *size, uint64_t offset, uint64_t *rabin_pos, int mt);