Add lookup and insert functionality for global index.

Make global dedupe code buildable.
This commit is contained in:
Moinak Ghosh 2013-02-21 23:07:07 +05:30
parent cb853821c7
commit 5f6217bb1f
5 changed files with 101 additions and 65 deletions

View file

@ -70,8 +70,8 @@ BZLIB_HDRS = $(MAINHDRS)
BZLIB_OBJS = $(BZLIB_SRCS:.c=.o)
BZLIB_CPPFLAGS = @LIBBZ2_INC@
RABINSRCS = rabin/rabin_dedup.c
RABINHDRS = rabin/rabin_dedup.h utils/utils.h
RABINSRCS = rabin/rabin_dedup.c rabin/global/db.c rabin/global/dedupe_config.c
RABINHDRS = rabin/rabin_dedup.h utils/utils.h rabin/global/db.h rabin/global/dedupe_config.h
RABINOBJS = $(RABINSRCS:.c=.o)
BSDIFFSRCS = bsdiff/bsdiff.c bsdiff/bspatch.c bsdiff/rle_encoder.c
@ -168,7 +168,7 @@ RM_RF = rm -rf
COMMON_CPPFLAGS = -I. -I./lzma -I./lzfx -I./lz4 -I./rabin -I./bsdiff -DNODEFAULT_PROPS \
-DFILE_OFFSET_BITS=64 -D_REENTRANT -D__USE_SSE_INTRIN__ -D_LZMA_PROB32 \
-I./lzp @LIBBSCCPPFLAGS@ -I./crypto/skein -I./utils -I./crypto/sha2 \
-I./crypto/scrypt -I./crypto/aes -I./crypto @KEYLEN@ \
-I./crypto/scrypt -I./crypto/aes -I./crypto @KEYLEN@ -I./rabin/global \
-I./crypto/keccak -I./transpose -I./crypto/blake2 $(EXTRA_CPPFLAGS) -pedantic -Wall -std=gnu99 \
-fno-strict-aliasing -Wno-unused-but-set-variable -Wno-enum-compare
COMMON_VEC_FLAGS = -ftree-vectorize

View file

@ -193,7 +193,6 @@ adapt_compress(void *src, uint64_t srclen, void *dst,
tag3 = 0;
prev_byte = cur_byte = 0;
for (i = 0; i < srclen; i++) {
cur_byte = src1[i];
tot8b += (cur_byte & 0x80); // This way for possible auto-vectorization
tag1 += (cur_byte == '<');

View file

@ -30,6 +30,7 @@
#include <utils.h>
#include <allocator.h>
#include <pthread.h>
#include <xxhash.h>
#include "db.h"
@ -44,30 +45,20 @@
typedef struct _hash_entry {
uint64_t seg_offset;
struct _hash_entry *next;
struct _hash_entry *lru_prev;
struct _hash_entry *lru_next;
uchar_t cksum[1];
} hash_entry_t;
typedef struct {
hash_entry_t **htab;
hash_entry_t **tab;
} htab_t;
typedef struct {
htab_t *list;
pthread_mutex_t *mlist;
hash_entry_t *lru_head;
hash_entry_t *lru_tail;
uint64_t memlimit;
uint64_t memused;
int hash_entry_size, intervals;
} htablst_t;
typedef struct {
htablst_t *hlist;
int seg_fd_w;
int *tfd;
} seg_index_t;
int hash_entry_size, intervals, hash_slots;
} index_t;
archive_config_t *
init_global_db(char *configfile)
@ -89,21 +80,21 @@ init_global_db(char *configfile)
}
void
static cleanup_htablst(htablst_t *htablst, int intervals)
static cleanup_indx(index_t *indx)
{
int i;
if (htablst) {
if (htablst->list) {
for (i = 0; i < intervals; i++) {
if (htablst->list[i].htab)
free(htablst->list[i].htab);
if (indx) {
if (indx->list) {
for (i = 0; i < indx->intervals; i++) {
if (indx->list[i].tab)
free(indx->list[i].tab);
}
free(htablst->list);
free(indx->list);
}
if (htablst->mlist)
free(htablst->mlist);
free(htablst);
if (indx->mlist)
free(indx->mlist);
free(indx);
}
}
@ -126,9 +117,8 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, int pct_interval
} else {
uint32_t hash_slots, intervals, i;
uint64_t memreqd;
htablst_t *htablst;
int hash_entry_size;
seg_index_t *indx;
index_t *indx;
// Compute total hashtable entries first
intervals = 100 / pct_interval - 1;
@ -153,68 +143,114 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, int pct_interval
// Now create as many hash tables as there are similarity match intervals
// each having hash_slots / intervals slots.
htablst = calloc(1, sizeof (htablst_t));
if (!htablst) {
indx = calloc(1, sizeof (index_t));
if (!indx) {
free(cfg);
return (NULL);
}
htablst->memlimit = memlimit;
htablst->list = (htab_t *)calloc(intervals, sizeof (htab_t));
htablst->mlist = (pthread_mutex_t *)malloc(intervals * sizeof (pthread_mutex_t));
htablst->hash_entry_size = hash_entry_size;
htablst->intervals = intervals;
indx->memlimit = memlimit;
indx->list = (htab_t *)calloc(intervals, sizeof (htab_t));
indx->mlist = (pthread_mutex_t *)malloc(intervals * sizeof (pthread_mutex_t));
indx->hash_entry_size = hash_entry_size;
indx->intervals = intervals;
indx->hash_slots = hash_slots / intervals;
for (i = 0; i < intervals; i++) {
htablst->list[i].htab = (hash_entry_t **)calloc(hash_slots / intervals,
indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals,
sizeof (hash_entry_t *));
if (!(htablst->list[i].htab)) {
cleanup_htablst(htablst, intervals);
if (!(indx->list[i].tab)) {
cleanup_indx(indx);
free(cfg);
return (NULL);
}
htablst->memused += ((hash_slots / intervals) * (sizeof (hash_entry_t *)));
pthread_mutex_init(&(htablst->mlist[i]), NULL);
indx->memused += ((hash_slots / intervals) * (sizeof (hash_entry_t *)));
pthread_mutex_init(&(indx->mlist[i]), NULL);
}
indx = (seg_index_t *)calloc(1, sizeof (seg_index_t));
if (!indx) {
cleanup_htablst(htablst, intervals);
free(cfg);
return (NULL);
}
indx->hlist = htablst;
strcpy(cfg->rootdir, tmppath);
strcat(cfg->rootdir, "/.segXXXXXX");
indx->seg_fd_w = mkstemp(cfg->rootdir);
indx->tfd = (int *)malloc(sizeof (int) * nthreads);
if (indx->seg_fd_w == -1 || indx->tfd == NULL) {
cleanup_htablst(htablst, intervals);
cfg->seg_fd_w = mkstemp(cfg->rootdir);
cfg->seg_fd_r = (int *)malloc(sizeof (int) * nthreads);
if (cfg->seg_fd_w == -1 || cfg->seg_fd_r == NULL) {
cleanup_indx(indx);
if (cfg->seg_fd_r)
free(cfg->seg_fd_r);
free(cfg);
if (indx->tfd)
free(indx->tfd);
return (NULL);
}
for (i = 0; i < nthreads; i++) {
indx->tfd[i] = open(cfg->rootdir, O_RDONLY);
cfg->seg_fd_r[i] = open(cfg->rootdir, O_RDONLY);
}
cfg->dbdata = indx;
slab_cache_add(hash_entry_size);
slab_cache_add(cfg->chunk_cksum_sz);
}
return (cfg);
}
int
db_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, segment_entry_t *seg, int thr_id)
static inline int
mycmp(uchar_t *a, uchar_t *b, int sz)
{
size_t val1, val2;
uchar_t *v1 = a;
uchar_t *v2 = b;
int len;
len = 0;
do {
val1 = *((size_t *)v1);
val2 = *((size_t *)v1);
if (val1 != val2) {
return (1);
}
v1 += sizeof (size_t);
v2 += sizeof (size_t);
len += sizeof (size_t);
} while (len < sz);
return (0);
}
segment_entry_t *
db_query_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, int thr_id)
uint64_t
db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
uint64_t seg_offset, int do_insert)
{
uint32_t htab_entry;
index_t *indx = (index_t *)(cfg->dbdata);
hash_entry_t **htab, *ent, **pent;
assert(cfg->similarity_cksum_sz & (sizeof (size_t) - 1) == 0);
htab_entry = XXH32(sim_cksum, cfg->similarity_cksum_sz, 0);
htab_entry ^= (htab_entry / cfg->similarity_cksum_sz);
htab_entry = htab_entry % indx->hash_slots;
htab = indx->list[interval].tab;
pent = &(htab[htab_entry]);
pthread_mutex_lock(&(indx->mlist[interval]));
ent = htab[htab_entry];
while (ent) {
if (mycmp(sim_cksum, ent->cksum, cfg->similarity_cksum_sz) == 0) {
uint64_t off;
off = ent->seg_offset;
pthread_mutex_unlock(&(indx->mlist[interval]));
return (off+1);
}
pent = &(ent->next);
ent = ent->next;
}
if (do_insert) {
if (indx->memused + indx->hash_entry_size >= indx->memlimit - (indx->hash_entry_size << 2)) {
ent = htab[htab_entry];
pent = &(htab[htab_entry]);
htab[htab_entry] = htab[htab_entry]->next;
} else {
ent = (hash_entry_t *)malloc(indx->hash_entry_size);
}
ent->seg_offset = seg_offset;
ent->next = 0;
memcpy(ent->cksum, sim_cksum, cfg->similarity_cksum_sz);
*pent = ent;
}
pthread_mutex_unlock(&(indx->mlist[interval]));
return (0);
}

View file

@ -31,9 +31,8 @@ archive_config_t *init_global_db(char *configfile);
archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize,
int pct_interval, compress_algo_t algo, cksum_t ck,
cksum_t ck_sim, size_t file_sz, size_t memlimit, int nthreads);
int db_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
segment_entry_t *seg, int thr_id);
segment_entry_t *db_query_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, int thr_id);
uint64_t db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
uint64_t seg_offset, int do_insert);
#ifdef __cplusplus
}

View file

@ -62,6 +62,8 @@ typedef struct {
int directory_fanout; // Number of subdirectories in a directory
int directory_levels; // Levels of nested directories
int num_containers; // Number of containers in a directory
int seg_fd_w;
int *seg_fd_r;
void *dbdata;
} archive_config_t;