Implement Global Deduplication.

This commit is contained in:
Moinak Ghosh 2013-03-24 23:21:17 +05:30
parent 876796be5c
commit fbf4658635
8 changed files with 402 additions and 74 deletions

View file

@ -203,7 +203,7 @@ get_crypto_alg(char *name)
* need or want another level of parallelism to cause contention.
*/
int
compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, int mt)
compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, int mt, int verbose)
{
DEBUG_STAT_EN(double strt, en);
@ -213,7 +213,7 @@ compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, in
assert(mt == 0 || mt == 1);
#endif
DEBUG_STAT_EN(strt = get_wtime_millis());
DEBUG_STAT_EN(if (verbose) strt = get_wtime_millis());
if (cksum == CKSUM_CRC64) {
uint64_t *ck = (uint64_t *)cksum_buf;
*ck = lzma_crc64(buf, bytes, 0);
@ -321,8 +321,8 @@ compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, in
} else {
return (-1);
}
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Checksum computed at %.3f MB/s\n", get_mb_s(bytes, strt, en)));
DEBUG_STAT_EN(if (verbose) en = get_wtime_millis());
DEBUG_STAT_EN(if (verbose) fprintf(stderr, "Checksum computed at %.3f MB/s\n", get_mb_s(bytes, strt, en)));
return (0);
}
@ -789,10 +789,10 @@ init_crypto(crypto_ctx_t *cctx, uchar_t *pwd, int pwd_len, int crypto_alg,
b += 4;
*((uint32_t *)&sb[b]) = getpid();
b += 4;
compute_checksum(&sb[b], CKSUM_SHA256, sb, b, 0);
compute_checksum(&sb[b], CKSUM_SHA256, sb, b, 0, 0);
b = 8 + 4;
*((uint32_t *)&sb[b]) = rand();
compute_checksum(salt, CKSUM_SHA256, &sb[b], 32 + 4, 0);
compute_checksum(salt, CKSUM_SHA256, &sb[b], 32 + 4, 0, 0);
}
}

View file

@ -80,7 +80,7 @@ typedef struct {
/*
* Generic message digest functions.
*/
int compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, int mt);
int compute_checksum(uchar_t *cksum_buf, int cksum, uchar_t *buf, uint64_t bytes, int mt, int verbose);
void list_checksums(FILE *strm, char *pad);
int get_checksum_props(const char *name, int *cksum, int *cksum_bytes,
int *mac_bytes, int accept_compatible);

107
main.c
View file

@ -492,9 +492,9 @@ redo:
deserialize_checksum(tdat->checksum, tdat->compressed_chunk, cksum_bytes);
}
if ((enable_rabin_scan || enable_fixed_scan) && (HDR & CHUNK_FLAG_DEDUP)) {
if ((enable_rabin_scan || enable_fixed_scan || enable_rabin_global) &&
(HDR & CHUNK_FLAG_DEDUP)) {
uchar_t *cmpbuf, *ubuf;
/* Extract various sizes from dedupe header. */
parse_dedupe_hdr(cseg, &blknum, &dedupe_index_sz, &dedupe_data_sz,
&dedupe_index_sz_cmp, &dedupe_data_sz_cmp, &_chunksize);
@ -519,8 +519,8 @@ redo:
rv = tdat->decompress(cmpbuf, dedupe_data_sz_cmp, ubuf, &_chunksize,
tdat->level, HDR, tdat->data);
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Chunk decompression speed %.3f MB/s\n",
get_mb_s(_chunksize, strt, en)));
DEBUG_STAT_EN(fprintf(stderr, "Chunk %d decompression speed %.3f MB/s\n",
tdat->id, get_mb_s(_chunksize, strt, en)));
}
if (rv == -1) {
tdat->len_cmp = 0;
@ -551,6 +551,13 @@ redo:
memcpy(ubuf, cmpbuf, dedupe_index_sz);
} else {
/*
* This chunk was not deduplicated, however we still need to down the
* semaphore in order to maintain proper thread coordination.
*/
if (enable_rabin_global) {
sem_wait(tdat->rctx->index_sem);
}
if (HDR & COMPRESSED) {
if (HDR & CHUNK_FLAG_PREPROC) {
rv = preproc_decompress(tdat->decompress, cseg, tdat->len_cmp,
@ -607,7 +614,7 @@ redo:
* If it does not match we set length of chunk to 0 to indicate
* exit to the writer thread.
*/
compute_checksum(checksum, cksum, tdat->uncompressed_chunk, _chunksize, tdat->cksum_mt);
compute_checksum(checksum, cksum, tdat->uncompressed_chunk, _chunksize, tdat->cksum_mt, 1);
if (memcmp(checksum, tdat->checksum, cksum_bytes) != 0) {
tdat->len_cmp = 0;
fprintf(stderr, "ERROR: Chunk %d, checksums do not match.\n", tdat->id);
@ -663,9 +670,10 @@ start_decompress(const char *filename, const char *to_filename)
char algorithm[ALGO_SZ];
struct stat sbuf;
struct wdata w;
int compfd = -1, i, p;
int compfd = -1, p, dedupe_flag;
int uncompfd = -1, err, np, bail;
int nprocs = 1, thread = 0, level;
unsigned int i;
short version, flags;
int64_t chunksize, compressed_chunksize;
struct cmp_data **dary, *tdat;
@ -690,7 +698,7 @@ start_decompress(const char *filename, const char *to_filename)
if (sbuf.st_size == 0)
return (1);
if ((uncompfd = open(to_filename, O_WRONLY|O_CREAT|O_TRUNC, 0)) == -1) {
if ((uncompfd = open(to_filename, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR | S_IWUSR)) == -1) {
close(compfd);
err_exit(1, "Cannot open: %s", to_filename);
}
@ -768,12 +776,20 @@ start_decompress(const char *filename, const char *to_filename)
}
}
dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler
if (flags & FLAG_DEDUP) {
enable_rabin_scan = 1;
dedupe_flag = RABIN_DEDUPE_SEGMENTED;
if (flags & FLAG_DEDUP_FIXED) {
if (version > 7) {
if (pipe_mode) {
fprintf(stderr, "Global Deduplication is not supported with pipe mode.\n");
err = 1;
goto uncomp_done;
}
enable_rabin_global = 1;
dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL;
} else {
fprintf(stderr, "Invalid file deduplication flags.\n");
err = 1;
@ -782,6 +798,7 @@ start_decompress(const char *filename, const char *to_filename)
}
} else if (flags & FLAG_DEDUP_FIXED) {
enable_fixed_scan = 1;
dedupe_flag = RABIN_DEDUPE_FIXED;
}
if (flags & FLAG_SINGLE_CHUNK) {
@ -1054,6 +1071,7 @@ start_decompress(const char *filename, const char *to_filename)
tdat->compress = _compress_func;
tdat->decompress = _decompress_func;
tdat->cancel = 0;
tdat->decompressing = 1;
if (props.is_single_chunk) {
tdat->cksum_mt = 1;
if (version == 6) {
@ -1068,19 +1086,28 @@ start_decompress(const char *filename, const char *to_filename)
sem_init(&(tdat->start_sem), 0, 0);
sem_init(&(tdat->cmp_done_sem), 0, 0);
sem_init(&(tdat->write_done_sem), 0, 1);
sem_init(&(tdat->index_sem), 0, 0);
if (_init_func) {
if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize,
version, DECOMPRESS) != 0) {
UNCOMP_BAIL;
}
}
if (enable_rabin_scan || enable_fixed_scan) {
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size,
algo, &props, enable_delta_encode, enable_fixed_scan, version, DECOMPRESS, 0,
algo, &props, enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0,
NULL);
if (tdat->rctx == NULL) {
UNCOMP_BAIL;
}
if (enable_rabin_global) {
if ((tdat->rctx->out_fd = open(to_filename, O_RDONLY, 0)) == -1) {
perror("Unable to get new read handle to output file");
UNCOMP_BAIL;
}
}
tdat->rctx->index_sem = &(tdat->index_sem);
} else {
tdat->rctx = NULL;
}
@ -1099,6 +1126,15 @@ start_decompress(const char *filename, const char *to_filename)
}
thread = 1;
if (enable_rabin_global) {
for (i = 0; i < nprocs; i++) {
tdat = dary[i];
tdat->rctx->index_sem_next = &(dary[(i + 1) % nprocs]->index_sem);
}
}
// When doing global dedupe first thread does not wait to start dedupe recovery.
sem_post(&(dary[0]->index_sem));
if (encrypt_type) {
/* Erase encryption key bytes stored as a plain array. No longer reqd. */
crypto_clean_pkey(&crypto_ctx);
@ -1132,6 +1168,7 @@ start_decompress(const char *filename, const char *to_filename)
sem_wait(&tdat->write_done_sem);
if (main_cancel) break;
tdat->id = chunk_num;
if (tdat->rctx) tdat->rctx->id = tdat->id;
/*
* First read length of compressed chunk.
@ -1301,7 +1338,7 @@ redo:
*/
if (!encrypt_type)
compute_checksum(tdat->checksum, cksum, tdat->cmp_seg, tdat->rbytes,
tdat->cksum_mt);
tdat->cksum_mt, 1);
rctx = tdat->rctx;
reset_dedupe_context(tdat->rctx);
@ -1318,7 +1355,7 @@ redo:
*/
if (!encrypt_type)
compute_checksum(tdat->checksum, cksum, tdat->uncompressed_chunk,
tdat->rbytes, tdat->cksum_mt);
tdat->rbytes, tdat->cksum_mt, 1);
}
/*
@ -1562,9 +1599,14 @@ do_cancel:
main_cancel = 1;
tdat->cancel = 1;
sem_post(&tdat->start_sem);
if (tdat->rctx && enable_rabin_global)
sem_post(tdat->rctx->index_sem_next);
sem_post(&tdat->write_done_sem);
return (0);
}
if (tdat->decompressing && tdat->rctx && enable_rabin_global) {
sem_post(tdat->rctx->index_sem_next);
}
sem_post(&tdat->write_done_sem);
}
goto repeat;
@ -1583,7 +1625,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
struct wdata w;
char tmpfile1[MAXPATHLEN], tmpdir[MAXPATHLEN];
char to_filename[MAXPATHLEN];
uint64_t compressed_chunksize, n_chunksize;
uint64_t compressed_chunksize, n_chunksize, file_offset;
int64_t rbytes, rabin_count;
short version, flags;
struct stat sbuf;
@ -1628,7 +1670,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED);
dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL;
if (pipe_mode) {
sbuf.st_size = SIXTEEN_GB;
return (1);
}
} else if (enable_rabin_scan) {
flags |= FLAG_DEDUP;
@ -1845,6 +1887,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
tdat->decompress = _decompress_func;
tdat->uncompressed_chunk = (uchar_t *)1;
tdat->cancel = 0;
tdat->decompressing = 0;
if (single_chunk)
tdat->cksum_mt = 1;
else
@ -1857,21 +1900,21 @@ start_compress(const char *filename, uint64_t chunksize, int level)
sem_init(&(tdat->write_done_sem), 0, 1);
sem_init(&(tdat->index_sem), 0, 0);
// i is unsigned so this wraps around backwards for i == 0
tdat->index_sem_other = &(dary[(i - 1) % nprocs]->index_sem);
if (_init_func) {
if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize,
VERSION, COMPRESS) != 0) {
COMP_BAIL;
}
}
if (enable_rabin_scan || enable_fixed_scan) {
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size,
algo, &props, enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size,
tmpdir);
if (tdat->rctx == NULL) {
COMP_BAIL;
}
tdat->rctx->index_sem = &(tdat->index_sem);
} else {
tdat->rctx = NULL;
}
@ -1890,6 +1933,12 @@ start_compress(const char *filename, uint64_t chunksize, int level)
}
thread = 1;
if (enable_rabin_global) {
for (i = 0; i < nprocs; i++) {
tdat = dary[i];
tdat->rctx->index_sem_next = &(dary[(i + 1) % nprocs]->index_sem);
}
}
// When doing global dedupe first thread does not wait to access the index.
sem_post(&(dary[0]->index_sem));
@ -2000,6 +2049,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
/*
* Read the first chunk into a spare buffer (a simple double-buffering).
*/
file_offset = 0;
if (enable_rabin_split) {
rctx = create_dedupe_context(chunksize, 0, 0, algo, &props, enable_delta_encode,
enable_fixed_scan, VERSION, COMPRESS, 0, NULL);
@ -2064,12 +2114,13 @@ start_compress(const char *filename, uint64_t chunksize, int level)
*/
tdat->id = chunk_num;
tdat->rbytes = rbytes;
if ((enable_rabin_scan || enable_fixed_scan)) {
if ((enable_rabin_scan || enable_fixed_scan || enable_rabin_global)) {
tmp = tdat->cmp_seg;
tdat->cmp_seg = cread_buf;
cread_buf = tmp;
tdat->compressed_chunk = tdat->cmp_seg + COMPRESSED_CHUNKSZ +
cksum_bytes + mac_bytes;
if (tdat->rctx) tdat->rctx->file_offset = file_offset;
/*
* If there is data after the last rabin boundary in the chunk, then
@ -2087,6 +2138,8 @@ start_compress(const char *filename, uint64_t chunksize, int level)
tdat->uncompressed_chunk = cread_buf;
cread_buf = tmp;
}
file_offset += tdat->rbytes;
if (rbytes < chunksize) {
if (rbytes < 0) {
bail = 1;
@ -2344,7 +2397,7 @@ main(int argc, char *argv[])
slab_init();
init_pcompress();
while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDEe:w:rLPS:B:Fk:")) != -1) {
while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:rLPS:B:Fk:")) != -1) {
int ovr;
switch (opt) {
@ -2409,6 +2462,10 @@ main(int argc, char *argv[])
enable_rabin_scan = 1;
break;
case 'G':
enable_rabin_global = 1;
break;
case 'E':
enable_rabin_scan = 1;
if (!enable_delta_encode)
@ -2503,6 +2560,20 @@ main(int argc, char *argv[])
exit(1);
}
/*
* Global Deduplication can use Rabin or Fixed chunking. Default, if not specified,
* is to use Rabin.
*/
if (enable_rabin_global && !enable_rabin_scan && !enable_fixed_scan) {
enable_rabin_scan = 1;
enable_rabin_split = 1;
}
if (enable_rabin_global && pipe_mode) {
fprintf(stderr, "Global Deduplication is not supported in pipe mode.\n");
exit(1);
}
if (num_rem == 0 && !pipe_mode) {
usage(); /* At least 1 filename needed. */
exit(1);

View file

@ -40,7 +40,7 @@ extern "C" {
#define CHUNK_FLAG_SZ 1
#define ALGO_SZ 8
#define MIN_CHUNK 2048
#define VERSION 7
#define VERSION 8
#define FLAG_DEDUP 1
#define FLAG_DEDUP_FIXED 2
#define FLAG_SINGLE_CHUNK 4
@ -195,11 +195,11 @@ struct cmp_data {
sem_t cmp_done_sem;
sem_t write_done_sem;
sem_t index_sem;
sem_t *index_sem_other;
void *data;
pthread_t thr;
mac_ctx_t chunk_hmac;
algo_props_t *props;
int decompressing;
};
#ifdef __cplusplus

View file

@ -105,7 +105,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
}
if (path != NULL) {
printf("Disk based index not yet implemented.\n");
fprintf(stderr, "Disk based index not yet implemented.\n");
free(cfg);
return (NULL);
} else {
@ -148,7 +148,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
return (NULL);
}
indx->memlimit = memlimit;
indx->memlimit = memlimit - (hash_entry_size << 2);
indx->list = (htab_t *)calloc(intervals, sizeof (htab_t));
indx->hash_entry_size = hash_entry_size;
indx->intervals = intervals;
@ -198,7 +198,7 @@ mycmp(uchar_t *a, uchar_t *b, int sz)
len = 0;
do {
val1 = *((size_t *)v1);
val2 = *((size_t *)v1);
val2 = *((size_t *)v2);
if (val1 != val2) {
return (1);
}
@ -211,7 +211,8 @@ mycmp(uchar_t *a, uchar_t *b, int sz)
}
/*
* Lookup and insert item if indicated. Not thread-safe by design.
* Lookup and insert item if indicated. Not thread-safe by design. Caller needs to
* ensure thread-safety.
*/
hash_entry_t *
db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
@ -221,7 +222,7 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
index_t *indx = (index_t *)(cfg->dbdata);
hash_entry_t **htab, *ent, **pent;
assert(cfg->similarity_cksum_sz && (sizeof (size_t) - 1) == 0);
assert((cfg->similarity_cksum_sz & (sizeof (size_t) - 1)) == 0);
htab_entry = XXH32(sim_cksum, cfg->similarity_cksum_sz, 0);
htab_entry ^= (htab_entry / cfg->similarity_cksum_sz);
htab_entry = htab_entry % indx->hash_slots;
@ -248,9 +249,8 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
}
}
if (do_insert) {
if (indx->memused + indx->hash_entry_size >= indx->memlimit - (indx->hash_entry_size << 2)) {
if (indx->memused + indx->hash_entry_size >= indx->memlimit) {
ent = htab[htab_entry];
pent = &(htab[htab_entry]);
htab[htab_entry] = htab[htab_entry]->next;
} else {
ent = (hash_entry_t *)malloc(indx->hash_entry_size);

View file

@ -360,7 +360,7 @@ set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim
cfg->archive_sz = file_sz;
cfg->dedupe_mode = MODE_SIMILARITY;
if (cfg->archive_sz <= SIXTEEN_GB) {
if (cfg->archive_sz <= SIXTEEN_GB || pct_interval == 0) {
cfg->dedupe_mode = MODE_SIMPLE;
cfg->segment_sz_bytes = user_chunk_sz;

View file

@ -68,6 +68,8 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <allocator.h>
#include <utils.h>
#include <pthread.h>
@ -133,16 +135,15 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
int file_version, compress_op_t op, uint64_t file_size, char *tmppath) {
dedupe_context_t *ctx;
uint32_t i;
archive_config_t *arc;
if (rab_blk_sz < 1 || rab_blk_sz > 5)
rab_blk_sz = RAB_BLK_DEFAULT;
if (dedupe_flag == RABIN_DEDUPE_FIXED || dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) {
delta_flag = 0;
if (dedupe_flag != RABIN_DEDUPE_FILE_GLOBAL)
inited = 1;
}
arc = NULL;
/*
* Pre-compute a table of irreducible polynomial evaluations for each
@ -173,11 +174,24 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
ir[j] = val;
}
/*
* If Global Deduplication is enabled initialize the in-memory index.
* It is essentially a hashtable that is used for crypto-hash based
* chunk matching.
*/
if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) {
my_sysinfo msys_info;
/*
* Get available free memory.
*/
get_sysinfo(&msys_info);
arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, DEFAULT_PCT_INTERVAL,
/*
* Use a maximum of approx 62% of free RAM for the index.
*/
msys_info.freeram = (msys_info.freeram >> 1) + (msys_info.freeram >> 3);
arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, 0,
algo, props->cksum, props->cksum, file_size,
msys_info.freeram, props->nthreads);
if (arc == NULL) {
@ -220,6 +234,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
ctx->rabin_poly_min_block_size = dedupe_min_blksz(rab_blk_sz);
ctx->delta_flag = 0;
ctx->deltac_min_distance = props->deltac_min_distance;
ctx->pagesize = sysconf(_SC_PAGE_SIZE);
/*
* Scale down similarity percentage based on avg block size unless user specified
@ -256,12 +271,12 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
ctx->current_window_data = (uchar_t *)1;
#endif
ctx->blocks = NULL;
if (real_chunksize > 0) {
if (real_chunksize > 0 && dedupe_flag != RABIN_DEDUPE_FILE_GLOBAL) {
ctx->blocks = (rabin_blockentry_t **)slab_calloc(NULL,
ctx->blknum, sizeof (rabin_blockentry_t *));
}
if(ctx == NULL || ctx->current_window_data == NULL ||
(ctx->blocks == NULL && real_chunksize > 0)) {
(ctx->blocks == NULL && real_chunksize > 0 && dedupe_flag != RABIN_DEDUPE_FILE_GLOBAL)) {
fprintf(stderr,
"Could not allocate rabin polynomial context, out of memory\n");
destroy_dedupe_context(ctx);
@ -383,6 +398,15 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of
*/
ary_sz = ctx->rabin_poly_max_block_size;
ctx_heap = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz);
/*
* If global dedupe is active, the global blocks array uses temp space in
* the target buffer.
*/
if (ctx->arc != NULL) {
ary_sz += (sizeof (global_blockentry_t) * (*size / ctx->rabin_poly_min_block_size + 1));
ctx->g_blocks = (global_blockentry_t *)(ctx->cbuf + ctx->real_chunksize - ary_sz);
}
}
#ifndef SSE_MODE
memset(ctx->current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE);
@ -493,12 +517,18 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of
cur_pos_checksum = cur_roll_checksum ^ ir[pushed_out];
if ((cur_pos_checksum & ctx->rabin_avg_block_mask) == ctx->rabin_break_patt ||
length >= ctx->rabin_poly_max_block_size) {
if (!(ctx->arc)) {
if (ctx->blocks[blknum] == 0)
ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL,
sizeof (rabin_blockentry_t));
ctx->blocks[blknum]->offset = last_offset;
ctx->blocks[blknum]->index = blknum; // Need to store for sorting
ctx->blocks[blknum]->length = length;
} else {
ctx->g_blocks[blknum].length = length;
ctx->g_blocks[blknum].offset = last_offset;
}
DEBUG_STAT_EN(if (length >= ctx->rabin_poly_max_block_size) ++max_count);
/*
@ -510,7 +540,7 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of
* Once block contents are arranged in a min heap we compute the K min values
* sketch by hashing over the heap till K%. We interpret the raw bytes as a
* sequence of 64-bit integers.
* This is called minhashing and is used widely, for example in various
* This is variant of minhashing which is used widely, for example in various
* search engines to detect similar documents.
*/
if (ctx->delta_flag) {
@ -537,13 +567,18 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of
// Insert the last left-over trailing bytes, if any, into a block.
if (last_offset < *size) {
length = *size - last_offset;
if (!(ctx->arc)) {
if (ctx->blocks[blknum] == 0)
ctx->blocks[blknum] = (rabin_blockentry_t *)slab_alloc(NULL,
sizeof (rabin_blockentry_t));
ctx->blocks[blknum]->offset = last_offset;
ctx->blocks[blknum]->index = blknum;
length = *size - last_offset;
ctx->blocks[blknum]->length = length;
} else {
ctx->g_blocks[blknum].length = length;
ctx->g_blocks[blknum].offset = last_offset;
}
if (ctx->delta_flag) {
uint64_t cur_sketch;
@ -576,19 +611,161 @@ process_blocks:
DEBUG_STAT_EN(fprintf(stderr, "Original size: %" PRId64 ", blknum: %u\n", *size, blknum));
DEBUG_STAT_EN(fprintf(stderr, "Number of maxlen blocks: %u\n", max_count));
if (blknum > 2) {
uint64_t pos, matchlen, pos1;
uint64_t pos, matchlen, pos1 = 0;
int valid = 1;
uint32_t *dedupe_index;
uint64_t dedupe_index_sz;
uint64_t dedupe_index_sz = 0;
rabin_blockentry_t *be;
DEBUG_STAT_EN(uint32_t delta_calls, delta_fails, merge_count, hash_collisions);
DEBUG_STAT_EN(delta_calls = 0);
DEBUG_STAT_EN(delta_fails = 0);
DEBUG_STAT_EN(hash_collisions = 0);
ary_sz = (blknum << 1) * sizeof (rabin_blockentry_t *);
htab = (rabin_blockentry_t **)(ctx->cbuf + ctx->real_chunksize - ary_sz);
memset(htab, 0, ary_sz);
/*
* If global dedupe is enabled then process it here.
*/
if (ctx->arc) {
if (ctx->arc->dedupe_mode == MODE_SIMPLE) {
uchar_t *g_dedupe_idx, *tgt, *src;
/*
* First compute all the rabin chunk/block cryptographic hashes.
*/
#if defined(_OPENMP)
# pragma omp parallel for if (mt)
#endif
for (i=0; i<blknum; i++) {
compute_checksum(ctx->g_blocks[i].cksum,
ctx->arc->chunk_cksum_type, buf1+ctx->g_blocks[i].offset,
ctx->g_blocks[i].length, 0, 0);
}
/*
* Index table within this segment.
*/
g_dedupe_idx = ctx->cbuf + RABIN_HDR_SIZE;
dedupe_index_sz = 0;
/*
* First entry in table in the original file offset where this
* data segment begins.
*/
*((uint64_t *)g_dedupe_idx) = LE64(ctx->file_offset);
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
dedupe_index_sz += 2;
length = 0;
matchlen = 0;
/*
* Now lookup blocks in index. First wait for our semaphore to be
* signaled. If the previous thread in sequence is using the index
* he will finish and then signal our semaphore. So we can have
* predictable serialization of index access in a sequence of
* threads without locking.
*/
sem_wait(ctx->index_sem);
for (i=0; i<blknum; i++) {
hash_entry_t *he;
he = db_lookup_insert_s(ctx->arc, ctx->g_blocks[i].cksum, 0,
ctx->file_offset + ctx->g_blocks[i].offset,
ctx->g_blocks[i].length, 1);
if (!he) {
/*
* Block match in index not found.
* Block was added to index. Merge this block.
*/
if (length + ctx->g_blocks[i].length > RABIN_MAX_BLOCK_SIZE) {
*((uint32_t *)g_dedupe_idx) = LE32(length);
g_dedupe_idx += RABIN_ENTRY_SIZE;
length = 0;
dedupe_index_sz++;
}
length += ctx->g_blocks[i].length;
} else {
/*
* Block match in index was found.
*/
if (length > 0) {
/*
* Write pending accumulated block length value.
*/
*((uint32_t *)g_dedupe_idx) = LE32(length);
g_dedupe_idx += RABIN_ENTRY_SIZE;
length = 0;
dedupe_index_sz++;
}
/*
* Add a reference entry to the dedupe array.
*/
*((uint32_t *)g_dedupe_idx) = LE32((he->item_size | RABIN_INDEX_FLAG) &
CLEAR_SIMILARITY_FLAG);
g_dedupe_idx += RABIN_ENTRY_SIZE;
*((uint64_t *)g_dedupe_idx) = LE64(he->item_offset);
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
matchlen += he->item_size;
dedupe_index_sz += 3;
}
}
/*
* Signal the next thread in sequence to access the index.
*/
sem_post(ctx->index_sem_next);
/*
* Write final pending block length value (if any).
*/
if (length > 0) {
*((uint32_t *)g_dedupe_idx) = LE32(length);
g_dedupe_idx += RABIN_ENTRY_SIZE;
length = 0;
dedupe_index_sz++;
}
blknum = dedupe_index_sz; // Number of entries in block list
tgt = g_dedupe_idx;
g_dedupe_idx = ctx->cbuf + RABIN_HDR_SIZE;
dedupe_index_sz = tgt - g_dedupe_idx;
src = buf1;
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
/*
* Deduplication reduction should at least be greater than block list metadata.
*/
if (matchlen < dedupe_index_sz) {
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n",
get_mb_s(*size, strt, en_1), get_mb_s(*size, strt, en)));
DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.\n"));
ctx->valid = 0;
return (0);
}
/*
* Now copy the block data;
*/
for (i=0; i<blknum-2;) {
length = LE32(*((uint32_t *)g_dedupe_idx));
g_dedupe_idx += RABIN_ENTRY_SIZE;
++i;
j = length & RABIN_INDEX_FLAG;
length = length & RABIN_INDEX_VALUE;
if (!j) {
memcpy(tgt, src, length);
tgt += length;
src += length;
} else {
src += length;
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
i += 2;
}
}
pos1 = tgt - ctx->cbuf;
blknum |= GLOBAL_FLAG;
}
goto dedupe_done;
}
/*
* Compute hash signature for each block. We do this in a separate loop to
@ -613,6 +790,10 @@ process_blocks:
}
}
ary_sz = (blknum << 1) * sizeof (rabin_blockentry_t *);
htab = (rabin_blockentry_t **)(ctx->cbuf + ctx->real_chunksize - ary_sz);
memset(htab, 0, ary_sz);
/*
* Perform hash-matching of blocks and use a bucket-chained hashtable to match
* for duplicates and similar blocks. Unique blocks are inserted and duplicates
@ -792,11 +973,11 @@ process_blocks:
}
}
dedupe_done:
if (valid) {
uchar_t *cbuf = ctx->cbuf;
uint64_t *entries;
DEBUG_STAT_EN(uint64_t sz);
DEBUG_STAT_EN(sz = *size);
*((uint32_t *)cbuf) = htonl(blknum);
cbuf += sizeof (uint32_t);
@ -808,7 +989,7 @@ process_blocks:
ctx->valid = 1;
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Deduped size: %" PRId64 ", blknum: %u, delta_calls: %u, delta_fails: %u\n",
*size, blknum, delta_calls, delta_fails));
*size, (unsigned int)(blknum & CLEAR_GLOBAL_FLAG), delta_calls, delta_fails));
DEBUG_STAT_EN(fprintf(stderr, "Chunking speed %.3f MB/s, Overall Dedupe speed %.3f MB/s\n",
get_mb_s(sz, strt, en_1), get_mb_s(sz, strt, en)));
/*
@ -844,7 +1025,7 @@ parse_dedupe_hdr(uchar_t *buf, uint32_t *blknum, uint64_t *dedupe_index_sz,
entries = (uint64_t *)buf;
*dedupe_data_sz = ntohll(entries[0]);
*dedupe_index_sz = (uint64_t)(*blknum) * RABIN_ENTRY_SIZE;
*dedupe_index_sz = (uint64_t)(*blknum & CLEAR_GLOBAL_FLAG) * RABIN_ENTRY_SIZE;
*dedupe_index_sz_cmp = ntohll(entries[1]);
*deduped_size = ntohll(entries[2]);
*dedupe_data_sz_cmp = ntohll(entries[3]);
@ -866,6 +1047,79 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size)
sz = 0;
ctx->valid = 1;
/*
* Handling for Global Deduplication.
*/
if (blknum & GLOBAL_FLAG) {
uchar_t *g_dedupe_idx, *src1, *src2;
uint64_t adj, offset;
uint32_t flag;
blknum &= CLEAR_GLOBAL_FLAG;
g_dedupe_idx = buf + RABIN_HDR_SIZE;
offset = LE64(*((uint64_t *)g_dedupe_idx));
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
blknum -= 2;
src1 = buf + RABIN_HDR_SIZE + dedupe_index_sz;
sem_wait(ctx->index_sem);
for (blk=0; blk<blknum;) {
len = LE32(*((uint32_t *)g_dedupe_idx));
g_dedupe_idx += RABIN_ENTRY_SIZE;
++blk;
flag = len & GLOBAL_FLAG;
len &= RABIN_INDEX_VALUE;
if (sz + len > data_sz) {
fprintf(stderr, "Dedup data overflows chunk.\n");
ctx->valid = 0;
break;
}
if (flag == 0) {
memcpy(pos2, src1, len);
pos2 += len;
src1 += len;
sz += len;
} else {
pos1 = LE64(*((uint64_t *)g_dedupe_idx));
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
blk += 2;
/*
* Handling of chunk references at duplicate chunks.
*
* If required data offset is greater than the current segment's starting
* offset then the referenced chunk is already in the current segment in
* RAM. Just mem-copy it.
* Otherwise it will be in the current output file. We mmap() the relevant
* region and copy it. The way deduplication is done it is guaranteed that
* all duplicate reference will be backward references so this approach works.
*
* However this approach precludes pipe-mode streamed decompression since
* it requires random access to the output file.
*/
if (pos1 > offset) {
src2 = ctx->cbuf + (pos1 - offset);
memcpy(pos2, src2, len);
} else {
adj = pos1 % ctx->pagesize;
src2 = mmap(NULL, len + adj, PROT_READ, MAP_SHARED, ctx->out_fd, pos1 - adj);
if (src2 == NULL) {
perror("MMAP failed ");
ctx->valid = 0;
break;
}
memcpy(pos2, src2 + adj, len);
munmap(src2, len + adj);
}
pos2 += len;
sz += len;
}
}
*size = data_sz;
return;
}
slab_cache_add(sizeof (rabin_blockentry_t));
for (blk = 0; blk < blknum; blk++) {
if (ctx->blocks[blk] == 0)
@ -944,13 +1198,3 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size)
}
*size = data_sz;
}
/*
* TODO: Consolidate rabin dedup and compression/decompression in functions here rather than
* messy code in main program.
int
rabin_compress(dedupe_context_t *ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *tolen,
int level, char chdr, void *data, compress_func_ptr cmp)
{
}
*/

View file

@ -65,6 +65,9 @@
#include "utils.h"
#include <db.h>
#include <crypto_utils.h>
#include <pthread.h>
#include <semaphore.h>
//List of constants, mostly constraints and defaults for various parameters
//to the Rabin Fingerprinting algorithm
@ -118,6 +121,8 @@
#define SET_SIMILARITY_FLAG (0x40000000UL)
#define GET_SIMILARITY_FLAG SET_SIMILARITY_FLAG
#define CLEAR_SIMILARITY_FLAG (0xBFFFFFFFUL)
#define GLOBAL_FLAG RABIN_INDEX_FLAG
#define CLEAR_GLOBAL_FLAG (0x7fffffffUL)
#define RABIN_DEDUPE_SEGMENTED 0
#define RABIN_DEDUPE_FIXED 1
@ -160,13 +165,15 @@ typedef struct rab_blockentry {
} rabin_blockentry_t;
typedef struct global_blockentry {
uint64_t offset;
uint32_t length;
uint64_t offset;
uchar_t cksum[CKSUM_MAX_BYTES];
} global_blockentry_t;
typedef struct {
unsigned char *current_window_data;
rabin_blockentry_t **blocks;
global_blockentry_t *g_blocks;
uint32_t blknum;
unsigned char *cbuf;
uint32_t rabin_poly_max_block_size;
@ -178,11 +185,17 @@ typedef struct {
short valid;
void *lzma_data;
int level, delta_flag, dedupe_flag, deltac_min_distance;
uint64_t file_offset; // For global dedupe
archive_config_t *arc;
sem_t *index_sem;
sem_t *index_sem_next;
uint32_t pagesize;
int out_fd;
int id;
} dedupe_context_t;
extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize,
int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int fixed_flag,
int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
int file_version, compress_op_t op, uint64_t file_size, char *tmppath);
extern void destroy_dedupe_context(dedupe_context_t *ctx);
extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf,