Work in progress changes for scalable segmented global deduplication.

Allow user-specified environment setting to control in-memory index size.
This commit is contained in:
Moinak Ghosh 2013-04-06 15:15:27 +05:30
parent c357452079
commit 3d7a179a77
6 changed files with 230 additions and 115 deletions

65
main.c
View file

@ -677,7 +677,7 @@ start_decompress(const char *filename, const char *to_filename)
int uncompfd = -1, err, np, bail; int uncompfd = -1, err, np, bail;
int nprocs = 1, thread = 0, level; int nprocs = 1, thread = 0, level;
unsigned int i; unsigned int i;
short version, flags; unsigned short version, flags;
int64_t chunksize, compressed_chunksize; int64_t chunksize, compressed_chunksize;
struct cmp_data **dary, *tdat; struct cmp_data **dary, *tdat;
pthread_t writer_thr; pthread_t writer_thr;
@ -1644,7 +1644,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
char to_filename[MAXPATHLEN]; char to_filename[MAXPATHLEN];
uint64_t compressed_chunksize, n_chunksize, file_offset; uint64_t compressed_chunksize, n_chunksize, file_offset;
int64_t rbytes, rabin_count; int64_t rbytes, rabin_count;
short version, flags; unsigned short version, flags;
struct stat sbuf; struct stat sbuf;
int compfd = -1, uncompfd = -1, err; int compfd = -1, uncompfd = -1, err;
int thread, bail, single_chunk; int thread, bail, single_chunk;
@ -1655,21 +1655,9 @@ start_compress(const char *filename, uint64_t chunksize, int level)
dedupe_context_t *rctx; dedupe_context_t *rctx;
algo_props_t props; algo_props_t props;
/*
* Compressed buffer size must include zlib/dedup scratch space and
* chunk header space.
* See http://www.zlib.net/manual.html#compress2
*
* We do this unconditionally whether user mentioned zlib or not
* to keep it simple. While zlib scratch space is only needed at
* runtime, chunk header is stored in the file.
*
* See start_decompress() routine for details of chunk header.
* We also keep extra 8-byte space for the last chunk's size.
*/
compressed_chunksize = chunksize + CHUNK_HDR_SZ + zlib_buf_extra(chunksize);
init_algo_props(&props); init_algo_props(&props);
props.cksum = cksum; props.cksum = cksum;
props.buf_extra = 0;
cread_buf = NULL; cread_buf = NULL;
if (_props_func) { if (_props_func) {
@ -1757,9 +1745,6 @@ start_compress(const char *filename, uint64_t chunksize, int level)
thread = 0; thread = 0;
single_chunk = 0; single_chunk = 0;
rctx = NULL; rctx = NULL;
slab_cache_add(chunksize);
slab_cache_add(compressed_chunksize);
slab_cache_add(sizeof (struct cmp_data));
nprocs = sysconf(_SC_NPROCESSORS_ONLN); nprocs = sysconf(_SC_NPROCESSORS_ONLN);
if (nthreads > 0 && nthreads < nprocs) if (nthreads > 0 && nthreads < nprocs)
@ -1798,6 +1783,18 @@ start_compress(const char *filename, uint64_t chunksize, int level)
single_chunk = 1; single_chunk = 1;
props.is_single_chunk = 1; props.is_single_chunk = 1;
flags |= FLAG_SINGLE_CHUNK; flags |= FLAG_SINGLE_CHUNK;
/*
* Switch to simple Deduplication if global is enabled.
*/
if (enable_rabin_global) {
unsigned short flg;
enable_rabin_scan = 1;
enable_rabin_global = 0;
dedupe_flag = RABIN_DEDUPE_SEGMENTED;
flg = FLAG_DEDUP_FIXED;
flags &= ~flg;
}
} else { } else {
if (nthreads == 0 || nthreads > sbuf.st_size / chunksize) { if (nthreads == 0 || nthreads > sbuf.st_size / chunksize) {
nthreads = sbuf.st_size / chunksize; nthreads = sbuf.st_size / chunksize;
@ -1870,6 +1867,38 @@ start_compress(const char *filename, uint64_t chunksize, int level)
strcpy(tmpdir, tmp); strcpy(tmpdir, tmp);
} }
/*
* Compressed buffer size must include zlib/dedup scratch space and
* chunk header space.
* See http://www.zlib.net/manual.html#compress2
*
* We do this unconditionally whether user mentioned zlib or not
* to keep it simple. While zlib scratch space is only needed at
* runtime, chunk header is stored in the file.
*
* See start_decompress() routine for details of chunk header.
* We also keep extra 8-byte space for the last chunk's size.
*/
compressed_chunksize = chunksize + CHUNK_HDR_SZ + zlib_buf_extra(chunksize);
if (chunksize + props.buf_extra > compressed_chunksize) {
compressed_chunksize += (chunksize + props.buf_extra -
compressed_chunksize);
}
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
/* Additional scratch space for dedup arrays. */
if (chunksize + dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode)
> compressed_chunksize) {
compressed_chunksize += (chunksize +
dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode)) -
compressed_chunksize;
}
}
slab_cache_add(chunksize);
slab_cache_add(compressed_chunksize);
slab_cache_add(sizeof (struct cmp_data));
if (encrypt_type) if (encrypt_type)
flags |= encrypt_type; flags |= encrypt_type;

View file

@ -88,17 +88,18 @@ static cleanup_indx(index_t *indx)
} }
} }
#define MEM_PER_UNIT ( (hash_entry_size + sizeof (hash_entry_t *) + \ #define MEM_PER_UNIT(ent_sz) ( (ent_sz + sizeof (hash_entry_t *) + \
(sizeof (hash_entry_t *)) / 2) + sizeof (hash_entry_t **) ) (sizeof (hash_entry_t *)) / 2) + sizeof (hash_entry_t **) )
#define MEM_REQD(hslots, ent_sz) (hslots * MEM_PER_UNIT(ent_sz))
#define SLOTS_FOR_MEM(memlimit, ent_sz) (memlimit / MEM_PER_UNIT(ent_sz) - 5)
archive_config_t * int
init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz, setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz,
int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim, int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
size_t file_sz, size_t memlimit, int nthreads) size_t file_sz, uint32_t *hash_slots, int *hash_entry_size,
uint32_t *intervals, uint64_t *memreqd, size_t memlimit)
{ {
archive_config_t *cfg; int rv, set_user;
int rv;
float diff;
/* /*
* file_sz = 0 and pct_interval = 0 means we are in pipe mode and want a simple * file_sz = 0 and pct_interval = 0 means we are in pipe mode and want a simple
@ -106,54 +107,92 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
* for the simple index. * for the simple index.
* *
* If file_sz != 0 but pct_interval = 0 then we need to create a simple index * If file_sz != 0 but pct_interval = 0 then we need to create a simple index
* sized for the given file. * sized for the given file. If the available memory is not sufficient for a full
* index and required index size is 1.25x of availble mem then switch to a
* segmented index.
* *
* If file_sz = 0 and pct_interval = 100 then we are in pipe mode and want a segmented * If file_sz != 0 and pct_interval != 0 then we explicitly want to create a segmented
* index. This option is auto-selected to support the previous behavior.
*
* If file_sz = 0 and pct_interval != 0 then we are in pipe mode and want a segmented
* index. This is typically for WAN deduplication of large data transfers. * index. This is typically for WAN deduplication of large data transfers.
*/ */
if (file_sz == 0 && pct_interval == 0) if (file_sz == 0 && *pct_interval == 0)
pct_interval = 100; *pct_interval = 100;
set_user = 0;
cfg = calloc(1, sizeof (archive_config_t)); set_cfg:
rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, user_chunk_sz, pct_interval); rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, *user_chunk_sz, *pct_interval);
if (cfg->dedupe_mode == MODE_SIMPLE) { if (cfg->dedupe_mode == MODE_SIMPLE) {
if (pct_interval != 100) if (*pct_interval != 100)
pct_interval = 0; *pct_interval = 0;
cfg->pct_interval = 0; cfg->pct_interval = 0;
} }
if (path != NULL) { /*
fprintf(stderr, "Disk based index not yet implemented.\n"); * Adjust user_chunk_sz if this is the second try.
free(cfg); */
return (NULL); if (set_user) {
if (*user_chunk_sz < cfg->segment_sz_bytes) {
*user_chunk_sz = cfg->segment_sz_bytes;
} else { } else {
*user_chunk_sz = (*user_chunk_sz / cfg->segment_sz_bytes) * cfg->segment_sz_bytes;
}
}
// Compute total hashtable entries first
*hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1;
if (*pct_interval == 0) {
*intervals = 1;
*hash_slots = file_sz / cfg->chunk_sz_bytes + 1;
} else if (*pct_interval == 100) {
*intervals = 1;
*hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size);
*pct_interval = 0;
} else {
*intervals = 100 / *pct_interval - 1;
*hash_slots = file_sz / cfg->segment_sz_bytes + 1;
*hash_slots *= *intervals;
}
// Compute memory required to hold all hash entries assuming worst case 50%
// occupancy.
*memreqd = MEM_REQD(*hash_slots, *hash_entry_size);
if (*memreqd > (memlimit + (memlimit >> 2)) && cfg->dedupe_mode == MODE_SIMPLE &&
*pct_interval == 0) {
*pct_interval = DEFAULT_PCT_INTERVAL;
set_user = 1;
goto set_cfg;
}
return (rv);
}
archive_config_t *
init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz,
int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
size_t file_sz, size_t memlimit, int nthreads)
{
archive_config_t *cfg;
int rv, orig_pct;
float diff;
uint32_t hash_slots, intervals, i; uint32_t hash_slots, intervals, i;
uint64_t memreqd; uint64_t memreqd;
int hash_entry_size; int hash_entry_size;
index_t *indx; index_t *indx;
hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1; if (path != NULL) {
fprintf(stderr, "Disk based index not yet implemented.\n");
// Compute total hashtable entries first return (NULL);
if (pct_interval == 0) {
intervals = 1;
hash_slots = file_sz / cfg->chunk_sz_bytes + 1;
} else if (pct_interval == 100) {
intervals = 1;
hash_slots = memlimit / MEM_PER_UNIT - 5;
pct_interval = 0;
} else {
intervals = 100 / pct_interval - 1;
hash_slots = file_sz / cfg->segment_sz_bytes + 1;
hash_slots *= intervals;
} }
orig_pct = pct_interval;
cfg = calloc(1, sizeof (archive_config_t));
// Compute memory required to hold all hash entries assuming worst case 50%
// occupancy.
memreqd = hash_slots * MEM_PER_UNIT;
diff = (float)pct_interval / 100.0; diff = (float)pct_interval / 100.0;
rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim,
file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit);
// Reduce hash_slots to remain within memlimit // Reduce hash_slots to remain within memlimit
while (memreqd > memlimit) { while (memreqd > memlimit) {
@ -162,7 +201,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
} else { } else {
hash_slots -= (hash_slots * diff); hash_slots -= (hash_slots * diff);
} }
memreqd = hash_slots * MEM_PER_UNIT; memreqd = hash_slots * MEM_PER_UNIT(hash_entry_size);
} }
// Now create as many hash tables as there are similarity match intervals // Now create as many hash tables as there are similarity match intervals
@ -203,13 +242,11 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
free(cfg); free(cfg);
return (NULL); return (NULL);
} }
for (i = 0; i < nthreads; i++) { for (i = 0; i < nthreads; i++) {
cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY); cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY);
} }
} }
cfg->dbdata = indx; cfg->dbdata = indx;
}
return (cfg); return (cfg);
} }

View file

@ -42,6 +42,10 @@ typedef struct _hash_entry {
} hash_entry_t; } hash_entry_t;
archive_config_t *init_global_db(char *configfile); archive_config_t *init_global_db(char *configfile);
int setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz,
int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
size_t file_sz, uint32_t *hash_slots, int *hash_entry_size,
uint32_t *intervals, uint64_t *memreqd, size_t memlimit);
archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize, archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize,
uint64_t user_chunk_sz, int pct_interval, const char *algo, uint64_t user_chunk_sz, int pct_interval, const char *algo,
cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit, cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit,

View file

@ -37,6 +37,7 @@ extern "C" {
#define DEFAULT_CHUNK_CKSUM CKSUM_SHA256 #define DEFAULT_CHUNK_CKSUM CKSUM_SHA256
#define DEFAULT_SIMILARITY_CKSUM CKSUM_BLAKE256 #define DEFAULT_SIMILARITY_CKSUM CKSUM_BLAKE256
#define DEFAULT_COMPRESS COMPRESS_LZ4 #define DEFAULT_COMPRESS COMPRESS_LZ4
#define DEFAULT_PCT_INTERVAL 2
#define CONTAINER_ITEMS 2048 #define CONTAINER_ITEMS 2048
#define MIN_CK 1 #define MIN_CK 1
#define MAX_CK 5 #define MAX_CK 5
@ -77,6 +78,7 @@ typedef struct {
int seg_fd_w; int seg_fd_w;
int *seg_fd_r; // One read-only fd per thread for mapping in portions of the int *seg_fd_r; // One read-only fd per thread for mapping in portions of the
// segment metadata cache. // segment metadata cache.
int valid;
void *dbdata; void *dbdata;
} archive_config_t; } archive_config_t;

View file

@ -126,6 +126,27 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta
return ((chunksize / dedupe_min_blksz(rab_blk_sz)) * sizeof (uint32_t)); return ((chunksize / dedupe_min_blksz(rab_blk_sz)) * sizeof (uint32_t));
} }
/*
* Helper function to let caller size the the user specific compression chunk/segment
* to align with deduplication requirements.
*/
int
global_dedupe_chkmem(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval,
const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
size_t memlimit, int nthreads)
{
uint64_t memreqd;
archive_config_t cfg;
int rv, pct_i, hash_entry_size;
uint32_t intervals, hash_slots;
rv = 0;
pct_i = pct_interval;
rv = setup_db_config_s(&cfg, chunksize, user_chunk_sz, &pct_i, algo, ck, ck_sim,
file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit);
return (rv);
}
/* /*
* Initialize the algorithm with the default params. * Initialize the algorithm with the default params.
*/ */
@ -181,12 +202,26 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
*/ */
if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) { if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) {
my_sysinfo msys_info; my_sysinfo msys_info;
char *val;
/* /*
* Get available free memory. * Get available free memory.
*/ */
get_sysinfo(&msys_info); get_sysinfo(&msys_info);
if ((val = getenv("PCOMPRESS_INDEX_MEM")) != NULL) {
uint64_t mem;
/*
* Externally specified index limit in MB.
*/
mem = strtoull(val, NULL, 0);
mem *= (1024 * 1024);
if (mem > (1024 * 1024) && mem < msys_info.freeram) {
msys_info.freeram = mem;
}
}
/* /*
* Use a maximum of approx 75% of free RAM for the index. * Use a maximum of approx 75% of free RAM for the index.
*/ */
@ -773,6 +808,12 @@ process_blocks:
} }
pos1 = tgt - ctx->cbuf; pos1 = tgt - ctx->cbuf;
blknum |= GLOBAL_FLAG; blknum |= GLOBAL_FLAG;
} else {
/*
* Segmented similarity based Dedupe.
*/
} }
goto dedupe_done; goto dedupe_done;
} }

View file

@ -127,7 +127,6 @@
#define RABIN_DEDUPE_SEGMENTED 0 #define RABIN_DEDUPE_SEGMENTED 0
#define RABIN_DEDUPE_FIXED 1 #define RABIN_DEDUPE_FIXED 1
#define RABIN_DEDUPE_FILE_GLOBAL 2 #define RABIN_DEDUPE_FILE_GLOBAL 2
#define DEFAULT_PCT_INTERVAL 2
// Mask to extract value from a rabin index entry // Mask to extract value from a rabin index entry
#define RABIN_INDEX_VALUE (0x3FFFFFFFUL) #define RABIN_INDEX_VALUE (0x3FFFFFFFUL)
@ -209,5 +208,8 @@ extern void update_dedupe_hdr(uchar_t *buf, uint64_t dedupe_index_sz_cmp,
extern void reset_dedupe_context(dedupe_context_t *ctx); extern void reset_dedupe_context(dedupe_context_t *ctx);
extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo,
int delta_flag); int delta_flag);
extern int global_dedupe_chkmem(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval,
const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
size_t memlimit, int nthreads);
#endif /* _RABIN_POLY_H_ */ #endif /* _RABIN_POLY_H_ */