diff --git a/crypto/crypto_utils.c b/crypto/crypto_utils.c index 52d8889..7d7dd0f 100644 --- a/crypto/crypto_utils.c +++ b/crypto/crypto_utils.c @@ -203,7 +203,7 @@ get_crypto_alg(char *name) * need or want another level of parallelism to cause contention. */ int -compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, int mt) +compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, int mt, int verbose) { DEBUG_STAT_EN(double strt, en); @@ -213,7 +213,7 @@ compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, in assert(mt == 0 || mt == 1); #endif - DEBUG_STAT_EN(strt = get_wtime_millis()); + DEBUG_STAT_EN(if (verbose) strt = get_wtime_millis()); if (cksum == CKSUM_CRC64) { uint64_t *ck = (uint64_t *)cksum_buf; *ck = lzma_crc64(buf, bytes, 0); @@ -321,8 +321,8 @@ compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, in } else { return (-1); } - DEBUG_STAT_EN(en = get_wtime_millis()); - DEBUG_STAT_EN(fprintf(stderr, "Checksum computed at %.3f MB/s\n", get_mb_s(bytes, strt, en))); + DEBUG_STAT_EN(if (verbose) en = get_wtime_millis()); + DEBUG_STAT_EN(if (verbose) fprintf(stderr, "Checksum computed at %.3f MB/s\n", get_mb_s(bytes, strt, en))); return (0); } @@ -789,10 +789,10 @@ init_crypto(crypto_ctx_t *cctx, uchar_t *pwd, int pwd_len, int crypto_alg, b += 4; *((uint32_t *)&sb[b]) = getpid(); b += 4; - compute_checksum(&sb[b], CKSUM_SHA256, sb, b, 0); + compute_checksum(&sb[b], CKSUM_SHA256, sb, b, 0, 0); b = 8 + 4; *((uint32_t *)&sb[b]) = rand(); - compute_checksum(salt, CKSUM_SHA256, &sb[b], 32 + 4, 0); + compute_checksum(salt, CKSUM_SHA256, &sb[b], 32 + 4, 0, 0); } } diff --git a/crypto/crypto_utils.h b/crypto/crypto_utils.h index 0afe015..4f8f7ba 100644 --- a/crypto/crypto_utils.h +++ b/crypto/crypto_utils.h @@ -80,7 +80,7 @@ typedef struct { /* * Generic message digest functions. 
*/ -int compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, int mt); +int compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, int mt, int verbose); void list_checksums(FILE *strm, char *pad); int get_checksum_props(const char *name, int *cksum, int *cksum_bytes, int *mac_bytes, int accept_compatible); diff --git a/main.c b/main.c index 7d4ad11..1a1a0e2 100644 --- a/main.c +++ b/main.c @@ -492,9 +492,9 @@ redo: deserialize_checksum(tdat->checksum, tdat->compressed_chunk, cksum_bytes); } - if ((enable_rabin_scan || enable_fixed_scan) && (HDR & CHUNK_FLAG_DEDUP)) { + if ((enable_rabin_scan || enable_fixed_scan || enable_rabin_global) && + (HDR & CHUNK_FLAG_DEDUP)) { uchar_t *cmpbuf, *ubuf; - /* Extract various sizes from dedupe header. */ parse_dedupe_hdr(cseg, &blknum, &dedupe_index_sz, &dedupe_data_sz, &dedupe_index_sz_cmp, &dedupe_data_sz_cmp, &_chunksize); @@ -519,8 +519,8 @@ redo: rv = tdat->decompress(cmpbuf, dedupe_data_sz_cmp, ubuf, &_chunksize, tdat->level, HDR, tdat->data); DEBUG_STAT_EN(en = get_wtime_millis()); - DEBUG_STAT_EN(fprintf(stderr, "Chunk decompression speed %.3f MB/s\n", - get_mb_s(_chunksize, strt, en))); + DEBUG_STAT_EN(fprintf(stderr, "Chunk %d decompression speed %.3f MB/s\n", + tdat->id, get_mb_s(_chunksize, strt, en))); } if (rv == -1) { tdat->len_cmp = 0; @@ -551,6 +551,13 @@ redo: memcpy(ubuf, cmpbuf, dedupe_index_sz); } else { + /* + * This chunk was not deduplicated, however we still need to down the + * semaphore in order to maintain proper thread coordination. + */ + if (enable_rabin_global) { + sem_wait(tdat->rctx->index_sem); + } if (HDR & COMPRESSED) { if (HDR & CHUNK_FLAG_PREPROC) { rv = preproc_decompress(tdat->decompress, cseg, tdat->len_cmp, @@ -607,7 +614,7 @@ redo: * If it does not match we set length of chunk to 0 to indicate * exit to the writer thread. 
*/ - compute_checksum(checksum, cksum, tdat->uncompressed_chunk, _chunksize, tdat->cksum_mt); + compute_checksum(checksum, cksum, tdat->uncompressed_chunk, _chunksize, tdat->cksum_mt, 1); if (memcmp(checksum, tdat->checksum, cksum_bytes) != 0) { tdat->len_cmp = 0; fprintf(stderr, "ERROR: Chunk %d, checksums do not match.\n", tdat->id); @@ -663,9 +670,10 @@ start_decompress(const char *filename, const char *to_filename) char algorithm[ALGO_SZ]; struct stat sbuf; struct wdata w; - int compfd = -1, i, p; + int compfd = -1, p, dedupe_flag; int uncompfd = -1, err, np, bail; int nprocs = 1, thread = 0, level; + unsigned int i; short version, flags; int64_t chunksize, compressed_chunksize; struct cmp_data **dary, *tdat; @@ -690,7 +698,7 @@ start_decompress(const char *filename, const char *to_filename) if (sbuf.st_size == 0) return (1); - if ((uncompfd = open(to_filename, O_WRONLY|O_CREAT|O_TRUNC, 0)) == -1) { + if ((uncompfd = open(to_filename, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR | S_IWUSR)) == -1) { close(compfd); err_exit(1, "Cannot open: %s", to_filename); } @@ -768,12 +776,20 @@ start_decompress(const char *filename, const char *to_filename) } } + dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler if (flags & FLAG_DEDUP) { enable_rabin_scan = 1; + dedupe_flag = RABIN_DEDUPE_SEGMENTED; if (flags & FLAG_DEDUP_FIXED) { if (version > 7) { + if (pipe_mode) { + fprintf(stderr, "Global Deduplication is not supported with pipe mode.\n"); + err = 1; + goto uncomp_done; + } enable_rabin_global = 1; + dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL; } else { fprintf(stderr, "Invalid file deduplication flags.\n"); err = 1; @@ -782,6 +798,7 @@ start_decompress(const char *filename, const char *to_filename) } } else if (flags & FLAG_DEDUP_FIXED) { enable_fixed_scan = 1; + dedupe_flag = RABIN_DEDUPE_FIXED; } if (flags & FLAG_SINGLE_CHUNK) { @@ -1054,6 +1071,7 @@ start_decompress(const char *filename, const char *to_filename) tdat->compress = _compress_func; tdat->decompress = 
_decompress_func; tdat->cancel = 0; + tdat->decompressing = 1; if (props.is_single_chunk) { tdat->cksum_mt = 1; if (version == 6) { @@ -1068,19 +1086,28 @@ start_decompress(const char *filename, const char *to_filename) sem_init(&(tdat->start_sem), 0, 0); sem_init(&(tdat->cmp_done_sem), 0, 0); sem_init(&(tdat->write_done_sem), 0, 1); + sem_init(&(tdat->index_sem), 0, 0); + if (_init_func) { if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, version, DECOMPRESS) != 0) { UNCOMP_BAIL; } } - if (enable_rabin_scan || enable_fixed_scan) { + if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size, - algo, &props, enable_delta_encode, enable_fixed_scan, version, DECOMPRESS, 0, + algo, &props, enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0, NULL); if (tdat->rctx == NULL) { UNCOMP_BAIL; } + if (enable_rabin_global) { + if ((tdat->rctx->out_fd = open(to_filename, O_RDONLY, 0)) == -1) { + perror("Unable to get new read handle to output file"); + UNCOMP_BAIL; + } + } + tdat->rctx->index_sem = &(tdat->index_sem); } else { tdat->rctx = NULL; } @@ -1099,6 +1126,15 @@ start_decompress(const char *filename, const char *to_filename) } thread = 1; + if (enable_rabin_global) { + for (i = 0; i < nprocs; i++) { + tdat = dary[i]; + tdat->rctx->index_sem_next = &(dary[(i + 1) % nprocs]->index_sem); + } + } + // When doing global dedupe first thread does not wait to start dedupe recovery. + sem_post(&(dary[0]->index_sem)); + if (encrypt_type) { /* Erase encryption key bytes stored as a plain array. No longer reqd. */ crypto_clean_pkey(&crypto_ctx); @@ -1132,6 +1168,7 @@ start_decompress(const char *filename, const char *to_filename) sem_wait(&tdat->write_done_sem); if (main_cancel) break; tdat->id = chunk_num; + if (tdat->rctx) tdat->rctx->id = tdat->id; /* * First read length of compressed chunk. 
@@ -1301,7 +1338,7 @@ redo: */ if (!encrypt_type) compute_checksum(tdat->checksum, cksum, tdat->cmp_seg, tdat->rbytes, - tdat->cksum_mt); + tdat->cksum_mt, 1); rctx = tdat->rctx; reset_dedupe_context(tdat->rctx); @@ -1318,7 +1355,7 @@ redo: */ if (!encrypt_type) compute_checksum(tdat->checksum, cksum, tdat->uncompressed_chunk, - tdat->rbytes, tdat->cksum_mt); + tdat->rbytes, tdat->cksum_mt, 1); } /* @@ -1562,9 +1599,14 @@ do_cancel: main_cancel = 1; tdat->cancel = 1; sem_post(&tdat->start_sem); + if (tdat->rctx && enable_rabin_global) + sem_post(tdat->rctx->index_sem_next); sem_post(&tdat->write_done_sem); return (0); } + if (tdat->decompressing && tdat->rctx && enable_rabin_global) { + sem_post(tdat->rctx->index_sem_next); + } sem_post(&tdat->write_done_sem); } goto repeat; @@ -1583,7 +1625,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) struct wdata w; char tmpfile1[MAXPATHLEN], tmpdir[MAXPATHLEN]; char to_filename[MAXPATHLEN]; - uint64_t compressed_chunksize, n_chunksize; + uint64_t compressed_chunksize, n_chunksize, file_offset; int64_t rbytes, rabin_count; short version, flags; struct stat sbuf; @@ -1628,7 +1670,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED); dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL; if (pipe_mode) { - sbuf.st_size = SIXTEEN_GB; + return (1); } } else if (enable_rabin_scan) { flags |= FLAG_DEDUP; @@ -1845,6 +1887,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) tdat->decompress = _decompress_func; tdat->uncompressed_chunk = (uchar_t *)1; tdat->cancel = 0; + tdat->decompressing = 0; if (single_chunk) tdat->cksum_mt = 1; else @@ -1857,21 +1900,21 @@ start_compress(const char *filename, uint64_t chunksize, int level) sem_init(&(tdat->write_done_sem), 0, 1); sem_init(&(tdat->index_sem), 0, 0); - // i is unsigned so this wraps around backwards for i == 0 - tdat->index_sem_other = &(dary[(i - 1) % nprocs]->index_sem); if (_init_func) { 
if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, VERSION, COMPRESS) != 0) { COMP_BAIL; } } - if (enable_rabin_scan || enable_fixed_scan) { + if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size, algo, &props, enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size, tmpdir); if (tdat->rctx == NULL) { COMP_BAIL; } + + tdat->rctx->index_sem = &(tdat->index_sem); } else { tdat->rctx = NULL; } @@ -1890,6 +1933,12 @@ start_compress(const char *filename, uint64_t chunksize, int level) } thread = 1; + if (enable_rabin_global) { + for (i = 0; i < nprocs; i++) { + tdat = dary[i]; + tdat->rctx->index_sem_next = &(dary[(i + 1) % nprocs]->index_sem); + } + } // When doing global dedupe first thread does not wait to access the index. sem_post(&(dary[0]->index_sem)); @@ -2000,6 +2049,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) /* * Read the first chunk into a spare buffer (a simple double-buffering). 
*/ + file_offset = 0; if (enable_rabin_split) { rctx = create_dedupe_context(chunksize, 0, 0, algo, &props, enable_delta_encode, enable_fixed_scan, VERSION, COMPRESS, 0, NULL); @@ -2064,12 +2114,13 @@ start_compress(const char *filename, uint64_t chunksize, int level) */ tdat->id = chunk_num; tdat->rbytes = rbytes; - if ((enable_rabin_scan || enable_fixed_scan)) { + if ((enable_rabin_scan || enable_fixed_scan || enable_rabin_global)) { tmp = tdat->cmp_seg; tdat->cmp_seg = cread_buf; cread_buf = tmp; tdat->compressed_chunk = tdat->cmp_seg + COMPRESSED_CHUNKSZ + cksum_bytes + mac_bytes; + if (tdat->rctx) tdat->rctx->file_offset = file_offset; /* * If there is data after the last rabin boundary in the chunk, then @@ -2087,6 +2138,8 @@ start_compress(const char *filename, uint64_t chunksize, int level) tdat->uncompressed_chunk = cread_buf; cread_buf = tmp; } + file_offset += tdat->rbytes; + if (rbytes < chunksize) { if (rbytes < 0) { bail = 1; @@ -2344,7 +2397,7 @@ main(int argc, char *argv[]) slab_init(); init_pcompress(); - while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDEe:w:rLPS:B:Fk:")) != -1) { + while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:rLPS:B:Fk:")) != -1) { int ovr; switch (opt) { @@ -2409,6 +2462,10 @@ main(int argc, char *argv[]) enable_rabin_scan = 1; break; + case 'G': + enable_rabin_global = 1; + break; + case 'E': enable_rabin_scan = 1; if (!enable_delta_encode) @@ -2503,6 +2560,20 @@ main(int argc, char *argv[]) exit(1); } + /* + * Global Deduplication can use Rabin or Fixed chunking. Default, if not specified, + * is to use Rabin. + */ + if (enable_rabin_global && !enable_rabin_scan && !enable_fixed_scan) { + enable_rabin_scan = 1; + enable_rabin_split = 1; + } + + if (enable_rabin_global && pipe_mode) { + fprintf(stderr, "Global Deduplication is not supported in pipe mode.\n"); + exit(1); + } + if (num_rem == 0 && !pipe_mode) { usage(); /* At least 1 filename needed. 
*/ exit(1); diff --git a/pcompress.h b/pcompress.h index b678c2b..ee84500 100644 --- a/pcompress.h +++ b/pcompress.h @@ -40,7 +40,7 @@ extern "C" { #define CHUNK_FLAG_SZ 1 #define ALGO_SZ 8 #define MIN_CHUNK 2048 -#define VERSION 7 +#define VERSION 8 #define FLAG_DEDUP 1 #define FLAG_DEDUP_FIXED 2 #define FLAG_SINGLE_CHUNK 4 @@ -195,11 +195,11 @@ struct cmp_data { sem_t cmp_done_sem; sem_t write_done_sem; sem_t index_sem; - sem_t *index_sem_other; void *data; pthread_t thr; mac_ctx_t chunk_hmac; algo_props_t *props; + int decompressing; }; #ifdef __cplusplus diff --git a/rabin/global/db.c b/rabin/global/db.c index fcb639e..ac6e6f6 100644 --- a/rabin/global/db.c +++ b/rabin/global/db.c @@ -105,7 +105,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch } if (path != NULL) { - printf("Disk based index not yet implemented.\n"); + fprintf(stderr, "Disk based index not yet implemented.\n"); free(cfg); return (NULL); } else { @@ -148,7 +148,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch return (NULL); } - indx->memlimit = memlimit; + indx->memlimit = memlimit - (hash_entry_size << 2); indx->list = (htab_t *)calloc(intervals, sizeof (htab_t)); indx->hash_entry_size = hash_entry_size; indx->intervals = intervals; @@ -198,7 +198,7 @@ mycmp(uchar_t *a, uchar_t *b, int sz) len = 0; do { val1 = *((size_t *)v1); - val2 = *((size_t *)v1); + val2 = *((size_t *)v2); if (val1 != val2) { return (1); } @@ -211,7 +211,8 @@ mycmp(uchar_t *a, uchar_t *b, int sz) } /* - * Lookup and insert item if indicated. Not thread-safe by design. + * Lookup and insert item if indicated. Not thread-safe by design. Caller needs to + * ensure thread-safety. 
*/ hash_entry_t * db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, @@ -221,7 +222,7 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, index_t *indx = (index_t *)(cfg->dbdata); hash_entry_t **htab, *ent, **pent; - assert(cfg->similarity_cksum_sz && (sizeof (size_t) - 1) == 0); + assert((cfg->similarity_cksum_sz & (sizeof (size_t) - 1)) == 0); htab_entry = XXH32(sim_cksum, cfg->similarity_cksum_sz, 0); htab_entry ^= (htab_entry / cfg->similarity_cksum_sz); htab_entry = htab_entry % indx->hash_slots; @@ -248,9 +249,8 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, } } if (do_insert) { - if (indx->memused + indx->hash_entry_size >= indx->memlimit - (indx->hash_entry_size << 2)) { + if (indx->memused + indx->hash_entry_size >= indx->memlimit) { ent = htab[htab_entry]; - pent = &(htab[htab_entry]); htab[htab_entry] = htab[htab_entry]->next; } else { ent = (hash_entry_t *)malloc(indx->hash_entry_size); diff --git a/rabin/global/dedupe_config.c b/rabin/global/dedupe_config.c index 4efecc5..e9bcf7f 100644 --- a/rabin/global/dedupe_config.c +++ b/rabin/global/dedupe_config.c @@ -360,7 +360,7 @@ set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim cfg->archive_sz = file_sz; cfg->dedupe_mode = MODE_SIMILARITY; - if (cfg->archive_sz <= SIXTEEN_GB) { + if (cfg->archive_sz <= SIXTEEN_GB || pct_interval == 0) { cfg->dedupe_mode = MODE_SIMPLE; cfg->segment_sz_bytes = user_chunk_sz; diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index 105186a..0b06d45 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -68,6 +68,8 @@ #include #include #include +#include +#include #include #include #include @@ -133,16 +135,15 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s int file_version, compress_op_t op, uint64_t file_size, char *tmppath) { dedupe_context_t *ctx; uint32_t i; - archive_config_t *arc; if (rab_blk_sz < 1 || 
rab_blk_sz > 5) rab_blk_sz = RAB_BLK_DEFAULT; if (dedupe_flag == RABIN_DEDUPE_FIXED || dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) { delta_flag = 0; - inited = 1; + if (dedupe_flag != RABIN_DEDUPE_FILE_GLOBAL) + inited = 1; } - arc = NULL; /* * Pre-compute a table of irreducible polynomial evaluations for each @@ -173,11 +174,24 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s ir[j] = val; } + /* + * If Global Deduplication is enabled initialize the in-memory index. + * It is essentially a hashtable that is used for crypto-hash based + * chunk matching. + */ if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) { my_sysinfo msys_info; + /* + * Get available free memory. + */ get_sysinfo(&msys_info); - arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, DEFAULT_PCT_INTERVAL, + + /* + * Use a maximum of approx 62% of free RAM for the index. + */ + msys_info.freeram = (msys_info.freeram >> 1) + (msys_info.freeram >> 3); + arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, 0, algo, props->cksum, props->cksum, file_size, msys_info.freeram, props->nthreads); if (arc == NULL) { @@ -220,6 +234,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s ctx->rabin_poly_min_block_size = dedupe_min_blksz(rab_blk_sz); ctx->delta_flag = 0; ctx->deltac_min_distance = props->deltac_min_distance; + ctx->pagesize = sysconf(_SC_PAGE_SIZE); /* * Scale down similarity percentage based on avg block size unless user specified @@ -256,12 +271,12 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s ctx->current_window_data = (uchar_t *)1; #endif ctx->blocks = NULL; - if (real_chunksize > 0) { + if (real_chunksize > 0 && dedupe_flag != RABIN_DEDUPE_FILE_GLOBAL) { ctx->blocks = (rabin_blockentry_t **)slab_calloc(NULL, ctx->blknum, sizeof (rabin_blockentry_t *)); } if(ctx == NULL || ctx->current_window_data == NULL || - (ctx->blocks == NULL && real_chunksize > 0)) 
{ + (ctx->blocks == NULL && real_chunksize > 0 && dedupe_flag != RABIN_DEDUPE_FILE_GLOBAL)) { fprintf(stderr, "Could not allocate rabin polynomial context, out of memory\n"); destroy_dedupe_context(ctx); @@ -383,6 +398,15 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of */ ary_sz = ctx->rabin_poly_max_block_size; ctx_heap = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz); + + /* + * If global dedupe is active, the global blocks array uses temp space in + * the target buffer. + */ + if (ctx->arc != NULL) { + ary_sz += (sizeof (global_blockentry_t) * (*size / ctx->rabin_poly_min_block_size + 1)); + ctx->g_blocks = (global_blockentry_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz); + } } #ifndef SSE_MODE memset(ctx->current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE); @@ -493,12 +517,18 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of cur_pos_checksum = cur_roll_checksum ^ ir[pushed_out]; if ((cur_pos_checksum & ctx->rabin_avg_block_mask) == ctx->rabin_break_patt || length >= ctx->rabin_poly_max_block_size) { - if (ctx->blocks[blknum] == 0) - ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, - sizeof (rabin_blockentry_t)); - ctx->blocks[blknum]->offset = last_offset; - ctx->blocks[blknum]->index = blknum; // Need to store for sorting - ctx->blocks[blknum]->length = length; + + if (!(ctx->arc)) { + if (ctx->blocks[blknum] == 0) + ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, + sizeof (rabin_blockentry_t)); + ctx->blocks[blknum]->offset = last_offset; + ctx->blocks[blknum]->index = blknum; // Need to store for sorting + ctx->blocks[blknum]->length = length; + } else { + ctx->g_blocks[blknum].length = length; + ctx->g_blocks[blknum].offset = last_offset; + } DEBUG_STAT_EN(if (length >= ctx->rabin_poly_max_block_size) ++max_count); /* @@ -510,7 +540,7 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of * Once block contents are arranged in a 
min heap we compute the K min values * sketch by hashing over the heap till K%. We interpret the raw bytes as a * sequence of 64-bit integers. - * This is called minhashing and is used widely, for example in various + * This is variant of minhashing which is used widely, for example in various * search engines to detect similar documents. */ if (ctx->delta_flag) { @@ -537,13 +567,18 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of // Insert the last left-over trailing bytes, if any, into a block. if (last_offset < *size) { - if (ctx->blocks[blknum] == 0) - ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, - sizeof (rabin_blockentry_t)); - ctx->blocks[blknum]->offset = last_offset; - ctx->blocks[blknum]->index = blknum; length = *size - last_offset; - ctx->blocks[blknum]->length = length; + if (!(ctx->arc)) { + if (ctx->blocks[blknum] == 0) + ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL, + sizeof (rabin_blockentry_t)); + ctx->blocks[blknum]->offset = last_offset; + ctx->blocks[blknum]->index = blknum; + ctx->blocks[blknum]->length = length; + } else { + ctx->g_blocks[blknum].length = length; + ctx->g_blocks[blknum].offset = last_offset; + } if (ctx->delta_flag) { uint64_t cur_sketch; @@ -576,19 +611,161 @@ process_blocks: DEBUG_STAT_EN(fprintf(stderr, "Original size: %" PRId64 ", blknum: %u\n", *size, blknum)); DEBUG_STAT_EN(fprintf(stderr, "Number of maxlen blocks: %u\n", max_count)); if (blknum > 2) { - uint64_t pos, matchlen, pos1; + uint64_t pos, matchlen, pos1 = 0; int valid = 1; uint32_t *dedupe_index; - uint64_t dedupe_index_sz; + uint64_t dedupe_index_sz = 0; rabin_blockentry_t *be; DEBUG_STAT_EN(uint32_t delta_calls, delta_fails, merge_count, hash_collisions); DEBUG_STAT_EN(delta_calls = 0); DEBUG_STAT_EN(delta_fails = 0); DEBUG_STAT_EN(hash_collisions = 0); - ary_sz = (blknum << 1) * sizeof (rabin_blockentry_t *); - htab = (rabin_blockentry_t **)(ctx->cbuf + ctx->real_chunksize - ary_sz); - 
memset(htab, 0, ary_sz); + /* + * If global dedupe is enabled then process it here. + */ + if (ctx->arc) { + if (ctx->arc->dedupe_mode == MODE_SIMPLE) { + uchar_t *g_dedupe_idx, *tgt, *src; + /* + * First compute all the rabin chunk/block cryptographic hashes. + */ +#if defined(_OPENMP) +# pragma omp parallel for if (mt) +#endif + for (i=0; ig_blocks[i].cksum, + ctx->arc->chunk_cksum_type, buf1+ctx->g_blocks[i].offset, + ctx->g_blocks[i].length, 0, 0); + } + + /* + * Index table within this segment. + */ + g_dedupe_idx = ctx->cbuf + RABIN_HDR_SIZE; + dedupe_index_sz = 0; + + /* + * First entry in table is the original file offset where this + * data segment begins. + */ + *((uint64_t *)g_dedupe_idx) = LE64(ctx->file_offset); + g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); + dedupe_index_sz += 2; + length = 0; + matchlen = 0; + + /* + * Now lookup blocks in index. First wait for our semaphore to be + * signaled. If the previous thread in sequence is using the index + * it will finish and then signal our semaphore. So we can have + * predictable serialization of index access in a sequence of + * threads without locking. + */ + sem_wait(ctx->index_sem); + for (i=0; iarc, ctx->g_blocks[i].cksum, 0, + ctx->file_offset + ctx->g_blocks[i].offset, + ctx->g_blocks[i].length, 1); + if (!he) { + /* + * Block match in index not found. + * Block was added to index. Merge this block. + */ + if (length + ctx->g_blocks[i].length > RABIN_MAX_BLOCK_SIZE) { + *((uint32_t *)g_dedupe_idx) = LE32(length); + g_dedupe_idx += RABIN_ENTRY_SIZE; + length = 0; + dedupe_index_sz++; + } + length += ctx->g_blocks[i].length; + } else { + /* + * Block match in index was found. + */ + if (length > 0) { + /* + * Write pending accumulated block length value. + */ + *((uint32_t *)g_dedupe_idx) = LE32(length); + g_dedupe_idx += RABIN_ENTRY_SIZE; + length = 0; + dedupe_index_sz++; + } + /* + * Add a reference entry to the dedupe array. 
+ */ + *((uint32_t *)g_dedupe_idx) = LE32((he->item_size | RABIN_INDEX_FLAG) & + CLEAR_SIMILARITY_FLAG); + g_dedupe_idx += RABIN_ENTRY_SIZE; + *((uint64_t *)g_dedupe_idx) = LE64(he->item_offset); + g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); + matchlen += he->item_size; + dedupe_index_sz += 3; + } + } + + /* + * Signal the next thread in sequence to access the index. + */ + sem_post(ctx->index_sem_next); + + /* + * Write final pending block length value (if any). + */ + if (length > 0) { + *((uint32_t *)g_dedupe_idx) = LE32(length); + g_dedupe_idx += RABIN_ENTRY_SIZE; + length = 0; + dedupe_index_sz++; + } + + blknum = dedupe_index_sz; // Number of entries in block list + tgt = g_dedupe_idx; + g_dedupe_idx = ctx->cbuf + RABIN_HDR_SIZE; + dedupe_index_sz = tgt - g_dedupe_idx; + src = buf1; + g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); + + /* + * Deduplication reduction should at least be greater than block list metadata. + */ + if (matchlen < dedupe_index_sz) { + DEBUG_STAT_EN(en = get_wtime_millis()); + DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n", + get_mb_s(*size, strt, en_1), get_mb_s(*size, strt, en))); + DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.\n")); + ctx->valid = 0; + return (0); + } + + /* + * Now copy the block data; + */ + for (i=0; icbuf; + blknum |= GLOBAL_FLAG; + } + goto dedupe_done; + } /* * Compute hash signature for each block. We do this in a separate loop to @@ -613,6 +790,10 @@ process_blocks: } } + ary_sz = (blknum << 1) * sizeof (rabin_blockentry_t *); + htab = (rabin_blockentry_t **)(ctx->cbuf + ctx->real_chunksize - ary_sz); + memset(htab, 0, ary_sz); + /* * Perform hash-matching of blocks and use a bucket-chained hashtable to match * for duplicates and similar blocks. 
Unique blocks are inserted and duplicates @@ -792,11 +973,11 @@ process_blocks: } } +dedupe_done: if (valid) { uchar_t *cbuf = ctx->cbuf; uint64_t *entries; DEBUG_STAT_EN(uint64_t sz); - DEBUG_STAT_EN(sz = *size); *((uint32_t *)cbuf) = htonl(blknum); cbuf += sizeof (uint32_t); @@ -808,7 +989,7 @@ process_blocks: ctx->valid = 1; DEBUG_STAT_EN(en = get_wtime_millis()); DEBUG_STAT_EN(fprintf(stderr, "Deduped size: %" PRId64 ", blknum: %u, delta_calls: %u, delta_fails: %u\n", - *size, blknum, delta_calls, delta_fails)); + *size, (unsigned int)(blknum & CLEAR_GLOBAL_FLAG), delta_calls, delta_fails)); DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n", get_mb_s(sz, strt, en_1), get_mb_s(sz, strt, en))); /* @@ -844,7 +1025,7 @@ parse_dedupe_hdr(uchar_t *buf, uint32_t *blknum, uint64_t *dedupe_index_sz, entries = (uint64_t *)buf; *dedupe_data_sz = ntohll(entries[0]); - *dedupe_index_sz = (uint64_t)(*blknum) * RABIN_ENTRY_SIZE; + *dedupe_index_sz = (uint64_t)(*blknum & CLEAR_GLOBAL_FLAG) * RABIN_ENTRY_SIZE; *dedupe_index_sz_cmp = ntohll(entries[1]); *deduped_size = ntohll(entries[2]); *dedupe_data_sz_cmp = ntohll(entries[3]); @@ -866,6 +1047,79 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size) sz = 0; ctx->valid = 1; + /* + * Handling for Global Deduplication. 
+ */ + if (blknum & GLOBAL_FLAG) { + uchar_t *g_dedupe_idx, *src1, *src2; + uint64_t adj, offset; + uint32_t flag; + + blknum &= CLEAR_GLOBAL_FLAG; + g_dedupe_idx = buf + RABIN_HDR_SIZE; + offset = LE64(*((uint64_t *)g_dedupe_idx)); + g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); + blknum -= 2; + src1 = buf + RABIN_HDR_SIZE + dedupe_index_sz; + + sem_wait(ctx->index_sem); + for (blk=0; blk data_sz) { + fprintf(stderr, "Dedup data overflows chunk.\n"); + ctx->valid = 0; + break; + } + if (flag == 0) { + memcpy(pos2, src1, len); + pos2 += len; + src1 += len; + sz += len; + } else { + pos1 = LE64(*((uint64_t *)g_dedupe_idx)); + g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); + blk += 2; + + /* + * Handling of chunk references at duplicate chunks. + * + * If required data offset is greater than the current segment's starting + * offset then the referenced chunk is already in the current segment in + * RAM. Just mem-copy it. + * Otherwise it will be in the current output file. We mmap() the relevant + * region and copy it. The way deduplication is done it is guaranteed that + * all duplicate references will be backward references so this approach works. + * + * However this approach precludes pipe-mode streamed decompression since + * it requires random access to the output file. 
+ */ + if (pos1 > offset) { + src2 = ctx->cbuf + (pos1 - offset); + memcpy(pos2, src2, len); + } else { + adj = pos1 % ctx->pagesize; + src2 = mmap(NULL, len + adj, PROT_READ, MAP_SHARED, ctx->out_fd, pos1 - adj); + if (src2 == MAP_FAILED) { /* mmap() signals failure with MAP_FAILED, never NULL */ + perror("MMAP failed "); + ctx->valid = 0; + break; + } + memcpy(pos2, src2 + adj, len); + munmap(src2, len + adj); + } + pos2 += len; + sz += len; + } + } + *size = data_sz; + return; + } + slab_cache_add(sizeof (rabin_blockentry_t)); for (blk = 0; blk < blknum; blk++) { if (ctx->blocks[blk] == 0) @@ -944,13 +1198,3 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size) } *size = data_sz; } - -/* - * TODO: Consolidate rabin dedup and compression/decompression in functions here rather than - * messy code in main program. -int -rabin_compress(dedupe_context_t *ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *tolen, - int level, char chdr, void *data, compress_func_ptr cmp) -{ -} -*/ diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index 4a497ef..c84784a 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -65,6 +65,9 @@ #include "utils.h" #include +#include +#include +#include //List of constants, mostly constraints and defaults for various parameters //to the Rabin Fingerprinting algorithm @@ -118,6 +121,8 @@ #define SET_SIMILARITY_FLAG (0x40000000UL) #define GET_SIMILARITY_FLAG SET_SIMILARITY_FLAG #define CLEAR_SIMILARITY_FLAG (0xBFFFFFFFUL) +#define GLOBAL_FLAG RABIN_INDEX_FLAG +#define CLEAR_GLOBAL_FLAG (0x7fffffffUL) #define RABIN_DEDUPE_SEGMENTED 0 #define RABIN_DEDUPE_FIXED 1 @@ -160,13 +165,15 @@ typedef struct rab_blockentry { } rabin_blockentry_t; typedef struct global_blockentry { - uint64_t offset; uint32_t length; + uint64_t offset; + uchar_t cksum[CKSUM_MAX_BYTES]; } global_blockentry_t; typedef struct { unsigned char *current_window_data; rabin_blockentry_t **blocks; + global_blockentry_t *g_blocks; uint32_t blknum; unsigned char *cbuf; uint32_t rabin_poly_max_block_size; 
@@ -178,11 +185,17 @@ typedef struct { short valid; void *lzma_data; int level, delta_flag, dedupe_flag, deltac_min_distance; + uint64_t file_offset; // For global dedupe archive_config_t *arc; + sem_t *index_sem; + sem_t *index_sem_next; + uint32_t pagesize; + int out_fd; + int id; } dedupe_context_t; extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, - int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int fixed_flag, + int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag, int file_version, compress_op_t op, uint64_t file_size, char *tmppath); extern void destroy_dedupe_context(dedupe_context_t *ctx); extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf,