diff --git a/main.c b/main.c index ab15c1e..7d4ad11 100644 --- a/main.c +++ b/main.c @@ -1076,7 +1076,8 @@ start_decompress(const char *filename, const char *to_filename) } if (enable_rabin_scan || enable_fixed_scan) { tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size, - algo, &props, enable_delta_encode, enable_fixed_scan, version, DECOMPRESS); + algo, &props, enable_delta_encode, enable_fixed_scan, version, DECOMPRESS, 0, + NULL); if (tdat->rctx == NULL) { UNCOMP_BAIL; } @@ -1580,7 +1581,7 @@ static int start_compress(const char *filename, uint64_t chunksize, int level) { struct wdata w; - char tmpfile1[MAXPATHLEN]; + char tmpfile1[MAXPATHLEN], tmpdir[MAXPATHLEN]; char to_filename[MAXPATHLEN]; uint64_t compressed_chunksize, n_chunksize; int64_t rbytes, rabin_count; @@ -1588,7 +1589,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) struct stat sbuf; int compfd = -1, uncompfd = -1, err; int thread, bail, single_chunk; - uint32_t i, nprocs, np, p; + uint32_t i, nprocs, np, p, dedupe_flag; struct cmp_data **dary = NULL, *tdat; pthread_t writer_thr; uchar_t *cread_buf, *pos; @@ -1621,13 +1622,21 @@ start_compress(const char *filename, uint64_t chunksize, int level) } flags = 0; + dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { - if (enable_rabin_global) + if (enable_rabin_global) { flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED); - else if (enable_rabin_scan) + dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL; + if (pipe_mode) { + sbuf.st_size = SIXTEEN_GB; + } + } else if (enable_rabin_scan) { flags |= FLAG_DEDUP; - else + dedupe_flag = RABIN_DEDUPE_SEGMENTED; + } else { flags |= FLAG_DEDUP_FIXED; + dedupe_flag = RABIN_DEDUPE_FIXED; + } /* Additional scratch space for dedup arrays. 
*/ if (chunksize + dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode) > compressed_chunksize) { @@ -1747,6 +1756,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) */ strcpy(tmpfile1, filename); strcpy(tmpfile1, dirname(tmpfile1)); + strcpy(tmpdir, tmpfile1); strcat(tmpfile1, "/.pcompXXXXXX"); snprintf(to_filename, sizeof (to_filename), "%s" COMP_EXTN, filename); if ((compfd = mkstemp(tmpfile1)) == -1) { @@ -1757,6 +1767,9 @@ start_compress(const char *filename, uint64_t chunksize, int level) signal(SIGINT, Int_Handler); signal(SIGTERM, Int_Handler); } else { + char *tmp; + struct stat st; + /* * Use stdin/stdout for pipe mode. */ @@ -1770,6 +1783,34 @@ start_compress(const char *filename, uint64_t chunksize, int level) perror("fileno "); COMP_BAIL; } + + /* + * Get a workable temporary dir. Required if global dedupe is enabled. + */ + tmp = getenv("TMPDIR"); + if (tmp == NULL) { + tmp = getenv("HOME"); + if (tmp == NULL) { + if (getcwd(tmpdir, MAXPATHLEN) == NULL) { + tmp = "/tmp"; + } else { + tmp = tmpdir; + } + } + } + if (stat(tmp, &st) == -1) { + fprintf(stderr, "Unable to find writable temporary dir.\n"); + COMP_BAIL; + } + if (!S_ISDIR(st.st_mode)) { + if (strcmp(tmp, "/tmp") != 0) { + tmp = "/tmp"; + } else { + fprintf(stderr, "Unable to find writable temporary dir.\n"); + COMP_BAIL; + } + } + strcpy(tmpdir, tmp); } if (encrypt_type) @@ -1819,13 +1860,15 @@ start_compress(const char *filename, uint64_t chunksize, int level) // i is unsigned so this wraps around backwards for i == 0 tdat->index_sem_other = &(dary[(i - 1) % nprocs]->index_sem); if (_init_func) { - if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, VERSION, COMPRESS) != 0) { + if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, + VERSION, COMPRESS) != 0) { COMP_BAIL; } } if (enable_rabin_scan || enable_fixed_scan) { tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size, - algo, &props, 
enable_delta_encode, enable_fixed_scan, VERSION, COMPRESS); + algo, &props, enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size, + tmpdir); if (tdat->rctx == NULL) { COMP_BAIL; } @@ -1959,7 +2002,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) */ if (enable_rabin_split) { rctx = create_dedupe_context(chunksize, 0, 0, algo, &props, enable_delta_encode, - enable_fixed_scan, VERSION, COMPRESS); + enable_fixed_scan, VERSION, COMPRESS, 0, NULL); rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx); } else { rbytes = Read(uncompfd, cread_buf, chunksize); diff --git a/rabin/global/db.c b/rabin/global/db.c index 6bdaedf..fcb639e 100644 --- a/rabin/global/db.c +++ b/rabin/global/db.c @@ -41,13 +41,6 @@ /* * Hashtable structures for in-memory index. */ -typedef struct _hash_entry { - uint64_t item_offset; - uint32_t item_size; - struct _hash_entry *next; - uchar_t cksum[1]; -} hash_entry_t; - typedef struct { hash_entry_t **tab; } htab_t; @@ -97,7 +90,7 @@ static cleanup_indx(index_t *indx) archive_config_t * init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz, - int pct_interval, compress_algo_t algo, cksum_t ck, cksum_t ck_sim, + int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, int nthreads) { archive_config_t *cfg; @@ -107,6 +100,10 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch cfg = calloc(1, sizeof (archive_config_t)); rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, user_chunk_sz, pct_interval); + if (cfg->dedupe_mode == MODE_SIMPLE) { + pct_interval = 0; + } + if (path != NULL) { printf("Disk based index not yet implemented.\n"); free(cfg); @@ -118,12 +115,14 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch index_t *indx; // Compute total hashtable entries first - if (pct_interval == 0) + if (pct_interval == 0) { intervals = 1; - else + 
hash_slots = file_sz / cfg->chunk_sz_bytes + 1; + } else { intervals = 100 / pct_interval - 1; - hash_slots = file_sz / cfg->segment_sz_bytes + 1; - hash_slots *= intervals; + hash_slots = file_sz / cfg->segment_sz_bytes + 1; + hash_slots *= intervals; + } hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1; // Compute memory required to hold all hash entries assuming worst case 50% @@ -214,7 +213,7 @@ mycmp(uchar_t *a, uchar_t *b, int sz) /* * Lookup and insert item if indicated. Not thread-safe by design. */ -uint64_t +hash_entry_t * db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, uint64_t item_offset, uint32_t item_size, int do_insert) { @@ -234,7 +233,7 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, while (ent) { if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0 && ent->item_size == item_size) { - return (ent->item_offset); + return (ent); } pent = &(ent->next); ent = ent->next; @@ -242,7 +241,7 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, } else { while (ent) { if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0) { - return (ent->item_offset); + return (ent); } pent = &(ent->next); ent = ent->next; @@ -263,5 +262,5 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, memcpy(ent->cksum, sim_cksum, cfg->similarity_cksum_sz); *pent = ent; } - return (0); + return (NULL); } diff --git a/rabin/global/db.h b/rabin/global/db.h index d8b68a1..228b953 100644 --- a/rabin/global/db.h +++ b/rabin/global/db.h @@ -31,12 +31,22 @@ extern "C" { #endif +/* + * Publicly visible In-memory hashtable entry.
+ */ +typedef struct _hash_entry { + uint64_t item_offset; + uint32_t item_size; + struct _hash_entry *next; + uchar_t cksum[1]; +} hash_entry_t; + archive_config_t *init_global_db(char *configfile); archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize, - uint64_t user_chunk_sz, int pct_interval, compress_algo_t algo, + uint64_t user_chunk_sz, int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, int nthreads); -uint64_t db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, +hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, uint64_t item_offset, uint32_t item_size, int do_insert); #ifdef __cplusplus diff --git a/rabin/global/dedupe_config.c b/rabin/global/dedupe_config.c index 5e86c92..4efecc5 100644 --- a/rabin/global/dedupe_config.c +++ b/rabin/global/dedupe_config.c @@ -38,12 +38,6 @@ #include "dedupe_config.h" #include "db.h" -#define ONE_PB (1125899906842624ULL) -#define ONE_TB (1099511627776ULL) -#define FOUR_MB (4194304ULL) -#define EIGHT_MB (8388608ULL) -#define EIGHT_GB (8589934592ULL) - static compress_algo_t get_compress_level(compress_algo_t algo) { @@ -64,7 +58,7 @@ get_compress_level(compress_algo_t algo) } static int -get_compress_algo(char *algo_name) +get_compress_algo(const char *algo_name) { if (strcmp(algo_name, "none") == 0) { return (COMPRESS_NONE); @@ -350,11 +344,11 @@ write_config(char *configfile, archive_config_t *cfg) } int -set_config_s(archive_config_t *cfg, compress_algo_t algo, cksum_t ck, cksum_t ck_sim, +set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim, uint32_t chunksize, size_t file_sz, uint64_t user_chunk_sz, int pct_interval) { - cfg->algo = algo; + cfg->algo = get_compress_algo(algo); cfg->chunk_cksum_type = ck; cfg->similarity_cksum = ck_sim; cfg->compress_level = get_compress_level(cfg->algo); @@ -366,7 +360,7 @@ set_config_s(archive_config_t *cfg, 
compress_algo_t algo, cksum_t ck, cksum_t ck cfg->archive_sz = file_sz; cfg->dedupe_mode = MODE_SIMILARITY; - if (cfg->archive_sz <= EIGHT_GB) { + if (cfg->archive_sz <= SIXTEEN_GB) { cfg->dedupe_mode = MODE_SIMPLE; cfg->segment_sz_bytes = user_chunk_sz; diff --git a/rabin/global/dedupe_config.h b/rabin/global/dedupe_config.h index 0c9da58..14529b7 100644 --- a/rabin/global/dedupe_config.h +++ b/rabin/global/dedupe_config.h @@ -86,7 +86,7 @@ typedef struct _segment_entry { int read_config(char *configfile, archive_config_t *cfg); int write_config(char *configfile, archive_config_t *cfg); -int set_config_s(archive_config_t *cfg, compress_algo_t algo, cksum_t ck, cksum_t ck_sim, +int set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim, uint32_t chunksize, size_t file_sz, uint64_t user_chunk_sz, int pct_interval); diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index d0ac8b5..105186a 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -104,6 +104,7 @@ extern int bspatch(u_char *pbuf, u_char *oldbuf, bsize_t oldsize, u_char *newbuf static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER; uint64_t ir[256], out[256]; static int inited = 0; +archive_config_t *arc = NULL; static uint32_t dedupe_min_blksz(int rab_blk_sz) @@ -129,9 +130,10 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta dedupe_context_t * create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag, - int file_version, compress_op_t op) { + int file_version, compress_op_t op, uint64_t file_size, char *tmppath) { dedupe_context_t *ctx; uint32_t i; + archive_config_t *arc; if (rab_blk_sz < 1 || rab_blk_sz > 5) rab_blk_sz = RAB_BLK_DEFAULT; @@ -140,6 +142,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s delta_flag = 0; inited = 1; } + arc = NULL; /* * Pre-compute a table of irreducible 
polynomial evaluations for each @@ -169,6 +172,19 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s } ir[j] = val; } + + if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) { + my_sysinfo msys_info; + + get_sysinfo(&msys_info); + arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, DEFAULT_PCT_INTERVAL, + algo, props->cksum, props->cksum, file_size, + msys_info.freeram, props->nthreads); + if (arc == NULL) { + pthread_mutex_unlock(&init_lock); + return (NULL); + } + } inited = 1; } pthread_mutex_unlock(&init_lock); @@ -194,6 +210,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s */ ctx = (dedupe_context_t *)slab_alloc(NULL, sizeof (dedupe_context_t)); ctx->rabin_poly_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE; + ctx->arc = arc; ctx->current_window_data = NULL; ctx->dedupe_flag = dedupe_flag; diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index b2c4e9f..4a497ef 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -64,6 +64,7 @@ #define _RABIN_POLY_H_ #include "utils.h" +#include <db.h> //List of constants, mostly constraints and defaults for various parameters //to the Rabin Fingerprinting algorithm @@ -121,6 +122,7 @@ #define RABIN_DEDUPE_SEGMENTED 0 #define RABIN_DEDUPE_FIXED 1 #define RABIN_DEDUPE_FILE_GLOBAL 2 +#define DEFAULT_PCT_INTERVAL 2 // Mask to extract value from a rabin index entry #define RABIN_INDEX_VALUE (0x3FFFFFFFUL) @@ -176,11 +178,12 @@ typedef struct { short valid; void *lzma_data; int level, delta_flag, dedupe_flag, deltac_min_distance; + archive_config_t *arc; } dedupe_context_t; extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int fixed_flag, - int file_version, compress_op_t op); + int file_version, compress_op_t op, uint64_t file_size, char *tmppath); extern void destroy_dedupe_context(dedupe_context_t
*ctx); extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf, uint64_t *size, uint64_t offset, uint64_t *rabin_pos, int mt); diff --git a/utils/utils.c b/utils/utils.c index 457728e..5060ccc 100644 --- a/utils/utils.c +++ b/utils/utils.c @@ -374,8 +374,13 @@ void get_sysinfo(my_sysinfo *msys_info) { struct sysinfo sys_info; - sysinfo(&sys_info); + int rv; + rv = sysinfo(&sys_info); + + if (rv == -1) { + sys_info.freeram = 100 * 1024 * 1024; // 100M arbitrary + } msys_info->totalram = sys_info.totalram; msys_info->freeram = sys_info.freeram; msys_info->totalswap = sys_info.totalswap; diff --git a/utils/utils.h b/utils/utils.h index 2485609..c2ec2a4 100644 --- a/utils/utils.h +++ b/utils/utils.h @@ -48,6 +48,13 @@ extern "C" { #define EIGHTM (8UL * 1024UL * 1024UL) #define FOURM (4UL * 1024UL * 1024UL) +#define ONE_PB (1125899906842624ULL) +#define ONE_TB (1099511627776ULL) +#define FOUR_MB FOURM +#define EIGHT_MB EIGHTM +#define EIGHT_GB (8589934592ULL) +#define SIXTEEN_GB (EIGHT_GB * 2) + #if !defined(sun) && !defined(__sun) #define uchar_t u_char #endif