Work in progress changes for Segmented Global Deduplication.

Author: Moinak Ghosh
Date:   2013-04-14 23:51:54 +05:30
parent 50251107de
commit a22b52cf08
5 changed files with 209 additions and 21 deletions

View file

@@ -51,6 +51,12 @@ typedef enum {
 	MODE_ARCHIVE
 } dedupe_mode_t;
 
+struct seg_map_fd {
+	int fd;
+	void *mapping;
+	uint32_t len;
+};
+
 typedef struct {
 	char rootdir[PATH_MAX+1];
 	uint32_t chunk_sz; // Numeric ID: 1 - 4k ... 5 - 64k
@@ -65,6 +71,7 @@ typedef struct {
 	int pct_interval; // Similarity based match intervals in %age.
 			// The items below are computed given the above
 			// components.
+	int intervals;
 	dedupe_mode_t dedupe_mode;
 	uint32_t chunk_sz_bytes; // Average chunk size
@@ -76,7 +83,8 @@ typedef struct {
 	int num_containers; // Number of containers in a directory
 	int nthreads; // Number of threads processing data segments in parallel
 	int seg_fd_w;
-	int *seg_fd_r; // One read-only fd per thread for mapping in portions of the
+	uint32_t pagesize;
+	struct seg_map_fd *seg_fd_r; // One read-only fd per thread for mapping in portions of the
 			// segment metadata cache.
 	int valid;
 	void *dbdata;

View file

@@ -35,6 +35,7 @@
 #include <allocator.h>
 #include <pthread.h>
 #include <xxhash.h>
+#include <sys/mman.h>
 #include "index.h"
@@ -74,14 +75,24 @@ init_global_db(char *configfile)
 void
 static cleanup_indx(index_t *indx)
 {
-	int i;
+	int i, j;
 
 	if (indx) {
 		if (indx->list) {
 			for (i = 0; i < indx->intervals; i++) {
-				if (indx->list[i].tab)
+				if (indx->list[i].tab) {
+					for (j=0; j<indx->hash_slots; j++) {
+						hash_entry_t *he, *nxt;
+						he = indx->list[i].tab[j];
+						while (he) {
+							nxt = he->next;
+							free(he);
+							he = nxt;
+						}
+					}
 					free(indx->list[i].tab);
+				}
 			}
 			free(indx->list);
 		}
 		free(indx);
@@ -155,7 +166,7 @@ set_cfg:
 		*hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size);
 		*pct_interval = 0;
 	} else {
-		*intervals = 100 / *pct_interval - 1;
+		*intervals = 90 / *pct_interval - 1;
 		*hash_slots = file_sz / cfg->segment_sz_bytes + 1;
 		*hash_slots *= *intervals;
 	}
@@ -164,7 +175,7 @@ set_cfg:
 	// occupancy.
 	*memreqd = MEM_REQD(*hash_slots, *hash_entry_size);
-	if (*memreqd > (memlimit + (memlimit >> 2)) && cfg->dedupe_mode == MODE_SIMPLE &&
+	if (*memreqd > (memlimit + (memlimit >> 1)) && cfg->dedupe_mode == MODE_SIMPLE &&
 	    *pct_interval == 0) {
 		*pct_interval = DEFAULT_PCT_INTERVAL;
 		set_user = 1;
@@ -193,7 +204,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
 	orig_pct = pct_interval;
 	cfg = calloc(1, sizeof (archive_config_t));
 
-	diff = (float)pct_interval / 100.0;
+	diff = (float)pct_interval / 90.0;
 	rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim,
 	    file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit);
@@ -207,8 +218,10 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
 		memreqd = hash_slots * MEM_PER_UNIT(hash_entry_size);
 	}
 
-	// Now create as many hash tables as there are similarity match intervals
-	// each having hash_slots / intervals slots.
+	/*
+	 * Now create as many hash tables as there are similarity match intervals
+	 * each having hash_slots / intervals slots.
+	 */
 	indx = calloc(1, sizeof (index_t));
 	if (!indx) {
 		free(cfg);
@@ -221,6 +234,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
 	indx->intervals = intervals;
 	indx->hash_slots = hash_slots / intervals;
 	cfg->nthreads = nthreads;
+	cfg->intervals = intervals;
 
 	for (i = 0; i < intervals; i++) {
 		indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals,
@@ -237,7 +251,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
 		strcpy(cfg->rootdir, tmppath);
 		strcat(cfg->rootdir, "/.segXXXXXX");
 		cfg->seg_fd_w = mkstemp(cfg->rootdir);
-		cfg->seg_fd_r = (int *)malloc(sizeof (int) * nthreads);
+		cfg->seg_fd_r = (struct seg_map_fd *)malloc(sizeof (struct seg_map_fd) * nthreads);
 		if (cfg->seg_fd_w == -1 || cfg->seg_fd_r == NULL) {
 			cleanup_indx(indx);
 			if (cfg->seg_fd_r)
@@ -246,13 +260,109 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
 			return (NULL);
 		}
 		for (i = 0; i < nthreads; i++) {
-			cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY);
+			cfg->seg_fd_r[i].fd = open(cfg->rootdir, O_RDONLY);
+			cfg->seg_fd_r[i].mapping = NULL;
 		}
 	}
 
 	cfg->dbdata = indx;
 	return (cfg);
 }
+
+/*
+ * Functions to handle segment metadata cache for segmented similarity based deduplication.
+ * These functions are not thread-safe by design. The caller must ensure thread safety.
+ */
+
+/*
+ * Add new segment block list array into the metadata cache. Once added the entry is
+ * not removed till the program exits.
+ */
+int
+db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum,
+    uint64_t file_offset)
+{
+	int64_t w;
+	uchar_t hdr[16];
+
+	/* 16-byte record header: metadata length, block count, original file offset. */
+	*((uint32_t *)hdr) = len;
+	*((uint32_t *)(hdr + 4)) = blknum;
+	*((uint64_t *)(hdr + 8)) = file_offset;
+
+	w = Write(cfg->seg_fd_w, hdr, sizeof (hdr));
+	if (w < sizeof (hdr))
+		return (-1);
+
+	w = Write(cfg->seg_fd_w, buf, len);
+	if (w < len)
+		return (-1);
+	return (0);
+}
+
+/*
+ * Get the current file pointer position of the metadata file. This indicates the
+ * position where the next entry will be added.
+ */
+int
+db_segcache_pos(archive_config_t *cfg, int tid)
+{
+	return (lseek(cfg->seg_fd_w, 0, SEEK_CUR));
+}
+
+/*
+ * Mmap the requested segment metadata array.
+ */
+int
+db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks)
+{
+	uchar_t *mapbuf;
+	uchar_t hdr[16];
+	int64_t r;
+	int fd;
+	uint32_t len, adj;
+
+	/*
+	 * Ensure previous mapping is removed.
+	 */
+	db_segcache_unmap(cfg, tid);
+
+	fd = cfg->seg_fd_r[tid].fd;
+	if (lseek(fd, *offset, SEEK_SET) != *offset)
+		return (-1);
+
+	/*
+	 * Read header first so that we know how much to map.
+	 */
+	r = Read(fd, hdr, sizeof (hdr));
+	if (r < sizeof (hdr))
+		return (-1);
+
+	*offset += sizeof (hdr);
+	len = *((uint32_t *)hdr);
+	adj = *offset % cfg->pagesize;
+	*blknum = *((uint32_t *)(hdr + 4));
+
+	/* The mapping must start on a page boundary, so back up by adj and skip it via the returned pointer. */
+	mapbuf = mmap(NULL, len + adj, PROT_READ, MAP_SHARED, fd, *offset - adj);
+	if (mapbuf == MAP_FAILED)
+		return (-1);
+
+	*offset = *((uint64_t *)(hdr + 8));
+	*blocks = mapbuf + adj;
+
+	cfg->seg_fd_r[tid].mapping = mapbuf;
+	cfg->seg_fd_r[tid].len = len + adj;
+	return (0);
+}
+
+/*
+ * Remove the metadata mapping.
+ */
+int
+db_segcache_unmap(archive_config_t *cfg, int tid)
+{
+	if (cfg->seg_fd_r[tid].mapping) {
+		munmap(cfg->seg_fd_r[tid].mapping, cfg->seg_fd_r[tid].len);
+		cfg->seg_fd_r[tid].mapping = NULL;
+	}
+	return (0);
+}
+
 static inline int
 mycmp(uchar_t *a, uchar_t *b, int sz)
 {
@@ -296,7 +406,7 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
 	pent = &(htab[htab_entry]);
 	ent = htab[htab_entry];
-	if (cfg->pct_interval == 0) { // Single file global dedupe case
+	if (cfg->pct_interval == 0) { // Global dedupe with simple index
 		while (ent) {
 			if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0 &&
 			    ent->item_size == item_size) {
@@ -340,7 +450,7 @@ destroy_global_db_s(archive_config_t *cfg)
 	cleanup_indx(indx);
 	if (cfg->pct_interval > 0) {
 		for (i = 0; i < cfg->nthreads; i++) {
-			close(cfg->seg_fd_r[i]);
+			close(cfg->seg_fd_r[i].fd);
 		}
 		free(cfg->seg_fd_r);
 		close(cfg->seg_fd_w);
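Taken together, the db_segcache_* routines added above give each dedupe thread a small API over the temporary segment metadata file: record the current tail with db_segcache_pos(), append a segment's block list with db_segcache_write() on an index miss, and map a previously written list back with db_segcache_map()/db_segcache_unmap() on a hit. A minimal usage sketch follows; it assumes the pcompress headers are available, and dedupe_one_segment() with its arguments is a hypothetical stand-in for the caller logic in rabin_dedup.c, not a function from this commit.

/*
 * Sketch only: assumes the project headers (archive_config_t, hash_entry_t,
 * uchar_t, db_segcache_* declarations) are on the include path and that cfg
 * came from init_global_db_s().
 */
#include <stdint.h>
#include "index.h"

static int
dedupe_one_segment(archive_config_t *cfg, int tid, hash_entry_t *match,
    uchar_t *seg_meta, uint32_t meta_len, uint32_t nblocks, uint64_t file_offset)
{
	uint32_t mapped_blks;
	uint64_t offset;
	uchar_t *blocks;

	if (match == NULL) {
		/*
		 * Miss: append this segment's block list at the current tail,
		 * whose position was recorded earlier via db_segcache_pos()
		 * and stored in the index as item_offset.
		 */
		return (db_segcache_write(cfg, tid, seg_meta, meta_len, nblocks,
		    file_offset));
	}

	/* Hit: map the previously written block list and dedupe against it. */
	offset = match->item_offset;
	if (db_segcache_map(cfg, tid, &mapped_blks, &offset, &blocks) == -1)
		return (-1);

	/* ... compare the mapped block entries with the current segment here ... */

	return (db_segcache_unmap(cfg, tid));
}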

View file

@@ -54,6 +54,11 @@ hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int
 		uint64_t item_offset, uint32_t item_size, int do_insert);
 void destroy_global_db_s(archive_config_t *cfg);
 
+int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset);
+int db_segcache_pos(archive_config_t *cfg, int tid);
+int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks);
+int db_segcache_unmap(archive_config_t *cfg, int tid);
+
 #ifdef __cplusplus
 }
 #endif
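Each record that db_segcache_write() appends is a fixed 16-byte header followed by the raw block-entry array for the segment. The struct below only illustrates that layout; the code writes the fields by casting into a uchar_t buffer, and the type and field names here are invented for clarity.

/* Illustration of the on-disk segment cache record header (hypothetical names). */
#include <stdint.h>

#pragma pack(1)
typedef struct seg_cache_hdr {
	uint32_t len;         /* Bytes of block-entry metadata that follow */
	uint32_t blknum;      /* Number of chunk entries in the segment    */
	uint64_t file_offset; /* Offset of the segment in the source file  */
} seg_cache_hdr_t;
#pragma pack()

_Static_assert(sizeof (seg_cache_hdr_t) == 16, "segment cache header must be 16 bytes");

/* Each record in the cache file is one such header followed by len bytes of entries. */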

View file

@ -75,6 +75,7 @@
#include <pthread.h> #include <pthread.h>
#include <heapq.h> #include <heapq.h>
#include <xxhash.h> #include <xxhash.h>
#include <blake2_digest.h>
#include "rabin_dedup.h" #include "rabin_dedup.h"
#if defined(__USE_SSE_INTRIN__) && defined(__SSE4_1__) && RAB_POLYNOMIAL_WIN_SIZE == 16 #if defined(__USE_SSE_INTRIN__) && defined(__SSE4_1__) && RAB_POLYNOMIAL_WIN_SIZE == 16
@@ -107,6 +108,7 @@ static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
 uint64_t ir[256], out[256];
 static int inited = 0;
 archive_config_t *arc = NULL;
+static struct blake2_dispatch bdsp;
 
 static uint32_t
 dedupe_min_blksz(int rab_blk_sz)
@@ -215,6 +217,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
 				pthread_mutex_unlock(&init_lock);
 				return (NULL);
 			}
+			blake2_module_init(&bdsp, &proc_info);
 		}
 		inited = 1;
 	}
@@ -253,6 +256,8 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
 	ctx->deltac_min_distance = props->deltac_min_distance;
 	ctx->pagesize = sysconf(_SC_PAGE_SIZE);
 	ctx->similarity_cksums = NULL;
+	if (arc)
+		arc->pagesize = ctx->pagesize;
 
 	/*
 	 * Scale down similarity percentage based on avg block size unless user specified
@@ -302,14 +307,13 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
 	}
 
 	if (arc && dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) {
-		ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, 90 / arc->pct_interval, 32);
+		ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, arc->intervals, 32);
 		if (!ctx->similarity_cksums) {
 			fprintf(stderr,
 			    "Could not allocate dedupe context, out of memory\n");
 			destroy_dedupe_context(ctx);
 			return (NULL);
 		}
-		ctx->similarity_count = 90 / arc->pct_interval;
 	}
 
 	ctx->lzma_data = NULL;
@@ -776,7 +780,11 @@ process_blocks:
 		} else {
 			uchar_t *seg_heap, *sim_ck;
 			archive_config_t *cfg;
-			uint32_t increment, len;
+			uint32_t increment, len, mapped_blks;
+			global_blockentry_t *seg_blocks;
+			uint64_t seg_offset, offset;
+			int written;
+			global_blockentry_t **htab;
 
 			/*
 			 * This code block implements Segmented similarity based Dedupe with
@@ -784,7 +792,10 @@ process_blocks:
 			 */
 			cfg = ctx->arc;
 			seg_heap = (uchar_t *)ctx->g_blocks - cfg->segment_sz_bytes;
+			ary_sz = cfg->segment_sz * sizeof (global_blockentry_t **);
+			htab = (global_blockentry_t **)(seg_heap - ary_sz);
 			for (i=0; i<blknum; i += cfg->segment_sz) {
+				blake2b_state S1, S2;
 				length = 0;
 
 				/*
@@ -797,19 +808,71 @@ process_blocks:
 				 * Compute the cumulative similarity minhashes.
 				 */
 				sim_ck = ctx->similarity_cksums;
-				increment = length / ctx->similarity_count;
+				increment = length / cfg->intervals;
 				len = increment;
 				src = buf1 + ctx->g_blocks[i].offset;
 				tgt = seg_heap;
 				memcpy(tgt, src, length);
-				for (j=0; j<ctx->similarity_count; j++) {
+				bdsp.blake2b_init(&S1, 32);
+				for (j=0; j<cfg->intervals; j++) {
 					reset_heap(&heap, len/8);
 					ksmallest((int64_t *)seg_heap, length, &heap);
-					compute_checksum(sim_ck, CKSUM_BLAKE256, seg_heap, len, 0, 0);
+					bdsp.blake2b_update(&S1, seg_heap + len - increment, increment);
+					memcpy(&S2, &S1, sizeof (S1));
+					bdsp.blake2b_final(&S2, sim_ck, 32);
 					len += increment;
 					sim_ck += 32;
 				}
+
+				/*
+				 * Now lookup the similarity minhashes starting at the highest
+				 * significance level.
+				 */
+				sim_ck -= 32;
+				written = 0;
+				increment = cfg->intervals * cfg->pct_interval;
+				sem_wait(ctx->index_sem);
+				seg_offset = db_segcache_pos(cfg, ctx->id);
+				for (j=cfg->intervals; j > 0; j--) {
+					hash_entry_t *he;
+
+					he = db_lookup_insert_s(cfg, sim_ck, j-1, seg_offset, 0, 1);
+					if (he) {
+						/*
+						 * Match found. Load segment metadata from disk and perform
+						 * identity deduplication with the segment chunks.
+						 */
+						memset(htab, 0, ary_sz);
+						offset = he->item_offset;
+						if (db_segcache_map(cfg, ctx->id, &mapped_blks, &offset,
+						    (uchar_t **)&seg_blocks) == -1) {
+							fprintf(stderr, "Segment cache mmap failed.\n");
+							ctx->valid = 0;
 							return (0);
 						}
+						if (increment > 70) {
+							j = cfg->intervals - j;
+						}
+					} else if (!written) {
+						if (blknum - i >= cfg->segment_sz) {
+							db_segcache_write(cfg, ctx->id, src, len, cfg->segment_sz,
+							    ctx->file_offset);
+						} else {
+							db_segcache_write(cfg, ctx->id, src, len, blknum-i,
+							    ctx->file_offset);
+						}
+						written = 1;
+					}
+					increment -= cfg->pct_interval;
+					sim_ck -= 32;
+				}
+			}
+
+			/*
+			 * Signal the next thread in sequence to access the index.
+			 */
+			sem_post(ctx->index_sem_next);
 		}
 
 		/*
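The minhash loop above produces one 32-byte similarity checksum per interval, each covering a growing prefix of the k-smallest-value output, by updating a single running BLAKE2 state (S1) with only the newest slice and finalizing a snapshot (S2) at every step so earlier bytes are never rehashed. The stand-alone sketch below illustrates that incremental pattern; it substitutes a trivial FNV-1a digest for the blake2_dispatch calls, so the names and hash are illustrative only, not the project's API.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Tiny stand-in for an incremental digest (FNV-1a), used only to show the pattern. */
typedef struct { uint64_t h; } digest_state_t;

static void dg_init(digest_state_t *s) { s->h = 14695981039346656037ULL; }
static void dg_update(digest_state_t *s, const unsigned char *p, size_t n)
{
	while (n--) { s->h ^= *p++; s->h *= 1099511628211ULL; }
}
static uint64_t dg_final(const digest_state_t *s) { return (s->h); }

/*
 * Compute `intervals` cumulative digests over data[0..len): digest j covers the
 * first (j+1) * increment bytes.  Only the new slice is hashed at each step and
 * a copy of the running state is finalized, mirroring the S1/S2 usage above.
 * Any remainder bytes beyond intervals * increment are ignored in this sketch.
 */
static void
cumulative_digests(const unsigned char *data, size_t len, int intervals, uint64_t *out)
{
	digest_state_t run, snap;
	size_t increment = len / intervals;
	int j;

	dg_init(&run);
	for (j = 0; j < intervals; j++) {
		dg_update(&run, data + (size_t)j * increment, increment);
		snap = run;                 /* like memcpy(&S2, &S1, sizeof (S1)) */
		out[j] = dg_final(&snap);   /* like blake2b_final(&S2, sim_ck, 32) */
	}
}

int
main(void)
{
	unsigned char buf[4096];
	uint64_t ck[8];
	int j;

	memset(buf, 0xa5, sizeof (buf));
	cumulative_digests(buf, sizeof (buf), 8, ck);
	for (j = 0; j < 8; j++)
		printf("interval %d: %016llx\n", j, (unsigned long long)ck[j]);
	return (0);
}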

View file

@@ -163,11 +163,14 @@ typedef struct rab_blockentry {
 	struct rab_blockentry *next;
 } rabin_blockentry_t;
 
+#pragma pack(1)
 typedef struct global_blockentry {
 	uint32_t length;
 	uint64_t offset;
+	struct global_blockentry *next; // Reqd when part of a hashtable
 	uchar_t cksum[CKSUM_MAX_BYTES];
 } global_blockentry_t;
+#pragma pack()
 
 typedef struct {
 	unsigned char *current_window_data;
@@ -189,7 +192,6 @@ typedef struct {
 	sem_t *index_sem;
 	sem_t *index_sem_next;
 	uchar_t *similarity_cksums;
-	int similarity_count;
 	uint32_t pagesize;
 	int out_fd;
 	int id;
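The #pragma pack(1) added around global_blockentry_t is relevant here because these entries are written verbatim into the segment metadata cache and mapped back later; packing presumably keeps the stored stride free of compiler padding. A small illustration of the effect follows; CKSUM_MAX_BYTES of 64 and an LP64 target are assumptions for the example, not values taken from this diff.

#include <stdint.h>
#include <stdio.h>

#define CKSUM_MAX_BYTES 64   /* assumption for illustration */

typedef struct unpacked_entry {
	uint32_t length;
	uint64_t offset;
	struct unpacked_entry *next;
	unsigned char cksum[CKSUM_MAX_BYTES];
} unpacked_entry_t;

#pragma pack(1)
typedef struct packed_entry {
	uint32_t length;
	uint64_t offset;
	struct packed_entry *next;
	unsigned char cksum[CKSUM_MAX_BYTES];
} packed_entry_t;
#pragma pack()

int
main(void)
{
	/* On LP64 the unpacked struct gains 4 bytes of padding after `length`. */
	printf("unpacked: %zu bytes, packed: %zu bytes\n",
	    sizeof (unpacked_entry_t), sizeof (packed_entry_t));
	return (0);
}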