diff --git a/README.md b/README.md index bc37009..e298011 100644 --- a/README.md +++ b/README.md @@ -119,14 +119,17 @@ NOTE: The option "libbsc" uses Ilya Grebnov's block sorting compression library datasets. '-S' - - Specify chunk checksum to use: CRC64, SKEIN256, SKEIN512, SHA256 and - SHA512. Default one is SKEIN256. The implementation actually uses SKEIN - 512-256. This is 25% slower than simple CRC64 but is many times more - robust than CRC64 in detecting data integrity errors. SKEIN is a - finalist in the NIST SHA-3 standard selection process and is one of - the fastest in the group, especially on x86 platforms. BLAKE is faster - than SKEIN on a few platforms. - SKEIN 512-256 is about 60% faster than SHA 512-256 on x64 platforms. + - Specify chunk checksum to use: + + CRC64 - Extremely Fast 64-bit CRC from LZMA SDK. + SHA256 - SHA512/256 version of Intel's optimized (SSE,AVX) SHA2 for x86. + SHA512 - SHA512 version of Intel's optimized (SSE,AVX) SHA2 for x86. + KECCAK256 - Official 256-bit NIST SHA3 optimized implementation. + KECCAK512 - Official 512-bit NIST SHA3 optimized implementation. + BLAKE256 - Very fast 256-bit BLAKE2, derived from the NIST SHA3 + runner-up BLAKE. + BLAKE512 - Very fast 512-bit BLAKE2, derived from the NIST SHA3 + runner-up BLAKE. '-F' - Perform Fixed Block Deduplication. This is faster than fingerprinting based content-aware deduplication in some cases. 
However this is mostly diff --git a/main.c b/main.c index 38a6361..fc13817 100644 --- a/main.c +++ b/main.c @@ -1116,7 +1116,7 @@ start_decompress(const char *filename, const char *to_filename) if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size, algo, &props, enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0, - NULL, pipe_mode); + NULL, pipe_mode, nprocs); if (tdat->rctx == NULL) { UNCOMP_BAIL; } @@ -1962,12 +1962,13 @@ start_compress(const char *filename, uint64_t chunksize, int level) if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size, algo, &props, enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size, - tmpdir, pipe_mode); + tmpdir, pipe_mode, nprocs); if (tdat->rctx == NULL) { COMP_BAIL; } tdat->rctx->index_sem = &(tdat->index_sem); + tdat->rctx->id = i; } else { tdat->rctx = NULL; } @@ -1991,9 +1992,9 @@ start_compress(const char *filename, uint64_t chunksize, int level) tdat = dary[i]; tdat->rctx->index_sem_next = &(dary[(i + 1) % nprocs]->index_sem); } + // When doing global dedupe first thread does not wait to access the index. + sem_post(&(dary[0]->index_sem)); } - // When doing global dedupe first thread does not wait to access the index. 
- sem_post(&(dary[0]->index_sem)); w.dary = dary; w.wfd = compfd; @@ -2105,7 +2106,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) file_offset = 0; if (enable_rabin_split) { rctx = create_dedupe_context(chunksize, 0, 0, algo, &props, enable_delta_encode, - enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pipe_mode); + enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pipe_mode, nprocs); rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx); } else { rbytes = Read(uncompfd, cread_buf, chunksize); diff --git a/rabin/global/index.c b/rabin/global/index.c index 684bb60..6793d82 100644 --- a/rabin/global/index.c +++ b/rabin/global/index.c @@ -266,6 +266,8 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch * file is created to hold rabin block hash lists for each segment. */ if (pct_interval > 0) { + int errored; + strcpy(cfg->rootdir, tmppath); strcat(cfg->rootdir, "/.segXXXXXX"); cfg->seg_fd_w = mkstemp(cfg->rootdir); @@ -277,11 +279,26 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch free(cfg); return (NULL); } + + errored = 0; for (i = 0; i < nthreads; i++) { cfg->seg_fd_r[i].fd = open(cfg->rootdir, O_RDONLY); + if (cfg->seg_fd_r[i].fd == -1) { + perror(" "); + errored = 1; + break; + } cfg->seg_fd_r[i].mapping = NULL; } + if (errored) { + cleanup_indx(indx); + if (cfg->seg_fd_r) + free(cfg->seg_fd_r); + free(cfg); + return (NULL); + } + /* * Remove tempfile entry from the filesystem metadata so that file gets * automatically removed once process exits. @@ -314,27 +331,33 @@ db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, ui *((uint64_t *)(hdr + 4)) = file_offset; w = Write(cfg->seg_fd_w, hdr, sizeof (hdr)); - if (w < sizeof (hdr)) + if (w < sizeof (hdr)) { + /* + * On error restore file pointer to previous position so that + * all subsequent offsets will be properly computed. 
+ */ + lseek(cfg->seg_fd_w, cfg->segcache_pos, SEEK_SET); return (-1); + } cfg->segcache_pos += w; w = Write(cfg->seg_fd_w, buf, len); - if (w < len) + if (w < len) { + /* + * On error restore file pointer to previous position so that + * all subsequent offsets will be properly computed. + */ + lseek(cfg->seg_fd_w, cfg->segcache_pos, SEEK_SET); return (-1); + } cfg->segcache_pos += w; return (0); } -void -db_segcache_sync(archive_config_t *cfg) -{ - fdatasync(cfg->seg_fd_w); -} - /* * Get the current file pointer position of the metadata file. This indicates the * position where the next entry will be added. */ -int +uint64_t db_segcache_pos(archive_config_t *cfg, int tid) { return (cfg->segcache_pos); @@ -369,8 +392,10 @@ db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offs */ db_segcache_unmap(cfg, tid); fd = cfg->seg_fd_r[tid].fd; - if (lseek(fd, *offset, SEEK_SET) != *offset) + if (lseek(fd, *offset, SEEK_SET) != *offset) { + perror(" "); return (-1); + } /* * Mmap hdr and blocks. 
We assume max # of rabin block entries and mmap (unless remaining @@ -383,8 +408,10 @@ db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offs len = pos - *offset; mapbuf = mmap(NULL, len + adj, PROT_READ, MAP_SHARED, fd, *offset - adj); - if (mapbuf == MAP_FAILED) + if (mapbuf == MAP_FAILED) { + perror(" "); return (-1); + } cfg->seg_fd_r[tid].cache_offset = *offset; hdr = mapbuf + adj; diff --git a/rabin/global/index.h b/rabin/global/index.h index a9f0e0e..d63bd0c 100644 --- a/rabin/global/index.h +++ b/rabin/global/index.h @@ -55,8 +55,7 @@ hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int void destroy_global_db_s(archive_config_t *cfg); int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset); -void db_segcache_sync(archive_config_t *cfg); -int db_segcache_pos(archive_config_t *cfg, int tid); +uint64_t db_segcache_pos(archive_config_t *cfg, int tid); int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks); int db_segcache_unmap(archive_config_t *cfg, int tid); diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index 86d391e..8785a11 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -157,7 +157,8 @@ global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_in dedupe_context_t * create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag, - int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode) { + int file_version, compress_op_t op, uint64_t file_size, char *tmppath, + int pipe_mode, int nthreads) { dedupe_context_t *ctx; uint32_t i; @@ -218,7 +219,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, pct_interval, algo, props->cksum, 
GLOBAL_SIM_CKSUM, file_size, - msys_info.freeram, props->nthreads); + msys_info.freeram, nthreads); if (arc == NULL) { pthread_mutex_unlock(&init_lock); return (NULL); @@ -722,6 +723,10 @@ process_blocks: DEBUG_STAT_EN(en_1 = get_wtime_millis()); DEBUG_STAT_EN(fprintf(stderr, "Original size: %" PRId64 ", blknum: %u\n", *size, blknum)); DEBUG_STAT_EN(fprintf(stderr, "Number of maxlen blocks: %u\n", max_count)); + if (blknum <=2 && ctx->arc) { + sem_wait(ctx->index_sem); + sem_post(ctx->index_sem_next); + } if (blknum > 2) { uint64_t pos, matchlen, pos1 = 0; int valid = 1; @@ -906,11 +911,11 @@ process_blocks: sim_ck = ctx->similarity_cksums; sub_i = cfg->sub_intervals; tgt = seg_heap; - increment = cfg->chunk_cksum_sz; + increment = cfg->chunk_cksum_sz / 2; if (increment * sub_i > length) sub_i = length / increment; for (j = 0; jsimilarity_cksum_sz; @@ -927,9 +932,13 @@ process_blocks: } seg_offset = db_segcache_pos(cfg, ctx->id); - len = blks * sizeof (global_blockentry_t); - db_segcache_write(cfg, ctx->id, (uchar_t *)&(ctx->g_blocks[i]), - len, blks-i, ctx->file_offset); + len = (blks-i) * sizeof (global_blockentry_t); + if (db_segcache_write(cfg, ctx->id, (uchar_t *)&(ctx->g_blocks[i]), + len, blks-i, ctx->file_offset) == -1) { + sem_post(ctx->index_sem_next); + ctx->valid = 0; + return (0); + } /* * Now lookup all the similarity hashes. We sort the hashes first so that @@ -993,7 +1002,6 @@ process_blocks: * Signal the next thread in sequence to access the index. 
*/ sem_post(ctx->index_sem_next); - db_segcache_sync(cfg); /* * Now go through all the matching segments for all the current segments diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index c9bcd13..f8c5bd0 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -190,7 +190,7 @@ typedef struct { extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag, - int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode); + int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode, int nthreads); extern void destroy_dedupe_context(dedupe_context_t *ctx); extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf, uint64_t *size, uint64_t offset, uint64_t *rabin_pos, int mt);