Work in progress changes for Segmented Global Deduplication.

This commit is contained in:
Moinak Ghosh 2013-04-09 22:23:51 +05:30
parent 3d7a179a77
commit 50251107de
9 changed files with 157 additions and 94 deletions

View file

@ -84,8 +84,8 @@ BZLIB_HDRS = $(MAINHDRS)
BZLIB_OBJS = $(BZLIB_SRCS:.c=.o) BZLIB_OBJS = $(BZLIB_SRCS:.c=.o)
BZLIB_CPPFLAGS = @LIBBZ2_INC@ BZLIB_CPPFLAGS = @LIBBZ2_INC@
RABINSRCS = rabin/rabin_dedup.c rabin/global/db.c rabin/global/dedupe_config.c RABINSRCS = rabin/rabin_dedup.c rabin/global/index.c rabin/global/dedupe_config.c
RABINHDRS = rabin/rabin_dedup.h utils/utils.h rabin/global/db.h rabin/global/dedupe_config.h RABINHDRS = rabin/rabin_dedup.h utils/utils.h rabin/global/index.h rabin/global/dedupe_config.h
RABINOBJS = $(RABINSRCS:.c=.o) RABINOBJS = $(RABINSRCS:.c=.o)
BSDIFFSRCS = bsdiff/bsdiff.c bsdiff/bspatch.c bsdiff/rle_encoder.c BSDIFFSRCS = bsdiff/bsdiff.c bsdiff/bspatch.c bsdiff/rle_encoder.c

8
main.c
View file

@ -1908,6 +1908,14 @@ start_compress(const char *filename, uint64_t chunksize, int level)
nprocs = nthreads; nprocs = nthreads;
fprintf(stderr, "\n"); fprintf(stderr, "\n");
if (enable_rabin_global && !pipe_mode) {
my_sysinfo msys_info;
get_sys_limits(&msys_info);
global_dedupe_bufadjust(rab_blk_size, &chunksize, 0, algo, cksum,
CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads);
}
dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *)); dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *));
if ((enable_rabin_scan || enable_fixed_scan)) if ((enable_rabin_scan || enable_fixed_scan))
cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize); cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize);

View file

@ -36,7 +36,7 @@
#include <rabin_dedup.h> #include <rabin_dedup.h>
#include "dedupe_config.h" #include "dedupe_config.h"
#include "db.h" #include "index.h"
static compress_algo_t static compress_algo_t
get_compress_level(compress_algo_t algo) get_compress_level(compress_algo_t algo)

View file

@ -36,7 +36,7 @@
#include <pthread.h> #include <pthread.h>
#include <xxhash.h> #include <xxhash.h>
#include "db.h" #include "index.h"
/* /*
* Hashtable structures for in-memory index. * Hashtable structures for in-memory index.
@ -117,9 +117,12 @@ setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chun
* If file_sz = 0 and pct_interval != 0 then we are in pipe mode and want a segmented * If file_sz = 0 and pct_interval != 0 then we are in pipe mode and want a segmented
* index. This is typically for WAN deduplication of large data transfers. * index. This is typically for WAN deduplication of large data transfers.
*/ */
if (pct_interval != 0)
set_user = 0;
else
set_user = 1;
if (file_sz == 0 && *pct_interval == 0) if (file_sz == 0 && *pct_interval == 0)
*pct_interval = 100; *pct_interval = 100;
set_user = 0;
set_cfg: set_cfg:
rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, *user_chunk_sz, *pct_interval); rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, *user_chunk_sz, *pct_interval);
@ -131,11 +134,11 @@ set_cfg:
} }
/* /*
* Adjust user_chunk_sz if this is the second try. * Adjust user_chunk_sz if indicated.
*/ */
if (set_user) { if (set_user) {
if (*user_chunk_sz < cfg->segment_sz_bytes) { if (*user_chunk_sz < cfg->segment_sz_bytes) {
*user_chunk_sz = cfg->segment_sz_bytes; *user_chunk_sz = cfg->segment_sz_bytes + (cfg->segment_sz_bytes >> 1);
} else { } else {
*user_chunk_sz = (*user_chunk_sz / cfg->segment_sz_bytes) * cfg->segment_sz_bytes; *user_chunk_sz = (*user_chunk_sz / cfg->segment_sz_bytes) * cfg->segment_sz_bytes;
} }

View file

@ -22,8 +22,8 @@
* moinakg@belenix.org, http://moinakg.wordpress.com/ * moinakg@belenix.org, http://moinakg.wordpress.com/
*/ */
#ifndef _DB_H #ifndef _INDEX_H
#define _DB_H #define _INDEX_H
#include <dedupe_config.h> #include <dedupe_config.h>

View file

@ -131,7 +131,7 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta
* to align with deduplication requirements. * to align with deduplication requirements.
*/ */
int int
global_dedupe_chkmem(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval, global_dedupe_bufadjust(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval,
const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
size_t memlimit, int nthreads) size_t memlimit, int nthreads)
{ {
@ -202,30 +202,12 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
*/ */
if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) { if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) {
my_sysinfo msys_info; my_sysinfo msys_info;
char *val;
/* /*
* Get available free memory. * Get amount of memory to use. The freeram got here is adjusted amount.
*/ */
get_sysinfo(&msys_info); get_sys_limits(&msys_info);
if ((val = getenv("PCOMPRESS_INDEX_MEM")) != NULL) {
uint64_t mem;
/*
* Externally specified index limit in MB.
*/
mem = strtoull(val, NULL, 0);
mem *= (1024 * 1024);
if (mem > (1024 * 1024) && mem < msys_info.freeram) {
msys_info.freeram = mem;
}
}
/*
* Use a maximum of approx 75% of free RAM for the index.
*/
msys_info.freeram = (msys_info.freeram >> 1) + (msys_info.freeram >> 2);
arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, 0, arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, 0,
algo, props->cksum, props->cksum, file_size, algo, props->cksum, props->cksum, file_size,
msys_info.freeram, props->nthreads); msys_info.freeram, props->nthreads);
@ -270,6 +252,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
ctx->delta_flag = 0; ctx->delta_flag = 0;
ctx->deltac_min_distance = props->deltac_min_distance; ctx->deltac_min_distance = props->deltac_min_distance;
ctx->pagesize = sysconf(_SC_PAGE_SIZE); ctx->pagesize = sysconf(_SC_PAGE_SIZE);
ctx->similarity_cksums = NULL;
/* /*
* Scale down similarity percentage based on avg block size unless user specified * Scale down similarity percentage based on avg block size unless user specified
@ -318,6 +301,17 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
return (NULL); return (NULL);
} }
if (arc && dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) {
ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, 90 / arc->pct_interval, 32);
if (!ctx->similarity_cksums) {
fprintf(stderr,
"Could not allocate dedupe context, out of memory\n");
destroy_dedupe_context(ctx);
return (NULL);
}
ctx->similarity_count = 90 / arc->pct_interval;
}
ctx->lzma_data = NULL; ctx->lzma_data = NULL;
ctx->level = 14; ctx->level = 14;
if (real_chunksize > 0) { if (real_chunksize > 0) {
@ -371,6 +365,7 @@ destroy_dedupe_context(dedupe_context_t *ctx)
} }
slab_free(NULL, ctx->blocks); slab_free(NULL, ctx->blocks);
} }
if (ctx->similarity_cksums) slab_free(NULL, ctx->similarity_cksums);
if (ctx->lzma_data) lzma_deinit(&(ctx->lzma_data)); if (ctx->lzma_data) lzma_deinit(&(ctx->lzma_data));
slab_free(NULL, ctx); slab_free(NULL, ctx);
} }
@ -670,8 +665,8 @@ process_blocks:
* If global dedupe is enabled then process it here. * If global dedupe is enabled then process it here.
*/ */
if (ctx->arc) { if (ctx->arc) {
if (ctx->arc->dedupe_mode == MODE_SIMPLE) {
uchar_t *g_dedupe_idx, *tgt, *src; uchar_t *g_dedupe_idx, *tgt, *src;
/* /*
* First compute all the rabin chunk/block cryptographic hashes. * First compute all the rabin chunk/block cryptographic hashes.
*/ */
@ -691,15 +686,18 @@ process_blocks:
dedupe_index_sz = 0; dedupe_index_sz = 0;
/* /*
* First entry in table in the original file offset where this * First entry in table is the original file offset where this
* data segment begins. * data segment begins.
*/ */
*((uint64_t *)g_dedupe_idx) = LE64(ctx->file_offset); *((uint64_t *)g_dedupe_idx) = LE64(ctx->file_offset);
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
dedupe_index_sz += 2; dedupe_index_sz += 2;
length = 0;
matchlen = 0; matchlen = 0;
if (ctx->arc->dedupe_mode == MODE_SIMPLE) {
/*
* This code block implements Global Dedupe with simple in-memory index.
*/
/* /*
* Now lookup blocks in index. First wait for our semaphore to be * Now lookup blocks in index. First wait for our semaphore to be
* signaled. If the previous thread in sequence is using the index * signaled. If the previous thread in sequence is using the index
@ -707,6 +705,7 @@ process_blocks:
* predictable serialization of index access in a sequence of * predictable serialization of index access in a sequence of
* threads without locking. * threads without locking.
*/ */
length = 0;
sem_wait(ctx->index_sem); sem_wait(ctx->index_sem);
for (i=0; i<blknum; i++) { for (i=0; i<blknum; i++) {
hash_entry_t *he; hash_entry_t *he;
@ -774,6 +773,45 @@ process_blocks:
src = buf1; src = buf1;
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2); g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
} else {
uchar_t *seg_heap, *sim_ck;
archive_config_t *cfg;
uint32_t increment, len;
/*
* This code block implements Segmented similarity based Dedupe with
* in-memory index.
*/
cfg = ctx->arc;
seg_heap = (uchar_t *)ctx->g_blocks - cfg->segment_sz_bytes;
for (i=0; i<blknum; i += cfg->segment_sz) {
length = 0;
/*
* Compute length of current segment.
*/
for (j=i; j<blknum && j<cfg->segment_sz; j++)
length += ctx->g_blocks[i].length;
/*
* Compute the cumulative similarity minhashes.
*/
sim_ck = ctx->similarity_cksums;
increment = length / ctx->similarity_count;
len = increment;
src = buf1 + ctx->g_blocks[i].offset;
tgt = seg_heap;
memcpy(tgt, src, length);
for (j=0; j<ctx->similarity_count; j++) {
reset_heap(&heap, len/8);
ksmallest((int64_t *)seg_heap, length, &heap);
compute_checksum(sim_ck, CKSUM_BLAKE256, seg_heap, len, 0, 0);
len += increment;
sim_ck += 32;
}
}
}
/* /*
* Deduplication reduction should at least be greater than block list metadata. * Deduplication reduction should at least be greater than block list metadata.
*/ */
@ -808,13 +846,6 @@ process_blocks:
} }
pos1 = tgt - ctx->cbuf; pos1 = tgt - ctx->cbuf;
blknum |= GLOBAL_FLAG; blknum |= GLOBAL_FLAG;
} else {
/*
* Segmented similarity based Dedupe.
*/
}
goto dedupe_done; goto dedupe_done;
} }

View file

@ -64,7 +64,7 @@
#define _RABIN_POLY_H_ #define _RABIN_POLY_H_
#include "utils.h" #include "utils.h"
#include <db.h> #include <index.h>
#include <crypto_utils.h> #include <crypto_utils.h>
#include <pthread.h> #include <pthread.h>
#include <semaphore.h> #include <semaphore.h>
@ -188,6 +188,8 @@ typedef struct {
archive_config_t *arc; archive_config_t *arc;
sem_t *index_sem; sem_t *index_sem;
sem_t *index_sem_next; sem_t *index_sem_next;
uchar_t *similarity_cksums;
int similarity_count;
uint32_t pagesize; uint32_t pagesize;
int out_fd; int out_fd;
int id; int id;
@ -208,7 +210,7 @@ extern void update_dedupe_hdr(uchar_t *buf, uint64_t dedupe_index_sz_cmp,
extern void reset_dedupe_context(dedupe_context_t *ctx); extern void reset_dedupe_context(dedupe_context_t *ctx);
extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo,
int delta_flag); int delta_flag);
extern int global_dedupe_chkmem(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval, extern int global_dedupe_bufadjust(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval,
const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz, const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
size_t memlimit, int nthreads); size_t memlimit, int nthreads);

View file

@ -371,10 +371,11 @@ get_mb_s(uint64_t bytes, double strt, double en)
} }
void void
get_sysinfo(my_sysinfo *msys_info) get_sys_limits(my_sysinfo *msys_info)
{ {
struct sysinfo sys_info; struct sysinfo sys_info;
int rv; int rv;
char *val;
rv = sysinfo(&sys_info); rv = sysinfo(&sys_info);
@ -386,4 +387,22 @@ get_sysinfo(my_sysinfo *msys_info)
msys_info->totalswap = sys_info.totalswap; msys_info->totalswap = sys_info.totalswap;
msys_info->freeswap = sys_info.freeswap; msys_info->freeswap = sys_info.freeswap;
msys_info->mem_unit = sys_info.mem_unit; msys_info->mem_unit = sys_info.mem_unit;
if ((val = getenv("PCOMPRESS_INDEX_MEM")) != NULL) {
uint64_t mem;
/*
* Externally specified index limit in MB.
*/
mem = strtoull(val, NULL, 0);
mem *= (1024 * 1024);
if (mem > (1024 * 1024) && mem < msys_info->freeram) {
msys_info->freeram = mem;
}
}
/*
* Use a maximum of approx 75% of free RAM for the index.
*/
msys_info->freeram = (msys_info->freeram >> 1) + (msys_info->freeram >> 2);
} }

View file

@ -217,7 +217,7 @@ extern void set_threadcounts(algo_props_t *props, int *nthreads, int nprocs,
extern uint64_t get_total_ram(); extern uint64_t get_total_ram();
extern double get_wtime_millis(void); extern double get_wtime_millis(void);
extern double get_mb_s(uint64_t bytes, double strt, double en); extern double get_mb_s(uint64_t bytes, double strt, double en);
extern void get_sysinfo(my_sysinfo *msys_info); extern void get_sys_limits(my_sysinfo *msys_info);
extern void init_algo_props(algo_props_t *props); extern void init_algo_props(algo_props_t *props);
extern void init_pcompress(); extern void init_pcompress();