Change Segmented Dedupe flow to improve parallelism.

Periodically sync writes to segcache file.
Use simple insertion sort for small numbers of elements.
This commit is contained in:
Moinak Ghosh 2013-04-25 23:42:32 +05:30
parent 79a6e7f770
commit 5bb028fe03
3 changed files with 172 additions and 80 deletions

View file

@ -313,6 +313,12 @@ db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, ui
return (0);
}
void
db_segcache_sync(archive_config_t *cfg)
{
fdatasync(cfg->seg_fd_w);
}
/*
* Get the current file pointer position of the metadata file. This indicates the
* position where the next entry will be added.

View file

@ -55,6 +55,7 @@ hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int
void destroy_global_db_s(archive_config_t *cfg);
int db_segcache_write(archive_config_t *cfg, int tid, uchar_t *buf, uint32_t len, uint32_t blknum, uint64_t file_offset);
void db_segcache_sync(archive_config_t *cfg);
int db_segcache_pos(archive_config_t *cfg, int tid);
int db_segcache_map(archive_config_t *cfg, int tid, uint32_t *blknum, uint64_t *offset, uchar_t **blocks);
int db_segcache_unmap(archive_config_t *cfg, int tid);

View file

@ -380,6 +380,34 @@ destroy_dedupe_context(dedupe_context_t *ctx)
}
}
/*
* Simple insertion sort of integers. Used for sorting a small number of items to
* avoid overheads of qsort() with callback function.
*/
static void
isort_uint64(uint64_t *ary, uint32_t nitems)
{
uint32_t i, j, k;
uint64_t tmp;
for (i = 1 ; i < nitems; i++) {
for (j = 0 ; j < i ; j++) {
if (ary[j] > ary[i]) {
tmp = ary[j] ;
ary[j] = ary[i] ;
for (k = i ; k > j ; k--)
ary[k] = ary[k - 1] ;
ary[k + 1] = tmp ;
}
}
}
}
/*
* Callback for qsort() for 64-bit min-values list in hash values.
*/
int
cmpint(const void *a, const void *b)
{
@ -823,7 +851,7 @@ process_blocks:
g_dedupe_idx += (RABIN_ENTRY_SIZE * 2);
} else {
uchar_t *seg_heap, *sim_ck;
uchar_t *seg_heap, *sim_ck, *sim_offsets;
archive_config_t *cfg;
uint32_t increment, len, blks, o_blks, k;
global_blockentry_t *seg_blocks;
@ -836,10 +864,16 @@ process_blocks:
* in-memory index for very large datasets.
* ======================================================================
*/
cfg = ctx->arc;
assert(cfg->similarity_cksum_sz >= sizeof (uint64_t));
seg_heap = (uchar_t *)(ctx->g_blocks) - cfg->segment_sz * cfg->chunk_cksum_sz;
ary_sz = (cfg->sub_intervals * cfg->similarity_cksum_sz + sizeof (blks) + 1) *
((blknum+1) / cfg->segment_sz) + 3;
sim_offsets = seg_heap - ary_sz;
src = sim_offsets;
ary_sz = cfg->segment_sz * sizeof (global_blockentry_t **);
htab = (global_blockentry_t **)(seg_heap - ary_sz);
htab = (global_blockentry_t **)(src - ary_sz);
for (i=0; i<blknum;) {
uint64_t crc, off1;
length = 0;
@ -856,6 +890,8 @@ process_blocks:
length += cfg->chunk_cksum_sz;
tgt += cfg->chunk_cksum_sz;
}
*((uint32_t *)src) = blks;
src += sizeof (blks);
blks = j+i;
/*
@ -890,11 +926,88 @@ process_blocks:
DEBUG_STAT_EN(w2 = get_wtime_millis());
}
sim_ck -= cfg->similarity_cksum_sz;
seg_offset = db_segcache_pos(cfg, ctx->id);
src = (uchar_t *)&(ctx->g_blocks[i]);
len = blks * sizeof (global_blockentry_t);
db_segcache_write(cfg, ctx->id, src, len, blks-i, ctx->file_offset);
db_segcache_write(cfg, ctx->id, (uchar_t *)&(ctx->g_blocks[i]),
len, blks-i, ctx->file_offset);
/*
* Now lookup all the similarity hashes. We sort the hashes first so that
* all duplicate hash values can be easily eliminated.
*
* The matching segment offsets in the segcache are stored in a list.
*/
if (cfg->similarity_cksum_sz == 8) {
isort_uint64((uint64_t *)(ctx->similarity_cksums), sub_i);
} else {
fprintf(stderr, "Similarity Checksum Size: %d not implemented.\n",
cfg->similarity_cksum_sz);
ctx->valid = 0;
sem_post(ctx->index_sem_next);
return (0);
}
sim_ck = ctx->similarity_cksums;
tgt = src + 1; // One byte for number of entries
crc = 0;
off1 = UINT64_MAX;
k = 0;
for (j=0; j < sub_i; j++) {
hash_entry_t *he = NULL;
/*
* Check for duplicate checksum which need not be looked up
* again.
*/
if (crc == *((uint64_t *)sim_ck)) {
he = NULL;
} else {
he = db_lookup_insert_s(cfg, sim_ck, 0, seg_offset, 0, 1);
/*
* Check for different checksum but same segment match.
* This is not a complete check but does help to reduce
* wasted processing.
*/
if (he && off1 == he->item_offset) {
crc = *((uint64_t *)sim_ck);
he = NULL;
}
}
if (he) {
crc = *((uint64_t *)sim_ck);
off1 = he->item_offset;
*((uint64_t *)tgt) = off1;
tgt += cfg->similarity_cksum_sz;
k++;
}
sim_ck += cfg->similarity_cksum_sz;
}
*src = k; // Number of entries
src++;
/*
* At this point we have a list of segment offsets from the segcache
* file. Sort the offsets to avoid subsequent random access.
*/
isort_uint64((uint64_t *)src, k);
src = tgt;
i = blks;
}
/*
* Signal the next thread in sequence to access the index.
*/
sem_post(ctx->index_sem_next);
db_segcache_sync(cfg);
/*
* Now go through all the matching segments for all the current segments
* and perform actual deduplication.
*/
src = sim_offsets;
for (i=0; i<blknum;) {
blks = *((uint32_t *)src) + i;
src += sizeof (blks);
/*
* Insert current segment blocks into local hashtable and do partial
@ -944,93 +1057,65 @@ process_blocks:
}
/*
* Now lookup all the similarity hashes. We sort the hashes first so that
* all duplicate hash values can be easily eliminated.
* Now go through segment match list which was prepared earlier
* and deduplicate with the matching segment blocks.
*/
qsort(ctx->similarity_cksums, sub_i, 8, cmpint);
crc = 0;
off1 = UINT64_MAX;
for (j=sub_i; j > 0; j--) {
hash_entry_t *he = NULL;
sub_i = *src;
src++;
sim_ck = src;
for (j=0; j < sub_i; j++) {
/*
* Load segment metadata from disk and perform identity deduplication
* with the segment chunks.
*/
offset = *((uint64_t *)sim_ck);
if (db_segcache_map(cfg, ctx->id, &o_blks, &offset,
(uchar_t **)&seg_blocks) == -1) {
fprintf(stderr, "** Segment cache mmap failed.\n");
ctx->valid = 0;
return (0);
}
/*
* Check for duplicate checksum which need not be looked up
* again.
* Now lookup loaded segment blocks in hashtable. If match is
* found then the hashtable entry is updated to point to the
* loaded segment block.
*/
if (crc == *((uint64_t *)sim_ck)) {
he = NULL;
} else {
he = db_lookup_insert_s(cfg, sim_ck, 0, seg_offset, 0, 1);
/*
* Check for different checksum but same segment match.
* This is not a complete check but does help to reduce
* wasted processing.
*/
if (he && off1 == he->item_offset) {
crc = *((uint64_t *)sim_ck);
he = NULL;
}
}
if (he) {
/*
* Match found. Load segment metadata from disk and perform
* identity deduplication with the segment chunks.
*/
crc = *((uint64_t *)sim_ck);
offset = he->item_offset;
off1 = offset;
if (db_segcache_map(cfg, ctx->id, &o_blks, &offset,
(uchar_t **)&seg_blocks) == -1) {
fprintf(stderr, "Segment cache mmap failed.\n");
ctx->valid = 0;
return (0);
}
for (k=0; k<o_blks; k++) {
uint32_t hent;
hent = XXH32(seg_blocks[k].cksum, cfg->chunk_cksum_sz, 0);
hent ^= (hent / cfg->chunk_cksum_sz);
hent = hent % cfg->segment_sz;
/*
* Now lookup loaded segment blocks in hashtable. If match is
* found then the hashtable entry is updated to point to the
* loaded segment block.
*/
for (k=0; k<o_blks; k++) {
uint32_t hent;
hent = XXH32(seg_blocks[k].cksum, cfg->chunk_cksum_sz, 0);
hent ^= (hent / cfg->chunk_cksum_sz);
hent = hent % cfg->segment_sz;
if (htab[hent] != NULL) {
be = htab[hent];
do {
if (be->length & RABIN_INDEX_FLAG)
goto next_ent;
if (ckcmp(seg_blocks[k].cksum,
be->cksum, cfg->chunk_cksum_sz) == 0 &&
seg_blocks[k].length == be->length) {
be->length = (be->length |
RABIN_INDEX_FLAG) &
CLEAR_SIMILARITY_FLAG;
be->offset = seg_blocks[k].offset +
offset;
break;
}
if (htab[hent] != NULL) {
be = htab[hent];
do {
if (be->length & RABIN_INDEX_FLAG)
goto next_ent;
if (ckcmp(seg_blocks[k].cksum,
be->cksum, cfg->chunk_cksum_sz) == 0 &&
seg_blocks[k].length == be->length) {
be->length = (be->length |
RABIN_INDEX_FLAG) &
CLEAR_SIMILARITY_FLAG;
be->offset = seg_blocks[k].offset +
offset;
break;
}
next_ent:
if (be->next)
be = be->next;
else
break;
} while(1);
}
if (be->next)
be = be->next;
else
break;
} while(1);
}
}
sim_ck -= cfg->similarity_cksum_sz;
sim_ck += cfg->similarity_cksum_sz;
}
src = sim_ck;
i = blks;
}
/*
* Signal the next thread in sequence to access the index.
*/
sem_post(ctx->index_sem_next);
/*======================================================================
* Finally scan the blocks array and update dedupe index.
*======================================================================