Complete implementation for Segmented Global Deduplication.

This commit is contained in:
Moinak Ghosh 2013-04-18 21:26:24 +05:30
parent a22b52cf08
commit 8ae571124d
8 changed files with 332 additions and 157 deletions

62
main.c
View file

@ -1659,37 +1659,9 @@ start_compress(const char *filename, uint64_t chunksize, int level)
props.cksum = cksum;
props.buf_extra = 0;
cread_buf = NULL;
if (_props_func) {
_props_func(&props, level, chunksize);
if (chunksize + props.buf_extra > compressed_chunksize) {
compressed_chunksize += (chunksize + props.buf_extra -
compressed_chunksize);
}
}
flags = 0;
sbuf.st_size = 0;
dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
if (enable_rabin_global) {
flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED);
dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL;
} else if (enable_rabin_scan) {
flags |= FLAG_DEDUP;
dedupe_flag = RABIN_DEDUPE_SEGMENTED;
} else {
flags |= FLAG_DEDUP_FIXED;
dedupe_flag = RABIN_DEDUPE_FIXED;
}
/* Additional scratch space for dedup arrays. */
if (chunksize + dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode)
> compressed_chunksize) {
compressed_chunksize += (chunksize +
dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode)) -
compressed_chunksize;
}
}
if (encrypt_type) {
uchar_t pw[MAX_PW_LEN];
@ -1867,6 +1839,14 @@ start_compress(const char *filename, uint64_t chunksize, int level)
strcpy(tmpdir, tmp);
}
if (enable_rabin_global && !pipe_mode) {
my_sysinfo msys_info;
get_sys_limits(&msys_info);
global_dedupe_bufadjust(rab_blk_size, &chunksize, 0, algo, cksum,
CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads);
}
/*
* Compressed buffer size must include zlib/dedup scratch space and
* chunk header space.
@ -1885,7 +1865,25 @@ start_compress(const char *filename, uint64_t chunksize, int level)
compressed_chunksize);
}
if (_props_func) {
_props_func(&props, level, chunksize);
if (chunksize + props.buf_extra > compressed_chunksize) {
compressed_chunksize += (chunksize + props.buf_extra -
compressed_chunksize);
}
}
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
if (enable_rabin_global) {
flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED);
dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL;
} else if (enable_rabin_scan) {
flags |= FLAG_DEDUP;
dedupe_flag = RABIN_DEDUPE_SEGMENTED;
} else {
flags |= FLAG_DEDUP_FIXED;
dedupe_flag = RABIN_DEDUPE_FIXED;
}
/* Additional scratch space for dedup arrays. */
if (chunksize + dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode)
> compressed_chunksize) {
@ -1908,14 +1906,6 @@ start_compress(const char *filename, uint64_t chunksize, int level)
nprocs = nthreads;
fprintf(stderr, "\n");
if (enable_rabin_global && !pipe_mode) {
my_sysinfo msys_info;
get_sys_limits(&msys_info);
global_dedupe_bufadjust(rab_blk_size, &chunksize, 0, algo, cksum,
CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, nthreads);
}
dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *));
if ((enable_rabin_scan || enable_fixed_scan))
cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize);

View file

@ -108,7 +108,10 @@ get_compress_str(compress_algo_t algo)
static cksum_t
get_cksum_type(char *cksum_name)
{
if (strcmp(cksum_name, "SHA256") == 0) {
if (strcmp(cksum_name, "CRC64") == 0) {
return (CKSUM_CRC64);
} else if (strcmp(cksum_name, "SHA256") == 0) {
return (CKSUM_SHA256);
} else if (strcmp(cksum_name, "SHA512") == 0) {
@ -132,7 +135,10 @@ get_cksum_type(char *cksum_name)
static char *
get_cksum_str(cksum_t ck)
{
if (ck == CKSUM_SHA256) {
if (ck == CKSUM_CRC64) {
return ("CRC64");
} else if (ck == CKSUM_SHA256) {
return ("SHA256");
} else if (ck == CKSUM_SHA512) {
@ -156,7 +162,10 @@ get_cksum_str(cksum_t ck)
static int
get_cksum_sz(cksum_t ck)
{
if (ck == CKSUM_SHA256 || ck == CKSUM_BLAKE256 || ck == CKSUM_KECCAK256) {
if (ck == CKSUM_CRC64) {
return (8);
} else if (ck == CKSUM_SHA256 || ck == CKSUM_BLAKE256 || ck == CKSUM_KECCAK256) {
return (32);
} else if (ck == CKSUM_SHA512 || ck == CKSUM_BLAKE512 || ck == CKSUM_KECCAK512) {
@ -360,9 +369,10 @@ set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim
cfg->archive_sz = file_sz;
cfg->dedupe_mode = MODE_SIMILARITY;
if (cfg->archive_sz <= SIXTEEN_GB || pct_interval == 0 || pct_interval == 100) {
if (cfg->archive_sz <= SIXTEEN_GB && (pct_interval == 0 || pct_interval == 100)) {
cfg->dedupe_mode = MODE_SIMPLE;
cfg->segment_sz_bytes = user_chunk_sz;
cfg->similarity_cksum_sz = cfg->chunk_cksum_sz;
} else if (cfg->archive_sz < ONE_TB) {
cfg->segment_sz_bytes = FOUR_MB;

View file

@ -37,7 +37,7 @@ extern "C" {
#define DEFAULT_CHUNK_CKSUM CKSUM_SHA256
#define DEFAULT_SIMILARITY_CKSUM CKSUM_BLAKE256
#define DEFAULT_COMPRESS COMPRESS_LZ4
#define DEFAULT_PCT_INTERVAL 2
#define DEFAULT_PCT_INTERVAL 5
#define CONTAINER_ITEMS 2048
#define MIN_CK 1
#define MAX_CK 5
@ -71,7 +71,7 @@ typedef struct {
int pct_interval; // Similarity based match intervals in %age.
// The items below are computed given the above
// components.
int intervals;
int intervals, sub_intervals;
dedupe_mode_t dedupe_mode;
uint32_t chunk_sz_bytes; // Average chunk size
@ -83,6 +83,7 @@ typedef struct {
int num_containers; // Number of containers in a directory
int nthreads; // Number of threads processing data segments in parallel
int seg_fd_w;
uint64_t segcache_pos;
uint32_t pagesize;
struct seg_map_fd *seg_fd_r; // One read-only fd per thread for mapping in portions of the
// segment metadata cache.
@ -90,11 +91,14 @@ typedef struct {
void *dbdata;
} archive_config_t;
typedef struct _segment_entry {
uint64_t chunk_offset;
uint32_t chunk_length;
uchar_t *chunk_cksum;
} segment_entry_t;
#pragma pack(1)
typedef struct global_blockentry {
uint32_t length;
uint64_t offset;
struct global_blockentry *next; // Reqd when part of a hashtable
uchar_t cksum[CKSUM_MAX_BYTES];
} global_blockentry_t;
#pragma pack()
int read_config(char *configfile, archive_config_t *cfg);
int write_config(char *configfile, archive_config_t *cfg);

View file

@ -108,7 +108,7 @@ int
setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz,
int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
size_t file_sz, uint32_t *hash_slots, int *hash_entry_size,
uint32_t *intervals, uint64_t *memreqd, size_t memlimit)
uint64_t *memreqd, size_t memlimit)
{
int rv, set_user;
@ -158,17 +158,20 @@ set_cfg:
// Compute total hashtable entries first
*hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1;
if (*pct_interval == 0) {
*intervals = 1;
cfg->intervals = 1;
cfg->sub_intervals = 0;
*hash_slots = file_sz / cfg->chunk_sz_bytes + 1;
} else if (*pct_interval == 100) {
*intervals = 1;
cfg->intervals = 1;
cfg->sub_intervals = 0;
*hash_slots = SLOTS_FOR_MEM(memlimit, *hash_entry_size);
*pct_interval = 0;
} else {
*intervals = 90 / *pct_interval - 1;
cfg->intervals = 100 / *pct_interval;
cfg->sub_intervals = cfg->segment_sz / cfg->intervals;
*hash_slots = file_sz / cfg->segment_sz_bytes + 1;
*hash_slots *= *intervals;
*hash_slots *= (cfg->intervals + cfg->sub_intervals);
}
// Compute memory required to hold all hash entries assuming worst case 50%
@ -191,7 +194,6 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
{
archive_config_t *cfg;
int rv, orig_pct;
float diff;
uint32_t hash_slots, intervals, i;
uint64_t memreqd;
int hash_entry_size;
@ -204,17 +206,12 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
orig_pct = pct_interval;
cfg = calloc(1, sizeof (archive_config_t));
diff = (float)pct_interval / 90.0;
rv = setup_db_config_s(cfg, chunksize, &user_chunk_sz, &pct_interval, algo, ck, ck_sim,
file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit);
file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit);
// Reduce hash_slots to remain within memlimit
while (memreqd > memlimit) {
if (pct_interval == 0) {
hash_slots--;
} else {
hash_slots -= (hash_slots * diff);
}
hash_slots--;
memreqd = hash_slots * MEM_PER_UNIT(hash_entry_size);
}
@ -228,23 +225,22 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
return (NULL);
}
cfg->nthreads = nthreads;
intervals = cfg->intervals + cfg->sub_intervals;
indx->memlimit = memlimit - (hash_entry_size << 2);
indx->list = (htab_t *)calloc(intervals, sizeof (htab_t));
indx->hash_entry_size = hash_entry_size;
indx->intervals = intervals;
indx->hash_slots = hash_slots / intervals;
cfg->nthreads = nthreads;
cfg->intervals = intervals;
for (i = 0; i < intervals; i++) {
indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals,
sizeof (hash_entry_t *));
indx->list[i].tab = (hash_entry_t **)calloc(indx->hash_slots, sizeof (hash_entry_t *));
if (!(indx->list[i].tab)) {
cleanup_indx(indx);
free(cfg);
return (NULL);
}
indx->memused += ((hash_slots / intervals) * (sizeof (hash_entry_t *)));
indx->memused += ((indx->hash_slots) * (sizeof (hash_entry_t *)));
}
if (pct_interval > 0) {
@ -264,6 +260,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
cfg->seg_fd_r[i].mapping = NULL;
}
}
cfg->segcache_pos = 0;
cfg->dbdata = indx;
return (cfg);
}
@ -277,23 +274,25 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
* Add new segment block list array into the metadata cache. Once added the entry is
* not removed till the program exits.
*/
#define SEGCACHE_HDR_SZ 12
int
db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum,
uint64_t file_offset)
{
int64_t w;
uchar_t *hdr[16];
uchar_t hdr[SEGCACHE_HDR_SZ];
*((uint32_t *)hdr) = len;
*((uint32_t *)(hdr + 4)) = blknum;
*((uint32_t *)(hdr + 8)) = file_offset;
*((uint32_t *)(hdr)) = blknum;
*((uint64_t *)(hdr + 4)) = file_offset;
w = Write(cfg->seg_fd_w, hdr, sizeof (hdr));
if (w < sizeof (hdr))
return (-1);
cfg->segcache_pos += w;
w = Write(cfg->seg_fd_w, buf, len);
if (w < len)
return (-1);
cfg->segcache_pos += w;
return (0);
}
@ -304,20 +303,19 @@ db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, ui
int
db_segcache_pos(archive_config_t *cfg, int tid)
{
return (lseek(cfg->seg_fd_w, 0, SEEK_CUR));
return (cfg->segcache_pos);
}
/*
* Mmap the requested segment metadata array.
*/
int
db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks)
db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t *blocks)
{
uchar_t *mapbuf;
uchar_t *hdr[16];
int64_t r;
uchar_t *mapbuf, *hdr;
int fd;
uint32_t len, adj;
uint64_t pos;
/*
* Ensure previous mapping is removed.
@ -328,22 +326,25 @@ db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offs
return (-1);
/*
* Read header first so that we know how much to map.
* Mmap hdr and blocks. We assume max # of rabin block entries and mmap (unless remaining
* file length is less). The header contains actual number of block entries so mmap-ing
* extra has no consequence other than address space usage.
*/
r = Read(fd, hdr, sizeof (hdr));
if (r < sizeof (hdr))
return (-1);
len = cfg->segment_sz * sizeof (global_blockentry_t) + SEGCACHE_HDR_SZ;
pos = cfg->segcache_pos;
if (pos - *offset < len)
len = pos - *offset;
*offset += sizeof (hdr);
len = *((uint32_t *)hdr);
adj = *offset % cfg->pagesize;
*blknum = *((uint32_t *)(hdr + 4));
mapbuf = mmap(NULL, len + adj, PROT_READ, MAP_SHARED, fd, *offset - adj);
if (mapbuf == MAP_FAILED)
return (-1);
*offset = *((uint32_t *)(hdr + 8));
*blocks = mapbuf + adj;
hdr = mapbuf + adj;
*blknum = *((uint32_t *)(hdr));
*offset = *((uint64_t *)(hdr + 4));
memcpy(blocks, hdr + SEGCACHE_HDR_SZ, *blknum * sizeof (global_blockentry_t));
cfg->seg_fd_r[tid].mapping = mapbuf;
cfg->seg_fd_r[tid].len = len + adj;

View file

@ -45,7 +45,7 @@ archive_config_t *init_global_db(char *configfile);
int setup_db_config_s(archive_config_t *cfg, uint32_t chunksize, uint64_t *user_chunk_sz,
int *pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
size_t file_sz, uint32_t *hash_slots, int *hash_entry_size,
uint32_t *intervals, uint64_t *memreqd, size_t memlimit);
uint64_t *memreqd, size_t memlimit);
archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize,
uint64_t user_chunk_sz, int pct_interval, const char *algo,
cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit,
@ -56,7 +56,7 @@ void destroy_global_db_s(archive_config_t *cfg);
int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset);
int db_segcache_pos(archive_config_t *cfg, int tid);
int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks);
int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t *blocks);
int db_segcache_unmap(archive_config_t *cfg, int tid);
#ifdef __cplusplus

View file

@ -103,6 +103,7 @@ extern int bsdiff(u_char *oldbuf, bsize_t oldsize, u_char *newbuf, bsize_t newsi
extern bsize_t get_bsdiff_sz(u_char *pbuf);
extern int bspatch(u_char *pbuf, u_char *oldbuf, bsize_t oldsize, u_char *newbuf,
bsize_t *_newsize);
extern uint64_t lzma_crc64(const uint8_t *buf, size_t size, uint64_t crc);
static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
uint64_t ir[256], out[256];
@ -133,19 +134,19 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta
* to align with deduplication requirements.
*/
int
global_dedupe_bufadjust(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval,
global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_interval,
const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
size_t memlimit, int nthreads)
{
uint64_t memreqd;
archive_config_t cfg;
int rv, pct_i, hash_entry_size;
uint32_t intervals, hash_slots;
uint32_t hash_slots;
rv = 0;
pct_i = pct_interval;
rv = setup_db_config_s(&cfg, chunksize, user_chunk_sz, &pct_i, algo, ck, ck_sim,
file_sz, &hash_slots, &hash_entry_size, &intervals, &memreqd, memlimit);
rv = setup_db_config_s(&cfg, rab_blk_sz, user_chunk_sz, &pct_i, algo, ck, ck_sim,
file_sz, &hash_slots, &hash_entry_size, &memreqd, memlimit);
return (rv);
}
@ -210,8 +211,8 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
*/
get_sys_limits(&msys_info);
arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, 0,
algo, props->cksum, props->cksum, file_size,
arc = init_global_db_s(NULL, "/tmp", rab_blk_sz, chunksize, 0,
algo, props->cksum, CKSUM_CRC64, file_size,
msys_info.freeram, props->nthreads);
if (arc == NULL) {
pthread_mutex_unlock(&init_lock);
@ -307,7 +308,8 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
}
if (arc && dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) {
ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, arc->intervals, 32);
ctx->similarity_cksums = (uchar_t *)slab_calloc(NULL, arc->intervals + arc->sub_intervals,
arc->similarity_cksum_sz);
if (!ctx->similarity_cksums) {
fprintf(stderr,
"Could not allocate dedupe context, out of memory\n");
@ -436,21 +438,22 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of
}
if (rabin_pos == NULL) {
/*
* Initialize arrays for sketch computation. We re-use memory allocated
* for the compressed chunk temporarily.
*/
ary_sz = ctx->rabin_poly_max_block_size;
ctx_heap = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz);
/*
* If global dedupe is active, the global blocks array uses temp space in
* the target buffer.
*/
ary_sz = 0;
if (ctx->arc != NULL) {
ary_sz += (sizeof (global_blockentry_t) * (*size / ctx->rabin_poly_min_block_size + 1));
ary_sz = (sizeof (global_blockentry_t) * (*size / ctx->rabin_poly_min_block_size + 1));
ctx->g_blocks = (global_blockentry_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz);
}
/*
* Initialize arrays for sketch computation. We re-use memory allocated
* for the compressed chunk temporarily.
*/
ary_sz += ctx->rabin_poly_max_block_size;
ctx_heap = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz);
}
#ifndef SSE_MODE
memset(ctx->current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE);
@ -699,8 +702,9 @@ process_blocks:
matchlen = 0;
if (ctx->arc->dedupe_mode == MODE_SIMPLE) {
/*
/*======================================================================
* This code block implements Global Dedupe with simple in-memory index.
*======================================================================
*/
/*
* Now lookup blocks in index. First wait for our semaphore to be
@ -780,60 +784,89 @@ process_blocks:
} else {
uchar_t *seg_heap, *sim_ck;
archive_config_t *cfg;
uint32_t increment, len, mapped_blks;
uint32_t increment, len, blks, o_blks, k;
global_blockentry_t *seg_blocks;
uint64_t seg_offset, offset;
int written;
global_blockentry_t **htab;
global_blockentry_t **htab, *be;
/*
/*======================================================================
* This code block implements Segmented similarity based Dedupe with
* in-memory index.
* ======================================================================
*/
cfg = ctx->arc;
seg_heap = (uchar_t *)ctx->g_blocks - cfg->segment_sz_bytes;
seg_heap = (uchar_t *)(ctx->g_blocks) - cfg->segment_sz_bytes;
ary_sz = cfg->segment_sz * sizeof (global_blockentry_t **);
htab = (global_blockentry_t **)(seg_heap - ary_sz);
for (i=0; i<blknum; i += cfg->segment_sz) {
blake2b_state S1, S2;
seg_blocks = (global_blockentry_t *)(seg_heap - ary_sz - \
cfg->segment_sz * sizeof (global_blockentry_t));
for (i=0; i<blknum;) {
uint64_t crc;
length = 0;
/*
* Compute length of current segment.
*/
for (j=i; j<blknum && j<cfg->segment_sz; j++)
length += ctx->g_blocks[i].length;
blks = cfg->segment_sz;
if (blks > blknum-i) blks = blknum-i;
len = 0;
for (j=0; j<blks; j++) {
len += ctx->g_blocks[j+i].length;
if (len > cfg->segment_sz_bytes) {
break;
}
length = len;
}
blks = j+i;
/*
* Compute the cumulative similarity minhashes.
*/
sim_ck = ctx->similarity_cksums;
increment = length / cfg->intervals;
len = increment;
src = buf1 + ctx->g_blocks[i].offset;
tgt = seg_heap;
memcpy(tgt, src, length);
bdsp.blake2b_init(&S1, 32);
for (j=0; j<cfg->intervals; j++) {
crc = 0;
increment = (length / cfg->intervals) / cfg->sub_intervals;
len = increment;
for (j=0; j<cfg->sub_intervals; j++) {
reset_heap(&heap, len/8);
ksmallest((int64_t *)seg_heap, length, &heap);
bdsp.blake2b_update(&S1, seg_heap + len - increment, increment);
memcpy(&S2, &S1, sizeof (S1));
bdsp.blake2b_final(&S2, sim_ck, 32);
ksmallest((int64_t *)seg_heap, length/8, &heap);
crc = lzma_crc64(seg_heap + len - increment, increment, crc);
*((uint64_t *)sim_ck) = crc;
len += increment;
sim_ck += 32;
sim_ck += cfg->similarity_cksum_sz;
}
len -= increment;
increment = length / cfg->intervals;
len = increment * 2;
for (j=0; j<cfg->intervals-1; j++) {
reset_heap(&heap, len/8);
ksmallest((int64_t *)seg_heap, length/8, &heap);
crc = lzma_crc64(seg_heap + len - increment, increment, crc);
*((uint64_t *)sim_ck) = crc;
len += increment;
sim_ck += cfg->similarity_cksum_sz;
}
/*
* Begin shared index access and write segment metadata to cache
* first.
*/
if (i == 0) sem_wait(ctx->index_sem);
sim_ck -= cfg->similarity_cksum_sz;
seg_offset = db_segcache_pos(cfg, ctx->id);
src = (uchar_t *)&(ctx->g_blocks[i]);
len = blks * sizeof (global_blockentry_t);
db_segcache_write(cfg, ctx->id, src, len, blks,
ctx->file_offset + ctx->g_blocks[i].offset);
/*
* Now lookup the similarity minhashes starting at the highest
* significance level.
*/
sim_ck -= 32;
written = 0;
increment = cfg->intervals * cfg->pct_interval;
sem_wait(ctx->index_sem);
seg_offset = db_segcache_pos(cfg, ctx->id);
for (j=cfg->intervals; j > 0; j--) {
for (j=cfg->intervals + cfg->sub_intervals; j > 0; j--) {
hash_entry_t *he;
he = db_lookup_insert_s(cfg, sim_ck, j-1, seg_offset, 0, 1);
@ -844,35 +877,181 @@ process_blocks:
*/
memset(htab, 0, ary_sz);
offset = he->item_offset;
if (db_segcache_map(cfg, ctx->id, &mapped_blks, &offset,
(uchar_t **)&seg_blocks) == -1) {
if (db_segcache_map(cfg, ctx->id, &o_blks, &offset,
(uchar_t *)seg_blocks) == -1) {
fprintf(stderr, "Segment cache mmap failed.\n");
ctx->valid = 0;
return (0);
}
if (increment > 70) {
j = cfg->intervals - j;
/*
* First insert all the unique blocks from the mapped segment
* blocks(chunks) array into the hashtable.
*/
for (k=0; k<o_blks; k++) {
uint32_t hent;
hent = XXH32(&(seg_blocks[k].cksum[0]), cfg->chunk_cksum_sz, 0);
hent ^= (hent / cfg->chunk_cksum_sz);
hent = hent % cfg->segment_sz;
if (htab[hent] == NULL) {
htab[hent] = &(seg_blocks[k]);
seg_blocks[k].offset += offset;
seg_blocks[k].next = NULL;
} else {
be = htab[hent];
do {
if (memcmp(seg_blocks[k].cksum,
be->cksum, cfg->chunk_cksum_sz) == 0 &&
seg_blocks[k].length == be->length) {
be = NULL;
break;
}
if (be->next)
be = be->next;
else
break;
} while(1);
/*
* be will be non-NULL if no match was found.
* It will be the last bucket in the hash slot.
*/
if (be) {
be->next = &(seg_blocks[k]);
seg_blocks[k].offset += offset;
seg_blocks[k].next = NULL;
}
}
}
} else if (!written) {
if (blknum - i >= cfg->segment_sz) {
db_segcache_write(cfg, ctx->id, src, len, cfg->segment_sz,
ctx->file_offset);
} else {
db_segcache_write(cfg, ctx->id, src, len, blknum-i,
ctx->file_offset);
/*
* Now lookup current segment blocks(chunks) in the hashtable and
* perform the actual deduplication.
*/
be = NULL;
for (k=i; k<blks; k++) {
uint32_t hent;
if (ctx->g_blocks[k].length & RABIN_INDEX_FLAG) continue;
hent = XXH32(ctx->g_blocks[k].cksum, cfg->chunk_cksum_sz, 0);
hent ^= (hent / cfg->chunk_cksum_sz);
hent = hent % cfg->segment_sz;
if (htab[hent] == NULL) {
htab[hent] = &(ctx->g_blocks[k]);
ctx->g_blocks[k].offset += ctx->file_offset;
ctx->g_blocks[k].next = NULL;
be = NULL;
} else {
be = htab[hent];
do {
if (memcmp(ctx->g_blocks[k].cksum,
be->cksum, cfg->chunk_cksum_sz) == 0 &&
ctx->g_blocks[k].length == be->length) {
break;
}
if (be->next) {
be = be->next;
} else {
be->next = &(ctx->g_blocks[k]);
be->next->offset +=
ctx->file_offset;
be->next->next = NULL;
be = NULL;
break;
}
} while(1);
}
/*
* be will be non-NULL if match was found. It will
* point to the matching bucket.
*/
if (be) {
global_blockentry_t *en;
/*
* Block match in index was found. Update g_blocks
* array.
*/
en = &(ctx->g_blocks[k]);
en->length = (en->length | RABIN_INDEX_FLAG) &
CLEAR_SIMILARITY_FLAG;
en->offset = be->offset;
}
}
written = 1;
break;
}
increment -= cfg->pct_interval;
sim_ck -= 32;
sim_ck -= cfg->similarity_cksum_sz;
}
i += blks;
}
/*
* Signal the next thread in sequence to access the index.
*/
sem_post(ctx->index_sem_next);
/*======================================================================
* Finally scan the blocks array and update dedupe index.
*======================================================================
*/
length = 0;
for (i=0; i<blknum; i++) {
if (!(ctx->g_blocks[i].length & RABIN_INDEX_FLAG)) {
/*
* Block match in index not found.
* Block was added to index. Merge this block.
*/
if (length + ctx->g_blocks[i].length > RABIN_MAX_BLOCK_SIZE) {
*((uint32_t *)g_dedupe_idx) = LE32(length);
g_dedupe_idx += RABIN_ENTRY_SIZE;
length = 0;
dedupe_index_sz++;
}
length += ctx->g_blocks[i].length;
} else {
/*
* Block match in index was found.
*/
if (length > 0) {
/*
* Write pending accumulated block length value.
*/
*((uint32_t *)g_dedupe_idx) = LE32(length);
g_dedupe_idx += RABIN_ENTRY_SIZE;
length = 0;
dedupe_index_sz++;
}
/*
* Add a reference entry to the dedupe array.
*/
*((uint32_t *)g_dedupe_idx) = LE32(ctx->g_blocks[i].length);
g_dedupe_idx += RABIN_ENTRY_SIZE;
*((uint64_t *)g_dedupe_idx) = LE64(ctx->g_blocks[i].offset);
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
matchlen += (ctx->g_blocks[i].length & RABIN_INDEX_VALUE);
dedupe_index_sz += 3;
}
}
/*
* Write final pending block length value (if any).
*/
if (length > 0) {
*((uint32_t *)g_dedupe_idx) = LE32(length);
g_dedupe_idx += RABIN_ENTRY_SIZE;
length = 0;
dedupe_index_sz++;
}
blknum = dedupe_index_sz; // Number of entries in block list
tgt = g_dedupe_idx;
g_dedupe_idx = ctx->cbuf + RABIN_HDR_SIZE;
dedupe_index_sz = tgt - g_dedupe_idx;
src = buf1;
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
}
/*

View file

@ -163,15 +163,6 @@ typedef struct rab_blockentry {
struct rab_blockentry *next;
} rabin_blockentry_t;
#pragma pack(1)
typedef struct global_blockentry {
uint32_t length;
uint64_t offset;
struct global_blockentry *next; // Reqd when part of a hashtable
uchar_t cksum[CKSUM_MAX_BYTES];
} global_blockentry_t;
#pragma pack()
typedef struct {
unsigned char *current_window_data;
rabin_blockentry_t **blocks;
@ -212,7 +203,7 @@ extern void update_dedupe_hdr(uchar_t *buf, uint64_t dedupe_index_sz_cmp,
extern void reset_dedupe_context(dedupe_context_t *ctx);
extern uint32_t dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo,
int delta_flag);
extern int global_dedupe_bufadjust(uint32_t chunksize, uint64_t *user_chunk_sz, int pct_interval,
extern int global_dedupe_bufadjust(uint32_t rab_blk_sz, uint64_t *user_chunk_sz, int pct_interval,
const char *algo, cksum_t ck, cksum_t ck_sim, size_t file_sz,
size_t memlimit, int nthreads);

View file

@ -396,13 +396,13 @@ get_sys_limits(my_sysinfo *msys_info)
*/
mem = strtoull(val, NULL, 0);
mem *= (1024 * 1024);
if (mem > (1024 * 1024) && mem < msys_info->freeram) {
if (mem >= (1024 * 1024) && mem < msys_info->freeram) {
msys_info->freeram = mem;
}
} else {
/*
* Use a maximum of approx 75% of free RAM for the index (if limit was not specified).
*/
msys_info->freeram = (msys_info->freeram >> 1) + (msys_info->freeram >> 2);
}
/*
* Use a maximum of approx 75% of free RAM for the index.
*/
msys_info->freeram = (msys_info->freeram >> 1) + (msys_info->freeram >> 2);
}