From b7fdeb08bcb7760a4e09d758a30c224e5a6c7687 Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Wed, 20 Mar 2013 22:47:03 +0530 Subject: [PATCH] Work in progress global dedupe changes. --- crypto/crypto_utils.h | 23 ++------------ main.c | 28 ++++++++++++++--- pcompress.h | 4 ++- rabin/global/db.c | 70 ++++++++++++++++++++++++------------------- rabin/global/db.h | 2 +- rabin/rabin_dedup.c | 10 +++---- rabin/rabin_dedup.h | 6 +++- utils/utils.c | 15 ++++++++++ utils/utils.h | 32 ++++++++++++++++++++ 9 files changed, 126 insertions(+), 64 deletions(-) diff --git a/crypto/crypto_utils.h b/crypto/crypto_utils.h index b866b36..0afe015 100644 --- a/crypto/crypto_utils.h +++ b/crypto/crypto_utils.h @@ -30,6 +30,8 @@ #include #include +#include + #ifdef __cplusplus extern "C" { #endif @@ -58,27 +60,6 @@ extern "C" { #define MAX_NONCE 32 #define KECCAK_MAX_SEG (2305843009213693950ULL) -/* - * Public checksum properties. CKSUM_MAX_BYTES must be updated if a - * newer larger checksum is added to the list. - */ -typedef enum { - CKSUM_CRC64 = 0x100, - CKSUM_BLAKE256 = 0x200, - CKSUM_BLAKE512 = 0x300, - CKSUM_SHA256 = 0x400, - CKSUM_SHA512 = 0x500, - CKSUM_KECCAK256 = 0x600, - CKSUM_KECCAK512 = 0x700, -/* - * Backwards compatibility options. SKEIN in release 1.2 was replaced with - * Blake2 from 1.3 onwards (for sheer speed of Blake2). We want to be able - * to decode archives created with 1.2. New archives do not use SKEIN. - */ - CKSUM_SKEIN256 = 0x800, - CKSUM_SKEIN512 = 0x900, - CKSUM_INVALID = 0 -} cksum_t; typedef struct { void *crypto_ctx; diff --git a/main.c b/main.c index e0c5871..ab15c1e 100644 --- a/main.c +++ b/main.c @@ -87,6 +87,7 @@ static int nthreads = 0; static int hide_mem_stats = 1; static int hide_cmp_stats = 1; static int enable_rabin_scan = 0; +static int enable_rabin_global = 0; static int enable_delta_encode = 0; static int enable_delta2_encode = 0; static int enable_rabin_split = 1; @@ -770,6 +771,15 @@ start_decompress(const char *filename, const char *to_filename) if (flags & FLAG_DEDUP) { enable_rabin_scan = 1; + if (flags & FLAG_DEDUP_FIXED) { + if (version > 7) { + enable_rabin_global = 1; + } else { + fprintf(stderr, "Invalid file deduplication flags.\n"); + err = 1; + goto uncomp_done; + } + } } else if (flags & FLAG_DEDUP_FIXED) { enable_fixed_scan = 1; } @@ -1577,8 +1587,8 @@ start_compress(const char *filename, uint64_t chunksize, int level) short version, flags; struct stat sbuf; int compfd = -1, uncompfd = -1, err; - int i, thread, bail, single_chunk; - int nprocs, np, p; + int thread, bail, single_chunk; + uint32_t i, nprocs, np, p; struct cmp_data **dary = NULL, *tdat; pthread_t writer_thr; uchar_t *cread_buf, *pos; @@ -1599,6 +1609,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) */ compressed_chunksize = chunksize + CHUNK_HDR_SZ + zlib_buf_extra(chunksize); init_algo_props(&props); + props.cksum = cksum; cread_buf = NULL; if (_props_func) { @@ -1610,8 +1621,10 @@ start_compress(const char *filename, uint64_t chunksize, int level) } flags = 0; - if (enable_rabin_scan || enable_fixed_scan) { - if (enable_rabin_scan) + if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) { + if (enable_rabin_global) + flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED); + else if (enable_rabin_scan) flags |= FLAG_DEDUP; else flags |= FLAG_DEDUP_FIXED; @@ -1801,6 +1814,10 @@ start_compress(const char *filename, uint64_t chunksize, int level) sem_init(&(tdat->start_sem), 0, 0); sem_init(&(tdat->cmp_done_sem), 0, 0); sem_init(&(tdat->write_done_sem), 0, 1); + sem_init(&(tdat->index_sem), 0, 0); + + // i is unsigned so this wraps around backwards for i == 0 + tdat->index_sem_other = &(dary[(i - 1) % nprocs]->index_sem); if (_init_func) { if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, VERSION, COMPRESS) != 0) { COMP_BAIL; @@ -1830,6 +1847,9 @@ start_compress(const char *filename, uint64_t chunksize, int level) } thread = 1; + // When doing global dedupe first thread does not wait to access the index. + sem_post(&(dary[0]->index_sem)); + w.dary = dary; w.wfd = compfd; w.nprocs = nprocs; diff --git a/pcompress.h b/pcompress.h index 06de7bb..b678c2b 100644 --- a/pcompress.h +++ b/pcompress.h @@ -53,7 +53,7 @@ extern "C" { #define CHSIZE_MASK 0x80 #define BZIP2_A_NUM 16 #define LZMA_A_NUM 32 -#define CHUNK_FLAG_DEDUP 2 +#define CHUNK_FLAG_DEDUP 2 #define CHUNK_FLAG_PREPROC 4 #define COMP_EXTN ".pz" @@ -194,6 +194,8 @@ struct cmp_data { sem_t start_sem; sem_t cmp_done_sem; sem_t write_done_sem; + sem_t index_sem; + sem_t *index_sem_other; void *data; pthread_t thr; mac_ctx_t chunk_hmac; diff --git a/rabin/global/db.c b/rabin/global/db.c index febac06..6bdaedf 100644 --- a/rabin/global/db.c +++ b/rabin/global/db.c @@ -42,7 +42,8 @@ * Hashtable structures for in-memory index. */ typedef struct _hash_entry { - uint64_t seg_offset; + uint64_t item_offset; + uint32_t item_size; struct _hash_entry *next; uchar_t cksum[1]; } hash_entry_t; @@ -53,7 +54,6 @@ typedef struct { typedef struct { htab_t *list; - pthread_mutex_t *mlist; uint64_t memlimit; uint64_t memused; int hash_entry_size, intervals, hash_slots; @@ -91,8 +91,6 @@ static cleanup_indx(index_t *indx) } free(indx->list); } - if (indx->mlist) - free(indx->mlist); free(indx); } } @@ -153,7 +151,6 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch indx->memlimit = memlimit; indx->list = (htab_t *)calloc(intervals, sizeof (htab_t)); - indx->mlist = (pthread_mutex_t *)malloc(intervals * sizeof (pthread_mutex_t)); indx->hash_entry_size = hash_entry_size; indx->intervals = intervals; indx->hash_slots = hash_slots / intervals; @@ -167,23 +164,24 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch return (NULL); } indx->memused += ((hash_slots / intervals) * (sizeof (hash_entry_t *))); - pthread_mutex_init(&(indx->mlist[i]), NULL); } - strcpy(cfg->rootdir, tmppath); - strcat(cfg->rootdir, "/.segXXXXXX"); - cfg->seg_fd_w = mkstemp(cfg->rootdir); - cfg->seg_fd_r = (int *)malloc(sizeof (int) * nthreads); - if (cfg->seg_fd_w == -1 || cfg->seg_fd_r == NULL) { - cleanup_indx(indx); - if (cfg->seg_fd_r) - free(cfg->seg_fd_r); - free(cfg); - return (NULL); - } + if (pct_interval > 0) { + strcpy(cfg->rootdir, tmppath); + strcat(cfg->rootdir, "/.segXXXXXX"); + cfg->seg_fd_w = mkstemp(cfg->rootdir); + cfg->seg_fd_r = (int *)malloc(sizeof (int) * nthreads); + if (cfg->seg_fd_w == -1 || cfg->seg_fd_r == NULL) { + cleanup_indx(indx); + if (cfg->seg_fd_r) + free(cfg->seg_fd_r); + free(cfg); + return (NULL); + } - for (i = 0; i < nthreads; i++) { - cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY); + for (i = 0; i < nthreads; i++) { + cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY); + } } cfg->dbdata = indx; } @@ -213,9 +211,12 @@ mycmp(uchar_t *a, uchar_t *b, int sz) return (0); } +/* + * Lookup and insert item if indicated. Not thread-safe by design. + */ uint64_t db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, - uint64_t seg_offset, int do_insert) + uint64_t item_offset, uint32_t item_size, int do_insert) { uint32_t htab_entry; index_t *indx = (index_t *)(cfg->dbdata); @@ -228,17 +229,24 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, htab = indx->list[interval].tab; pent = &(htab[htab_entry]); - pthread_mutex_lock(&(indx->mlist[interval])); ent = htab[htab_entry]; - while (ent) { - if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0) { - uint64_t off; - off = ent->seg_offset; - pthread_mutex_unlock(&(indx->mlist[interval])); - return (off+1); + if (cfg->pct_interval == 0) { // Single file global dedupe case + while (ent) { + if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0 && + ent->item_size == item_size) { + return (ent->item_offset); + } + pent = &(ent->next); + ent = ent->next; + } + } else { + while (ent) { + if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0) { + return (ent->item_offset); + } + pent = &(ent->next); + ent = ent->next; } - pent = &(ent->next); - ent = ent->next; } if (do_insert) { if (indx->memused + indx->hash_entry_size >= indx->memlimit - (indx->hash_entry_size << 2)) { @@ -249,11 +257,11 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, ent = (hash_entry_t *)malloc(indx->hash_entry_size); indx->memused += indx->hash_entry_size; } - ent->seg_offset = seg_offset; + ent->item_offset = item_offset; + ent->item_size = item_size; ent->next = 0; memcpy(ent->cksum, sim_cksum, cfg->similarity_cksum_sz); *pent = ent; } - pthread_mutex_unlock(&(indx->mlist[interval])); return (0); } diff --git a/rabin/global/db.h b/rabin/global/db.h index 5552a69..d8b68a1 100644 --- a/rabin/global/db.h +++ b/rabin/global/db.h @@ -37,7 +37,7 @@ archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, int nthreads); uint64_t db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, - uint64_t seg_offset, int do_insert); + uint64_t item_offset, uint32_t item_size, int do_insert); #ifdef __cplusplus } diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index 47e0390..d0ac8b5 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -128,7 +128,7 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta */ dedupe_context_t * create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz, - const char *algo, const algo_props_t *props, int delta_flag, int fixed_flag, + const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag, int file_version, compress_op_t op) { dedupe_context_t *ctx; uint32_t i; @@ -136,7 +136,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s if (rab_blk_sz < 1 || rab_blk_sz > 5) rab_blk_sz = RAB_BLK_DEFAULT; - if (fixed_flag) { + if (dedupe_flag == RABIN_DEDUPE_FIXED || dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) { delta_flag = 0; inited = 1; } @@ -196,7 +196,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s ctx->rabin_poly_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE; ctx->current_window_data = NULL; - ctx->fixed_flag = fixed_flag; + ctx->dedupe_flag = dedupe_flag; ctx->rabin_break_patt = 0; ctx->rabin_poly_avg_block_size = RAB_BLK_AVG_SZ(rab_blk_sz); ctx->rabin_avg_block_mask = RAB_BLK_MASK; @@ -220,7 +220,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s ctx->delta_flag = 2; } - if (!fixed_flag) + if (dedupe_flag != RABIN_DEDUPE_FIXED) ctx->blknum = chunksize / ctx->rabin_poly_min_block_size; else ctx->blknum = chunksize / ctx->rabin_poly_avg_block_size; @@ -330,7 +330,7 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of if (*size < ctx->rabin_poly_avg_block_size) return (0); DEBUG_STAT_EN(strt = get_wtime_millis()); - if (ctx->fixed_flag) { + if (ctx->dedupe_flag == RABIN_DEDUPE_FIXED) { blknum = *size / ctx->rabin_poly_avg_block_size; j = *size % ctx->rabin_poly_avg_block_size; if (j) diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index 9d0331c..b2c4e9f 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -118,6 +118,10 @@ #define GET_SIMILARITY_FLAG SET_SIMILARITY_FLAG #define CLEAR_SIMILARITY_FLAG (0xBFFFFFFFUL) +#define RABIN_DEDUPE_SEGMENTED 0 +#define RABIN_DEDUPE_FIXED 1 +#define RABIN_DEDUPE_FILE_GLOBAL 2 + // Mask to extract value from a rabin index entry #define RABIN_INDEX_VALUE (0x3FFFFFFFUL) @@ -171,7 +175,7 @@ typedef struct { uint64_t real_chunksize; short valid; void *lzma_data; - int level, delta_flag, fixed_flag, deltac_min_distance; + int level, delta_flag, dedupe_flag, deltac_min_distance; } dedupe_context_t; extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, diff --git a/utils/utils.c b/utils/utils.c index 483641d..457728e 100644 --- a/utils/utils.c +++ b/utils/utils.c @@ -41,6 +41,8 @@ #include #include +#include + #define _IN_UTILS_ #include "utils.h" @@ -367,3 +369,16 @@ get_mb_s(uint64_t bytes, double strt, double en) bytes_sec = ((double)bytes / (en - strt)) * 1000; return (BYTES_TO_MB(bytes_sec)); } + +void +get_sysinfo(my_sysinfo *msys_info) +{ + struct sysinfo sys_info; + sysinfo(&sys_info); + + msys_info->totalram = sys_info.totalram; + msys_info->freeram = sys_info.freeram; + msys_info->totalswap = sys_info.totalswap; + msys_info->freeswap = sys_info.freeswap; + msys_info->mem_unit = sys_info.mem_unit; +} diff --git a/utils/utils.h b/utils/utils.h index 3fdabb0..2485609 100644 --- a/utils/utils.h +++ b/utils/utils.h @@ -134,6 +134,28 @@ typedef int32_t bsize_t; #define BYTES_TO_MB(x) ((x) / (1024 * 1024)) +/* + * Public checksum properties. CKSUM_MAX_BYTES must be updated if a + * newer larger checksum is added to the list. + */ +typedef enum { + CKSUM_CRC64 = 0x100, + CKSUM_BLAKE256 = 0x200, + CKSUM_BLAKE512 = 0x300, + CKSUM_SHA256 = 0x400, + CKSUM_SHA512 = 0x500, + CKSUM_KECCAK256 = 0x600, + CKSUM_KECCAK512 = 0x700, +/* + * Backwards compatibility options. SKEIN in release 1.2 was replaced with + * Blake2 from 1.3 onwards (for sheer speed of Blake2). We want to be able + * to decode archives created with 1.2. New archives do not use SKEIN. + */ + CKSUM_SKEIN256 = 0x800, + CKSUM_SKEIN512 = 0x900, + CKSUM_INVALID = 0 +} cksum_t; + typedef enum { COMPRESS_NONE = 0, COMPRESS_LZFX, @@ -155,6 +177,7 @@ typedef struct { int d_max_threads; int delta2_span; int deltac_min_distance; + cksum_t cksum; } algo_props_t; typedef enum { @@ -162,6 +185,14 @@ typedef enum { DECOMPRESS_THREADS } algo_threads_type_t; +typedef struct{ + int64_t totalram; + int64_t freeram; + int64_t totalswap; + int64_t freeswap; + int64_t mem_unit; +} my_sysinfo; + #ifndef _IN_UTILS_ extern processor_info_t proc_info; #endif @@ -179,6 +210,7 @@ extern void set_threadcounts(algo_props_t *props, int *nthreads, int nprocs, extern uint64_t get_total_ram(); extern double get_wtime_millis(void); extern double get_mb_s(uint64_t bytes, double strt, double en); +extern void get_sysinfo(my_sysinfo *msys_info); extern void init_algo_props(algo_props_t *props); extern void init_pcompress();