diff --git a/rabin/global/dedupe_config.h b/rabin/global/dedupe_config.h index d75cb55..bd3e153 100644 --- a/rabin/global/dedupe_config.h +++ b/rabin/global/dedupe_config.h @@ -51,6 +51,12 @@ typedef enum { MODE_ARCHIVE } dedupe_mode_t; +struct seg_map_fd { + int fd; + void *mapping; + uint32_t len; +}; + typedef struct { char rootdir[PATH_MAX+1]; uint32_t chunk_sz; // Numeric ID: 1 - 4k ... 5 - 64k @@ -65,6 +71,7 @@ typedef struct { int pct_interval; // Similarity based match intervals in %age. // The items below are computed given the above // components. + int intervals; dedupe_mode_t dedupe_mode; uint32_t chunk_sz_bytes; // Average chunk size @@ -76,7 +83,8 @@ typedef struct { int num_containers; // Number of containers in a directory int nthreads; // Number of threads processing data segments in parallel int seg_fd_w; - int *seg_fd_r; // One read-only fd per thread for mapping in portions of the + uint32_t pagesize; + struct seg_map_fd *seg_fd_r; // One read-only fd per thread for mapping in portions of the // segment metadata cache. 
int valid; void *dbdata; diff --git a/rabin/global/index.c b/rabin/global/index.c index 3333694..be62a41 100644 --- a/rabin/global/index.c +++ b/rabin/global/index.c @@ -35,6 +35,7 @@ #include #include #include +#include #include "index.h" @@ -74,13 +75,23 @@ init_global_db(char *configfile) void static cleanup_indx(index_t *indx) { - int i; + int i, j; if (indx) { if (indx->list) { for (i = 0; i < indx->intervals; i++) { - if (indx->list[i].tab) + if (indx->list[i].tab) { + for (j=0; jhash_slots; j++) { + hash_entry_t *he, *nxt; + he = indx->list[i].tab[j]; + while (he) { + nxt = he->next; + free(he); + he = nxt; + } + } free(indx->list[i].tab); + } } free(indx->list); } @@ -155,7 +166,7 @@ set_cfg: *hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size); *pct_interval = 0; } else { - *intervals = 100 / *pct_interval - 1; + *intervals = 90 / *pct_interval - 1; *hash_slots = file_sz / cfg->segment_sz_bytes + 1; *hash_slots *= *intervals; } @@ -164,7 +175,7 @@ set_cfg: // occupancy. *memreqd = MEM_REQD(*hash_slots, *hash_entry_size); - if (*memreqd > (memlimit + (memlimit >> 2)) && cfg->dedupe_mode == MODE_SIMPLE && + if (*memreqd > (memlimit + (memlimit >> 1)) && cfg->dedupe_mode == MODE_SIMPLE && *pct_interval == 0) { *pct_interval = DEFAULT_PCT_INTERVAL; set_user = 1; @@ -192,8 +203,8 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch } orig_pct = pct_interval; cfg = calloc(1, sizeof (archive_config_t)); - - diff = (float)pct_interval / 100.0; + + diff = (float)pct_interval / 90.0; rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim, file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit); @@ -207,8 +218,10 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch memreqd = hash_slots * MEM_PER_UNIT(hash_entry_size); } - // Now create as many hash tables as there are similarity match intervals - // each having hash_slots / intervals slots. 
+ /* + * Now create as many hash tables as there are similarity match intervals + * each having hash_slots / intervals slots. + */ indx = calloc(1, sizeof (index_t)); if (!indx) { free(cfg); @@ -221,6 +234,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch indx->intervals = intervals; indx->hash_slots = hash_slots / intervals; cfg->nthreads = nthreads; + cfg->intervals = intervals; for (i = 0; i < intervals; i++) { indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals, @@ -237,7 +251,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch strcpy(cfg->rootdir, tmppath); strcat(cfg->rootdir, "/.segXXXXXX"); cfg->seg_fd_w = mkstemp(cfg->rootdir); - cfg->seg_fd_r = (int *)malloc(sizeof (int) * nthreads); + cfg->seg_fd_r = (struct seg_map_fd *)malloc(sizeof (struct seg_map_fd) * nthreads); if (cfg->seg_fd_w == -1 || cfg->seg_fd_r == NULL) { cleanup_indx(indx); if (cfg->seg_fd_r) @@ -245,14 +259,110 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch free(cfg); return (NULL); } - for (i = 0; i < nthreads; i++) { - cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY); + for (i = 0; i < nthreads; i++) { + cfg->seg_fd_r[i].fd = open(cfg->rootdir, O_RDONLY); + cfg->seg_fd_r[i].mapping = NULL; } } cfg->dbdata = indx; return (cfg); } +/* + * Functions to handle segment metadata cache for segmented similarity based deduplication. + * These functions are not thread-safe by design. The caller must ensure thread safety. + */ + +/* + * Add new segment block list array into the metadata cache. Once added the entry is + * not removed till the program exits. 
+ *
+ * Each record is a 16-byte header followed by len bytes copied from buf.
+ * Header layout (host byte order, shared with db_segcache_map()):
+ *   bytes 0-3   len          payload length in bytes
+ *   bytes 4-7   blknum       caller-supplied block count
+ *   bytes 8-15  file_offset  caller-supplied 64-bit offset
+ *
+ * tid is accepted for interface symmetry and is not used here.
+ * Returns 0 on success, -1 if either Write() call fails or comes up short.
+ */
+int
+db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum,
+		  uint64_t file_offset)
+{
+	int64_t w;
+	uchar_t hdr[16];
+
+	/*
+	 * Serialize with memcpy so the header is exactly 16 bytes with fixed
+	 * field offsets, and so no store goes through a casted pointer.
+	 */
+	memcpy(hdr, &len, sizeof (len));
+	memcpy(hdr + 4, &blknum, sizeof (blknum));
+	memcpy(hdr + 8, &file_offset, sizeof (file_offset));
+
+	/*
+	 * Compare as signed values. Against an unsigned size_t a negative
+	 * return would be converted to a huge value and the error missed.
+	 * NOTE(review): assumes Write() returns the byte count written or a
+	 * negative value on error - confirm against its definition.
+	 */
+	w = Write(cfg->seg_fd_w, hdr, sizeof (hdr));
+	if (w < (int64_t)sizeof (hdr))
+		return (-1);
+	w = Write(cfg->seg_fd_w, buf, len);
+	if (w < (int64_t)len)
+		return (-1);
+	return (0);
+}
+
+/*
+ * Get the current file pointer position of the metadata file. This indicates the
+ * position where the next entry will be added.
+ *
+ * tid is not used. The value comes from lseek() on the write descriptor and is
+ * narrowed to int by the return type.
+ * NOTE(review): positions beyond INT_MAX cannot be represented, and a failed
+ * lseek() yields -1; the return type is fixed by the prototype in index.h.
+ */
+int
+db_segcache_pos(archive_config_t *cfg, int tid)
+{
+	return (lseek(cfg->seg_fd_w, 0, SEEK_CUR));
+}
+
+/*
+ * Mmap the requested segment metadata array.
+ *
+ * cfg    - archive config holding the per-thread read descriptors.
+ * tid    - index into cfg->seg_fd_r selecting the descriptor/mapping slot.
+ * blknum - out: block count stored in the record header.
+ * offset - in: file position of the record header inside the cache file.
+ *          out: the 64-bit file_offset value stored in that header.
+ * blocks - out: address of the first payload byte inside the mapping.
+ *
+ * Any mapping previously recorded for tid is removed first. On success the
+ * new mapping and its length are recorded in cfg->seg_fd_r[tid] and 0 is
+ * returned. On failure -1 is returned; if the failure is in mmap(), *offset
+ * and *blknum have already been modified.
+ */
+int
+db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks)
+{
+	uchar_t *mapbuf;
+	uchar_t hdr[16];
+	int64_t r;
+	int fd;
+	uint32_t len, adj;
+	uint64_t file_offset;
+
+	/*
+	 * Ensure previous mapping is removed.
+	 */
+	db_segcache_unmap(cfg, tid);
+
+	/*
+	 * The page size is the divisor used to align the mapping below; an
+	 * unset (zero) value would be a division by zero.
+	 */
+	if (cfg->pagesize == 0)
+		return (-1);
+
+	fd = cfg->seg_fd_r[tid].fd;
+	if (lseek(fd, *offset, SEEK_SET) != *offset)
+		return (-1);
+
+	/*
+	 * Read header first so that we know how much to map. The comparison is
+	 * done on signed values so that a negative return is caught.
+	 * NOTE(review): assumes Read() returns the byte count read or a
+	 * negative value on error - confirm against its definition.
+	 */
+	r = Read(fd, hdr, sizeof (hdr));
+	if (r < (int64_t)sizeof (hdr))
+		return (-1);
+
+	/*
+	 * Decode the header using the layout written by db_segcache_write().
+	 */
+	memcpy(&len, hdr, sizeof (len));
+	memcpy(&file_offset, hdr + 8, sizeof (file_offset));
+
+	/*
+	 * The payload starts right after the header. mmap() needs a page
+	 * aligned file offset, so map from the enclosing page boundary and
+	 * skip the adj leading bytes when handing the pointer back.
+	 */
+	*offset += sizeof (hdr);
+	adj = *offset % cfg->pagesize;
+	memcpy(blknum, hdr + 4, sizeof (*blknum));
+	mapbuf = mmap(NULL, len + adj, PROT_READ, MAP_SHARED, fd, *offset - adj);
+
+	if (mapbuf == MAP_FAILED)
+		return (-1);
+	*offset = file_offset;
+	*blocks = mapbuf + adj;
+	cfg->seg_fd_r[tid].mapping = mapbuf;
+	cfg->seg_fd_r[tid].len = len + adj;
+
+	return (0);
+}
+
+/*
+ * Remove the metadata mapping.
+ */ +int +db_segcache_unmap(archive_config_t *cfg, int tid) +{ + if (cfg->seg_fd_r[tid].mapping) { + munmap(cfg->seg_fd_r[tid].mapping, cfg->seg_fd_r[tid].len); + cfg->seg_fd_r[tid].mapping = NULL; + } + return (0); +} + static inline int mycmp(uchar_t *a, uchar_t *b, int sz) { @@ -296,7 +406,7 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, pent = &(htab[htab_entry]); ent = htab[htab_entry]; - if (cfg->pct_interval == 0) { // Single file global dedupe case + if (cfg->pct_interval == 0) { // Global dedupe with simple index while (ent) { if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0 && ent->item_size == item_size) { @@ -340,7 +450,7 @@ destroy_global_db_s(archive_config_t *cfg) cleanup_indx(indx); if (cfg->pct_interval > 0) { for (i = 0; i < cfg->nthreads; i++) { - close(cfg->seg_fd_r[i]); + close(cfg->seg_fd_r[i].fd); } free(cfg->seg_fd_r); close(cfg->seg_fd_w); diff --git a/rabin/global/index.h b/rabin/global/index.h index efc9355..83c4862 100644 --- a/rabin/global/index.h +++ b/rabin/global/index.h @@ -54,6 +54,11 @@ hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int uint64_t item_offset, uint32_t item_size, int do_insert); void destroy_global_db_s(archive_config_t *cfg); +int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset); +int db_segcache_pos(archive_config_t *cfg, int tid); +int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks); +int db_segcache_unmap(archive_config_t *cfg, int tid); + #ifdef __cplusplus } #endif diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index 56b399e..0ccccbe 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -75,6 +75,7 @@ #include #include #include +#include #include "rabin_dedup.h" #if defined(__USE_SSE_INTRIN__) && defined(__SSE4_1__) && RAB_POLYNOMIAL_WIN_SIZE == 16 @@ -107,6 +108,7 @@ static pthread_mutex_t 
init_lock = PTHREAD_MUTEX_INITIALIZER; uint64_t ir[256], out[256]; static int inited = 0; archive_config_t *arc = NULL; +static struct blake2_dispatch bdsp; static uint32_t dedupe_min_blksz(int rab_blk_sz) @@ -215,6 +217,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s pthread_mutex_unlock(&init_lock); return (NULL); } + blake2_module_init(&bdsp, &proc_info); } inited = 1; } @@ -253,6 +256,8 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s ctx->deltac_min_distance = props->deltac_min_distance; ctx->pagesize = sysconf(_SC_PAGE_SIZE); ctx->similarity_cksums = NULL; + if (arc) + arc->pagesize = ctx->pagesize; /* * Scale down similarity percentage based on avg block size unless user specified @@ -302,14 +307,13 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s } if (arc && dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) { - ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, 90 / arc->pct_interval, 32); + ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, arc->intervals, 32); if (!ctx->similarity_cksums) { fprintf(stderr, "Could not allocate dedupe context, out of memory\n"); destroy_dedupe_context(ctx); return (NULL); } - ctx->similarity_count = 90 / arc->pct_interval; } ctx->lzma_data = NULL; @@ -776,7 +780,11 @@ process_blocks: } else { uchar_t *seg_heap, *sim_ck; archive_config_t *cfg; - uint32_t increment, len; + uint32_t increment, len, mapped_blks; + global_blockentry_t *seg_blocks; + uint64_t seg_offset, offset; + int written; + global_blockentry_t **htab; /* * This code block implements Segmented similarity based Dedupe with @@ -784,7 +792,10 @@ process_blocks: */ cfg = ctx->arc; seg_heap = (uchar_t *)ctx->g_blocks - cfg->segment_sz_bytes; + ary_sz = cfg->segment_sz * sizeof (global_blockentry_t **); + htab = (global_blockentry_t **)(seg_heap - ary_sz); for (i=0; isegment_sz) { + blake2b_state S1, S2; length = 0; /* @@ -797,19 +808,71 @@ process_blocks: * 
Compute the cumulative similarity minhashes. */ sim_ck = ctx->similarity_cksums; - increment = length / ctx->similarity_count; + increment = length / cfg->intervals; len = increment; src = buf1 + ctx->g_blocks[i].offset; tgt = seg_heap; memcpy(tgt, src, length); - for (j=0; jsimilarity_count; j++) { + bdsp.blake2b_init(&S1, 32); + for (j=0; jintervals; j++) { reset_heap(&heap, len/8); ksmallest((int64_t *)seg_heap, length, &heap); - compute_checksum(sim_ck, CKSUM_BLAKE256, seg_heap, len, 0, 0); + bdsp.blake2b_update(&S1, seg_heap + len - increment, increment); + memcpy(&S2, &S1, sizeof (S1)); + bdsp.blake2b_final(&S2, sim_ck, 32); len += increment; sim_ck += 32; } + + /* + * Now lookup the similarity minhashes starting at the highest + * significance level. + */ + sim_ck -= 32; + written = 0; + increment = cfg->intervals * cfg->pct_interval; + sem_wait(ctx->index_sem); + seg_offset = db_segcache_pos(cfg, ctx->id); + for (j=cfg->intervals; j > 0; j--) { + hash_entry_t *he; + + he = db_lookup_insert_s(cfg, sim_ck, j-1, seg_offset, 0, 1); + if (he) { + /* + * Match found. Load segment metadata from disk and perform + * identity deduplication with the segment chunks. + */ + memset(htab, 0, ary_sz); + offset = he->item_offset; + if (db_segcache_map(cfg, ctx->id, &mapped_blks, &offset, + (uchar_t **)&seg_blocks) == -1) { + fprintf(stderr, "Segment cache mmap failed.\n"); + ctx->valid = 0; + return (0); + } + + if (increment > 70) { + j = cfg->intervals - j; + } + } else if (!written) { + if (blknum - i >= cfg->segment_sz) { + db_segcache_write(cfg, ctx->id, src, len, cfg->segment_sz, + ctx->file_offset); + } else { + db_segcache_write(cfg, ctx->id, src, len, blknum-i, + ctx->file_offset); + } + written = 1; + } + increment -= cfg->pct_interval; + sim_ck -= 32; + } } + + /* + * Signal the next thread in sequence to access the index. 
+ */ + sem_post(ctx->index_sem_next); } /* diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h index d109771..ae10534 100644 --- a/rabin/rabin_dedup.h +++ b/rabin/rabin_dedup.h @@ -163,11 +163,14 @@ typedef struct rab_blockentry { struct rab_blockentry *next; } rabin_blockentry_t; +#pragma pack(1) typedef struct global_blockentry { uint32_t length; uint64_t offset; + struct global_blockentry *next; // Reqd when part of a hashtable uchar_t cksum[CKSUM_MAX_BYTES]; } global_blockentry_t; +#pragma pack() typedef struct { unsigned char *current_window_data; @@ -189,7 +192,6 @@ typedef struct { sem_t *index_sem; sem_t *index_sem_next; uchar_t *similarity_cksums; - int similarity_count; uint32_t pagesize; int out_fd; int id;