Work in progress global dedupe changes.

This commit is contained in:
Moinak Ghosh 2013-03-20 22:47:03 +05:30
parent f2806d4ffa
commit b7fdeb08bc
9 changed files with 126 additions and 64 deletions

View file

@ -30,6 +30,8 @@
#include <sys/types.h>
#include <stdint.h>
#include <utils.h>
#ifdef __cplusplus
extern "C" {
#endif
@ -58,27 +60,6 @@ extern "C" {
#define MAX_NONCE 32
#define KECCAK_MAX_SEG (2305843009213693950ULL)
/*
* Public checksum properties. CKSUM_MAX_BYTES must be updated if a
* newer larger checksum is added to the list.
*/
typedef enum {
CKSUM_CRC64 = 0x100,
CKSUM_BLAKE256 = 0x200,
CKSUM_BLAKE512 = 0x300,
CKSUM_SHA256 = 0x400,
CKSUM_SHA512 = 0x500,
CKSUM_KECCAK256 = 0x600,
CKSUM_KECCAK512 = 0x700,
/*
* Backwards compatibility options. SKEIN in release 1.2 was replaced with
* Blake2 from 1.3 onwards (for sheer speed of Blake2). We want to be able
* to decode archives created with 1.2. New archives do not use SKEIN.
*/
CKSUM_SKEIN256 = 0x800,
CKSUM_SKEIN512 = 0x900,
CKSUM_INVALID = 0
} cksum_t;
typedef struct {
void *crypto_ctx;

28
main.c
View file

@ -87,6 +87,7 @@ static int nthreads = 0;
static int hide_mem_stats = 1;
static int hide_cmp_stats = 1;
static int enable_rabin_scan = 0;
static int enable_rabin_global = 0;
static int enable_delta_encode = 0;
static int enable_delta2_encode = 0;
static int enable_rabin_split = 1;
@ -770,6 +771,15 @@ start_decompress(const char *filename, const char *to_filename)
if (flags & FLAG_DEDUP) {
enable_rabin_scan = 1;
if (flags & FLAG_DEDUP_FIXED) {
if (version > 7) {
enable_rabin_global = 1;
} else {
fprintf(stderr, "Invalid file deduplication flags.\n");
err = 1;
goto uncomp_done;
}
}
} else if (flags & FLAG_DEDUP_FIXED) {
enable_fixed_scan = 1;
}
@ -1577,8 +1587,8 @@ start_compress(const char *filename, uint64_t chunksize, int level)
short version, flags;
struct stat sbuf;
int compfd = -1, uncompfd = -1, err;
int i, thread, bail, single_chunk;
int nprocs, np, p;
int thread, bail, single_chunk;
uint32_t i, nprocs, np, p;
struct cmp_data **dary = NULL, *tdat;
pthread_t writer_thr;
uchar_t *cread_buf, *pos;
@ -1599,6 +1609,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
*/
compressed_chunksize = chunksize + CHUNK_HDR_SZ + zlib_buf_extra(chunksize);
init_algo_props(&props);
props.cksum = cksum;
cread_buf = NULL;
if (_props_func) {
@ -1610,8 +1621,10 @@ start_compress(const char *filename, uint64_t chunksize, int level)
}
flags = 0;
if (enable_rabin_scan || enable_fixed_scan) {
if (enable_rabin_scan)
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
if (enable_rabin_global)
flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED);
else if (enable_rabin_scan)
flags |= FLAG_DEDUP;
else
flags |= FLAG_DEDUP_FIXED;
@ -1801,6 +1814,10 @@ start_compress(const char *filename, uint64_t chunksize, int level)
sem_init(&(tdat->start_sem), 0, 0);
sem_init(&(tdat->cmp_done_sem), 0, 0);
sem_init(&(tdat->write_done_sem), 0, 1);
sem_init(&(tdat->index_sem), 0, 0);
// i is unsigned so this wraps around backwards for i == 0
tdat->index_sem_other = &(dary[(i - 1) % nprocs]->index_sem);
if (_init_func) {
if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, VERSION, COMPRESS) != 0) {
COMP_BAIL;
@ -1830,6 +1847,9 @@ start_compress(const char *filename, uint64_t chunksize, int level)
}
thread = 1;
// When doing global dedupe first thread does not wait to access the index.
sem_post(&(dary[0]->index_sem));
w.dary = dary;
w.wfd = compfd;
w.nprocs = nprocs;

View file

@ -53,7 +53,7 @@ extern "C" {
#define CHSIZE_MASK 0x80
#define BZIP2_A_NUM 16
#define LZMA_A_NUM 32
#define CHUNK_FLAG_DEDUP 2
#define CHUNK_FLAG_DEDUP 2
#define CHUNK_FLAG_PREPROC 4
#define COMP_EXTN ".pz"
@ -194,6 +194,8 @@ struct cmp_data {
sem_t start_sem;
sem_t cmp_done_sem;
sem_t write_done_sem;
sem_t index_sem;
sem_t *index_sem_other;
void *data;
pthread_t thr;
mac_ctx_t chunk_hmac;

View file

@ -42,7 +42,8 @@
* Hashtable structures for in-memory index.
*/
typedef struct _hash_entry {
uint64_t seg_offset;
uint64_t item_offset;
uint32_t item_size;
struct _hash_entry *next;
uchar_t cksum[1];
} hash_entry_t;
@ -53,7 +54,6 @@ typedef struct {
typedef struct {
htab_t *list;
pthread_mutex_t *mlist;
uint64_t memlimit;
uint64_t memused;
int hash_entry_size, intervals, hash_slots;
@ -91,8 +91,6 @@ static cleanup_indx(index_t *indx)
}
free(indx->list);
}
if (indx->mlist)
free(indx->mlist);
free(indx);
}
}
@ -153,7 +151,6 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
indx->memlimit = memlimit;
indx->list = (htab_t *)calloc(intervals, sizeof (htab_t));
indx->mlist = (pthread_mutex_t *)malloc(intervals * sizeof (pthread_mutex_t));
indx->hash_entry_size = hash_entry_size;
indx->intervals = intervals;
indx->hash_slots = hash_slots / intervals;
@ -167,23 +164,24 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
return (NULL);
}
indx->memused += ((hash_slots / intervals) * (sizeof (hash_entry_t *)));
pthread_mutex_init(&(indx->mlist[i]), NULL);
}
strcpy(cfg->rootdir, tmppath);
strcat(cfg->rootdir, "/.segXXXXXX");
cfg->seg_fd_w = mkstemp(cfg->rootdir);
cfg->seg_fd_r = (int *)malloc(sizeof (int) * nthreads);
if (cfg->seg_fd_w == -1 || cfg->seg_fd_r == NULL) {
cleanup_indx(indx);
if (cfg->seg_fd_r)
free(cfg->seg_fd_r);
free(cfg);
return (NULL);
}
if (pct_interval > 0) {
strcpy(cfg->rootdir, tmppath);
strcat(cfg->rootdir, "/.segXXXXXX");
cfg->seg_fd_w = mkstemp(cfg->rootdir);
cfg->seg_fd_r = (int *)malloc(sizeof (int) * nthreads);
if (cfg->seg_fd_w == -1 || cfg->seg_fd_r == NULL) {
cleanup_indx(indx);
if (cfg->seg_fd_r)
free(cfg->seg_fd_r);
free(cfg);
return (NULL);
}
for (i = 0; i < nthreads; i++) {
cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY);
for (i = 0; i < nthreads; i++) {
cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY);
}
}
cfg->dbdata = indx;
}
@ -213,9 +211,12 @@ mycmp(uchar_t *a, uchar_t *b, int sz)
return (0);
}
/*
* Lookup and insert item if indicated. Not thread-safe by design.
*/
uint64_t
db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
uint64_t seg_offset, int do_insert)
uint64_t item_offset, uint32_t item_size, int do_insert)
{
uint32_t htab_entry;
index_t *indx = (index_t *)(cfg->dbdata);
@ -228,17 +229,24 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
htab = indx->list[interval].tab;
pent = &(htab[htab_entry]);
pthread_mutex_lock(&(indx->mlist[interval]));
ent = htab[htab_entry];
while (ent) {
if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0) {
uint64_t off;
off = ent->seg_offset;
pthread_mutex_unlock(&(indx->mlist[interval]));
return (off+1);
if (cfg->pct_interval == 0) { // Single file global dedupe case
while (ent) {
if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0 &&
ent->item_size == item_size) {
return (ent->item_offset);
}
pent = &(ent->next);
ent = ent->next;
}
} else {
while (ent) {
if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0) {
return (ent->item_offset);
}
pent = &(ent->next);
ent = ent->next;
}
pent = &(ent->next);
ent = ent->next;
}
if (do_insert) {
if (indx->memused + indx->hash_entry_size >= indx->memlimit - (indx->hash_entry_size << 2)) {
@ -249,11 +257,11 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
ent = (hash_entry_t *)malloc(indx->hash_entry_size);
indx->memused += indx->hash_entry_size;
}
ent->seg_offset = seg_offset;
ent->item_offset = item_offset;
ent->item_size = item_size;
ent->next = 0;
memcpy(ent->cksum, sim_cksum, cfg->similarity_cksum_sz);
*pent = ent;
}
pthread_mutex_unlock(&(indx->mlist[interval]));
return (0);
}

View file

@ -37,7 +37,7 @@ archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize
cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit,
int nthreads);
uint64_t db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
uint64_t seg_offset, int do_insert);
uint64_t item_offset, uint32_t item_size, int do_insert);
#ifdef __cplusplus
}

View file

@ -128,7 +128,7 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta
*/
dedupe_context_t *
create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz,
const char *algo, const algo_props_t *props, int delta_flag, int fixed_flag,
const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
int file_version, compress_op_t op) {
dedupe_context_t *ctx;
uint32_t i;
@ -136,7 +136,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
if (rab_blk_sz < 1 || rab_blk_sz > 5)
rab_blk_sz = RAB_BLK_DEFAULT;
if (fixed_flag) {
if (dedupe_flag == RABIN_DEDUPE_FIXED || dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL) {
delta_flag = 0;
inited = 1;
}
@ -196,7 +196,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
ctx->rabin_poly_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE;
ctx->current_window_data = NULL;
ctx->fixed_flag = fixed_flag;
ctx->dedupe_flag = dedupe_flag;
ctx->rabin_break_patt = 0;
ctx->rabin_poly_avg_block_size = RAB_BLK_AVG_SZ(rab_blk_sz);
ctx->rabin_avg_block_mask = RAB_BLK_MASK;
@ -220,7 +220,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
ctx->delta_flag = 2;
}
if (!fixed_flag)
if (dedupe_flag != RABIN_DEDUPE_FIXED)
ctx->blknum = chunksize / ctx->rabin_poly_min_block_size;
else
ctx->blknum = chunksize / ctx->rabin_poly_avg_block_size;
@ -330,7 +330,7 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of
if (*size < ctx->rabin_poly_avg_block_size) return (0);
DEBUG_STAT_EN(strt = get_wtime_millis());
if (ctx->fixed_flag) {
if (ctx->dedupe_flag == RABIN_DEDUPE_FIXED) {
blknum = *size / ctx->rabin_poly_avg_block_size;
j = *size % ctx->rabin_poly_avg_block_size;
if (j)

View file

@ -118,6 +118,10 @@
#define GET_SIMILARITY_FLAG SET_SIMILARITY_FLAG
#define CLEAR_SIMILARITY_FLAG (0xBFFFFFFFUL)
#define RABIN_DEDUPE_SEGMENTED 0
#define RABIN_DEDUPE_FIXED 1
#define RABIN_DEDUPE_FILE_GLOBAL 2
// Mask to extract value from a rabin index entry
#define RABIN_INDEX_VALUE (0x3FFFFFFFUL)
@ -171,7 +175,7 @@ typedef struct {
uint64_t real_chunksize;
short valid;
void *lzma_data;
int level, delta_flag, fixed_flag, deltac_min_distance;
int level, delta_flag, dedupe_flag, deltac_min_distance;
} dedupe_context_t;
extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize,

View file

@ -41,6 +41,8 @@
#include <cpuid.h>
#include <xxhash.h>
#include <sys/sysinfo.h>
#define _IN_UTILS_
#include "utils.h"
@ -367,3 +369,16 @@ get_mb_s(uint64_t bytes, double strt, double en)
bytes_sec = ((double)bytes / (en - strt)) * 1000;
return (BYTES_TO_MB(bytes_sec));
}
void
get_sysinfo(my_sysinfo *msys_info)
{
struct sysinfo sys_info;
sysinfo(&sys_info);
msys_info->totalram = sys_info.totalram;
msys_info->freeram = sys_info.freeram;
msys_info->totalswap = sys_info.totalswap;
msys_info->freeswap = sys_info.freeswap;
msys_info->mem_unit = sys_info.mem_unit;
}

View file

@ -134,6 +134,28 @@ typedef int32_t bsize_t;
#define BYTES_TO_MB(x) ((x) / (1024 * 1024))
/*
* Public checksum properties. CKSUM_MAX_BYTES must be updated if a
* newer larger checksum is added to the list.
*/
typedef enum {
CKSUM_CRC64 = 0x100,
CKSUM_BLAKE256 = 0x200,
CKSUM_BLAKE512 = 0x300,
CKSUM_SHA256 = 0x400,
CKSUM_SHA512 = 0x500,
CKSUM_KECCAK256 = 0x600,
CKSUM_KECCAK512 = 0x700,
/*
* Backwards compatibility options. SKEIN in release 1.2 was replaced with
* Blake2 from 1.3 onwards (for sheer speed of Blake2). We want to be able
* to decode archives created with 1.2. New archives do not use SKEIN.
*/
CKSUM_SKEIN256 = 0x800,
CKSUM_SKEIN512 = 0x900,
CKSUM_INVALID = 0
} cksum_t;
typedef enum {
COMPRESS_NONE = 0,
COMPRESS_LZFX,
@ -155,6 +177,7 @@ typedef struct {
int d_max_threads;
int delta2_span;
int deltac_min_distance;
cksum_t cksum;
} algo_props_t;
typedef enum {
@ -162,6 +185,14 @@ typedef enum {
DECOMPRESS_THREADS
} algo_threads_type_t;
typedef struct{
int64_t totalram;
int64_t freeram;
int64_t totalswap;
int64_t freeswap;
int64_t mem_unit;
} my_sysinfo;
#ifndef _IN_UTILS_
extern processor_info_t proc_info;
#endif
@ -179,6 +210,7 @@ extern void set_threadcounts(algo_props_t *props, int *nthreads, int nprocs,
extern uint64_t get_total_ram();
extern double get_wtime_millis(void);
extern double get_mb_s(uint64_t bytes, double strt, double en);
extern void get_sysinfo(my_sysinfo *msys_info);
extern void init_algo_props(algo_props_t *props);
extern void init_pcompress();