Work in progress changes for global dedupe.

This commit is contained in:
Moinak Ghosh 2013-03-21 22:00:38 +05:30
parent b7fdeb08bc
commit 876796be5c
9 changed files with 119 additions and 41 deletions

61
main.c
View file

@ -1076,7 +1076,8 @@ start_decompress(const char *filename, const char *to_filename)
}
if (enable_rabin_scan || enable_fixed_scan) {
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size,
algo, &props, enable_delta_encode, enable_fixed_scan, version, DECOMPRESS);
algo, &props, enable_delta_encode, enable_fixed_scan, version, DECOMPRESS, 0,
NULL);
if (tdat->rctx == NULL) {
UNCOMP_BAIL;
}
@ -1580,7 +1581,7 @@ static int
start_compress(const char *filename, uint64_t chunksize, int level)
{
struct wdata w;
char tmpfile1[MAXPATHLEN];
char tmpfile1[MAXPATHLEN], tmpdir[MAXPATHLEN];
char to_filename[MAXPATHLEN];
uint64_t compressed_chunksize, n_chunksize;
int64_t rbytes, rabin_count;
@ -1588,7 +1589,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
struct stat sbuf;
int compfd = -1, uncompfd = -1, err;
int thread, bail, single_chunk;
uint32_t i, nprocs, np, p;
uint32_t i, nprocs, np, p, dedupe_flag;
struct cmp_data **dary = NULL, *tdat;
pthread_t writer_thr;
uchar_t *cread_buf, *pos;
@ -1621,13 +1622,21 @@ start_compress(const char *filename, uint64_t chunksize, int level)
}
flags = 0;
dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler
if (enable_rabin_scan || enable_fixed_scan || enable_rabin_global) {
if (enable_rabin_global)
if (enable_rabin_global) {
flags |= (FLAG_DEDUP | FLAG_DEDUP_FIXED);
else if (enable_rabin_scan)
dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL;
if (pipe_mode) {
sbuf.st_size = SIXTEEN_GB;
}
} else if (enable_rabin_scan) {
flags |= FLAG_DEDUP;
else
dedupe_flag = RABIN_DEDUPE_SEGMENTED;
} else {
flags |= FLAG_DEDUP_FIXED;
dedupe_flag = RABIN_DEDUPE_FIXED;
}
/* Additional scratch space for dedup arrays. */
if (chunksize + dedupe_buf_extra(chunksize, 0, algo, enable_delta_encode)
> compressed_chunksize) {
@ -1747,6 +1756,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
*/
strcpy(tmpfile1, filename);
strcpy(tmpfile1, dirname(tmpfile1));
strcpy(tmpdir, tmpfile1);
strcat(tmpfile1, "/.pcompXXXXXX");
snprintf(to_filename, sizeof (to_filename), "%s" COMP_EXTN, filename);
if ((compfd = mkstemp(tmpfile1)) == -1) {
@ -1757,6 +1767,9 @@ start_compress(const char *filename, uint64_t chunksize, int level)
signal(SIGINT, Int_Handler);
signal(SIGTERM, Int_Handler);
} else {
char *tmp;
struct stat st;
/*
* Use stdin/stdout for pipe mode.
*/
@ -1770,6 +1783,34 @@ start_compress(const char *filename, uint64_t chunksize, int level)
perror("fileno ");
COMP_BAIL;
}
/*
* Get a workable temporary dir. Required if global dedupe is enabled.
*/
tmp = getenv("TMPDIR");
if (tmp == NULL) {
tmp = getenv("HOME");
if (tmp == NULL) {
if (getcwd(tmpdir, MAXPATHLEN) == NULL) {
tmp = "/tmp";
} else {
tmp = tmpdir;
}
}
}
if (stat(tmp, &st) == -1) {
fprintf(stderr, "Unable to find writable temporary dir.\n");
COMP_BAIL;
}
if (!S_ISDIR(st.st_mode)) {
if (strcmp(tmp, "/tmp") != 0) {
tmp = "/tmp";
} else {
fprintf(stderr, "Unable to find writable temporary dir.\n");
COMP_BAIL;
}
}
strcpy(tmpdir, tmp);
}
if (encrypt_type)
@ -1819,13 +1860,15 @@ start_compress(const char *filename, uint64_t chunksize, int level)
// i is unsigned so this wraps around backwards for i == 0
tdat->index_sem_other = &(dary[(i - 1) % nprocs]->index_sem);
if (_init_func) {
if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, VERSION, COMPRESS) != 0) {
if (_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize,
VERSION, COMPRESS) != 0) {
COMP_BAIL;
}
}
if (enable_rabin_scan || enable_fixed_scan) {
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, rab_blk_size,
algo, &props, enable_delta_encode, enable_fixed_scan, VERSION, COMPRESS);
algo, &props, enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size,
tmpdir);
if (tdat->rctx == NULL) {
COMP_BAIL;
}
@ -1959,7 +2002,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
*/
if (enable_rabin_split) {
rctx = create_dedupe_context(chunksize, 0, 0, algo, &props, enable_delta_encode,
enable_fixed_scan, VERSION, COMPRESS);
enable_fixed_scan, VERSION, COMPRESS, 0, NULL);
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx);
} else {
rbytes = Read(uncompfd, cread_buf, chunksize);

View file

@ -41,13 +41,6 @@
/*
* Hashtable structures for in-memory index.
*/
typedef struct _hash_entry {
uint64_t item_offset;
uint32_t item_size;
struct _hash_entry *next;
uchar_t cksum[1];
} hash_entry_t;
typedef struct {
hash_entry_t **tab;
} htab_t;
@ -97,7 +90,7 @@ static cleanup_indx(index_t *indx)
archive_config_t *
init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_chunk_sz,
int pct_interval, compress_algo_t algo, cksum_t ck, cksum_t ck_sim,
int pct_interval, const char *algo, cksum_t ck, cksum_t ck_sim,
size_t file_sz, size_t memlimit, int nthreads)
{
archive_config_t *cfg;
@ -107,6 +100,10 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
cfg = calloc(1, sizeof (archive_config_t));
rv = set_config_s(cfg, algo, ck, ck_sim, chunksize, file_sz, user_chunk_sz, pct_interval);
if (cfg->dedupe_mode == MODE_SIMPLE) {
pct_interval = 0;
}
if (path != NULL) {
printf("Disk based index not yet implemented.\n");
free(cfg);
@ -118,12 +115,14 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
index_t *indx;
// Compute total hashtable entries first
if (pct_interval == 0)
if (pct_interval == 0) {
intervals = 1;
else
hash_slots = file_sz / cfg->chunk_sz_bytes + 1;
} else {
intervals = 100 / pct_interval - 1;
hash_slots = file_sz / cfg->segment_sz_bytes + 1;
hash_slots *= intervals;
hash_slots = file_sz / cfg->segment_sz_bytes + 1;
hash_slots *= intervals;
}
hash_entry_size = sizeof (hash_entry_t) + cfg->similarity_cksum_sz - 1;
// Compute memory required to hold all hash entries assuming worst case 50%
@ -214,7 +213,7 @@ mycmp(uchar_t *a, uchar_t *b, int sz)
/*
* Lookup and insert item if indicated. Not thread-safe by design.
*/
uint64_t
hash_entry_t *
db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
uint64_t item_offset, uint32_t item_size, int do_insert)
{
@ -234,7 +233,7 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
while (ent) {
if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0 &&
ent->item_size == item_size) {
return (ent->item_offset);
return (ent);
}
pent = &(ent->next);
ent = ent->next;
@ -242,7 +241,7 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
} else {
while (ent) {
if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0) {
return (ent->item_offset);
return (ent);
}
pent = &(ent->next);
ent = ent->next;
@ -263,5 +262,5 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
memcpy(ent->cksum, sim_cksum, cfg->similarity_cksum_sz);
*pent = ent;
}
return (0);
return (NULL);
}

View file

@ -31,12 +31,22 @@
extern "C" {
#endif
/*
 * Publicly visible in-memory hashtable entry.
 *
 * Entries are chained through 'next'; lookups walk the chain comparing
 * 'cksum' (and, in one lookup path, 'item_size') against the caller's values.
 */
typedef struct _hash_entry {
/* Offset recorded for the item (presumably its position in the data being deduplicated; confirm against the insert path). */
uint64_t item_offset;
/* Size of the item; one lookup path requires this to equal the caller's item_size for a match. */
uint32_t item_size;
/* Next entry in the same hash chain, or NULL at the end of the chain. */
struct _hash_entry *next;
/*
 * First byte of the variable-length checksum. Entries are sized as
 * sizeof (hash_entry_t) + similarity_cksum_sz - 1, so the checksum bytes
 * extend past the declared end of the struct. Do not change this to a
 * different array form without updating that size computation.
 */
uchar_t cksum[1];
} hash_entry_t;
archive_config_t *init_global_db(char *configfile);
archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize,
uint64_t user_chunk_sz, int pct_interval, compress_algo_t algo,
uint64_t user_chunk_sz, int pct_interval, const char *algo,
cksum_t ck, cksum_t ck_sim, size_t file_sz, size_t memlimit,
int nthreads);
uint64_t db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
uint64_t item_offset, uint32_t item_size, int do_insert);
#ifdef __cplusplus

View file

@ -38,12 +38,6 @@
#include "dedupe_config.h"
#include "db.h"
#define ONE_PB (1125899906842624ULL)
#define ONE_TB (1099511627776ULL)
#define FOUR_MB (4194304ULL)
#define EIGHT_MB (8388608ULL)
#define EIGHT_GB (8589934592ULL)
static compress_algo_t
get_compress_level(compress_algo_t algo)
{
@ -64,7 +58,7 @@ get_compress_level(compress_algo_t algo)
}
static int
get_compress_algo(char *algo_name)
get_compress_algo(const char *algo_name)
{
if (strcmp(algo_name, "none") == 0) {
return (COMPRESS_NONE);
@ -350,11 +344,11 @@ write_config(char *configfile, archive_config_t *cfg)
}
int
set_config_s(archive_config_t *cfg, compress_algo_t algo, cksum_t ck, cksum_t ck_sim,
set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim,
uint32_t chunksize, size_t file_sz, uint64_t user_chunk_sz, int pct_interval)
{
cfg->algo = algo;
cfg->algo = get_compress_algo(algo);
cfg->chunk_cksum_type = ck;
cfg->similarity_cksum = ck_sim;
cfg->compress_level = get_compress_level(cfg->algo);
@ -366,7 +360,7 @@ set_config_s(archive_config_t *cfg, compress_algo_t algo, cksum_t ck, cksum_t ck
cfg->archive_sz = file_sz;
cfg->dedupe_mode = MODE_SIMILARITY;
if (cfg->archive_sz <= EIGHT_GB) {
if (cfg->archive_sz <= SIXTEEN_GB) {
cfg->dedupe_mode = MODE_SIMPLE;
cfg->segment_sz_bytes = user_chunk_sz;

View file

@ -86,7 +86,7 @@ typedef struct _segment_entry {
int read_config(char *configfile, archive_config_t *cfg);
int write_config(char *configfile, archive_config_t *cfg);
int set_config_s(archive_config_t *cfg, compress_algo_t algo, cksum_t ck, cksum_t ck_sim,
int set_config_s(archive_config_t *cfg, const char *algo, cksum_t ck, cksum_t ck_sim,
uint32_t chunksize, size_t file_sz, uint64_t user_chunk_sz,
int pct_interval);

View file

@ -104,6 +104,7 @@ extern int bspatch(u_char *pbuf, u_char *oldbuf, bsize_t oldsize, u_char *newbuf
static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
uint64_t ir[256], out[256];
static int inited = 0;
archive_config_t *arc = NULL;
static uint32_t
dedupe_min_blksz(int rab_blk_sz)
@ -129,9 +130,10 @@ dedupe_buf_extra(uint64_t chunksize, int rab_blk_sz, const char *algo, int delta
dedupe_context_t *
create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz,
const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
int file_version, compress_op_t op) {
int file_version, compress_op_t op, uint64_t file_size, char *tmppath) {
dedupe_context_t *ctx;
uint32_t i;
archive_config_t *arc;
if (rab_blk_sz < 1 || rab_blk_sz > 5)
rab_blk_sz = RAB_BLK_DEFAULT;
@ -140,6 +142,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
delta_flag = 0;
inited = 1;
}
arc = NULL;
/*
* Pre-compute a table of irreducible polynomial evaluations for each
@ -169,6 +172,19 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
}
ir[j] = val;
}
if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz > 0) {
my_sysinfo msys_info;
get_sysinfo(&msys_info);
arc = init_global_db_s(NULL, NULL, rab_blk_sz, chunksize, DEFAULT_PCT_INTERVAL,
algo, props->cksum, props->cksum, file_size,
msys_info.freeram, props->nthreads);
if (arc == NULL) {
pthread_mutex_unlock(&init_lock);
return (NULL);
}
}
inited = 1;
}
pthread_mutex_unlock(&init_lock);
@ -194,6 +210,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
*/
ctx = (dedupe_context_t *)slab_alloc(NULL, sizeof (dedupe_context_t));
ctx->rabin_poly_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE;
ctx->arc = arc;
ctx->current_window_data = NULL;
ctx->dedupe_flag = dedupe_flag;

View file

@ -64,6 +64,7 @@
#define _RABIN_POLY_H_
#include "utils.h"
#include <db.h>
//List of constants, mostly constraints and defaults for various parameters
//to the Rabin Fingerprinting algorithm
@ -121,6 +122,7 @@
#define RABIN_DEDUPE_SEGMENTED 0
#define RABIN_DEDUPE_FIXED 1
#define RABIN_DEDUPE_FILE_GLOBAL 2
#define DEFAULT_PCT_INTERVAL 2
// Mask to extract value from a rabin index entry
#define RABIN_INDEX_VALUE (0x3FFFFFFFUL)
@ -176,11 +178,12 @@ typedef struct {
short valid;
void *lzma_data;
int level, delta_flag, dedupe_flag, deltac_min_distance;
archive_config_t *arc;
} dedupe_context_t;
extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize,
int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int fixed_flag,
int file_version, compress_op_t op);
int file_version, compress_op_t op, uint64_t file_size, char *tmppath);
extern void destroy_dedupe_context(dedupe_context_t *ctx);
extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf,
uint64_t *size, uint64_t offset, uint64_t *rabin_pos, int mt);

View file

@ -374,8 +374,13 @@ void
get_sysinfo(my_sysinfo *msys_info)
{
struct sysinfo sys_info;
sysinfo(&sys_info);
int rv;
rv = sysinfo(&sys_info);
if (rv == -1) {
sys_info.freeram = 100 * 1024 * 1024; // 100M arbitrary
}
msys_info->totalram = sys_info.totalram;
msys_info->freeram = sys_info.freeram;
msys_info->totalswap = sys_info.totalswap;

View file

@ -48,6 +48,13 @@ extern "C" {
#define EIGHTM (8UL * 1024UL * 1024UL)
#define FOURM (4UL * 1024UL * 1024UL)
#define ONE_PB (1125899906842624ULL)
#define ONE_TB (1099511627776ULL)
#define FOUR_MB FOURM
#define EIGHT_MB EIGHTM
#define EIGHT_GB (8589934592ULL)
#define SIXTEEN_GB (EIGHT_GB * 2)
#if !defined(sun) && !defined(__sun)
#define uchar_t u_char
#endif