Add global index cleanup function.

Fix location of sem_wait().
More comments.
This commit is contained in:
Moinak Ghosh 2013-03-25 21:04:16 +05:30
parent 1143207cd5
commit 19b304f30c
5 changed files with 55 additions and 10 deletions

19
main.c
View file

@ -551,13 +551,6 @@ redo:
memcpy(ubuf, cmpbuf, dedupe_index_sz); memcpy(ubuf, cmpbuf, dedupe_index_sz);
} else { } else {
/*
* This chunk was not deduplicated, however we still need to down the
* semaphore in order to maintain proper thread coordination.
*/
if (enable_rabin_global) {
sem_wait(tdat->rctx->index_sem);
}
if (HDR & COMPRESSED) { if (HDR & COMPRESSED) {
if (HDR & CHUNK_FLAG_PREPROC) { if (HDR & CHUNK_FLAG_PREPROC) {
rv = preproc_decompress(tdat->decompress, cseg, tdat->len_cmp, rv = preproc_decompress(tdat->decompress, cseg, tdat->len_cmp,
@ -606,6 +599,16 @@ redo:
tdat->uncompressed_chunk = tdat->compressed_chunk; tdat->uncompressed_chunk = tdat->compressed_chunk;
tdat->compressed_chunk = tmp; tdat->compressed_chunk = tmp;
tdat->cmp_seg = tdat->uncompressed_chunk; tdat->cmp_seg = tdat->uncompressed_chunk;
} else {
/*
* This chunk was not deduplicated, however we still need to down the
* semaphore in order to maintain proper thread coordination. We do this after
* decompression to achieve better concurrency. Decompression does not need
* to wait for the previous thread's dedupe recovery to complete.
*/
if (enable_rabin_global) {
sem_wait(tdat->rctx->index_sem);
}
} }
if (!encrypt_type) { if (!encrypt_type) {
@ -2575,7 +2578,7 @@ main(int argc, char *argv[])
} }
if (enable_rabin_global && enable_delta_encode) { if (enable_rabin_global && enable_delta_encode) {
fprintf(stderr, "Global Deduplication does not support Delta Compression yet.\n"); fprintf(stderr, "Global Deduplication does not support Delta Compression.\n");
exit(1); exit(1);
} }

View file

@ -102,6 +102,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
if (cfg->dedupe_mode == MODE_SIMPLE) { if (cfg->dedupe_mode == MODE_SIMPLE) {
pct_interval = 0; pct_interval = 0;
cfg->pct_interval = 0;
} }
if (path != NULL) { if (path != NULL) {
@ -153,6 +154,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch
indx->hash_entry_size = hash_entry_size; indx->hash_entry_size = hash_entry_size;
indx->intervals = intervals; indx->intervals = intervals;
indx->hash_slots = hash_slots / intervals; indx->hash_slots = hash_slots / intervals;
cfg->nthreads = nthreads;
for (i = 0; i < intervals; i++) { for (i = 0; i < intervals; i++) {
indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals, indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals,
@ -264,3 +266,21 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
} }
return (NULL); return (NULL);
} }
void
destroy_global_db_s(archive_config_t *cfg)
{
int i;
index_t *indx = (index_t *)(cfg->dbdata);
cleanup_indx(indx);
if (cfg->pct_interval > 0) {
for (i = 0; i < cfg->nthreads; i++) {
close(cfg->seg_fd_r[i]);
}
free(cfg->seg_fd_r);
close(cfg->seg_fd_w);
unlink(cfg->rootdir);
}
}

View file

@ -48,6 +48,7 @@ archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize
int nthreads); int nthreads);
hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval,
uint64_t item_offset, uint32_t item_size, int do_insert); uint64_t item_offset, uint32_t item_size, int do_insert);
void destroy_global_db_s(archive_config_t *cfg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View file

@ -73,8 +73,10 @@ typedef struct {
int directory_fanout; // Number of subdirectories in a directory int directory_fanout; // Number of subdirectories in a directory
int directory_levels; // Levels of nested directories int directory_levels; // Levels of nested directories
int num_containers; // Number of containers in a directory int num_containers; // Number of containers in a directory
int nthreads; // Number of threads processing data segments in parallel
int seg_fd_w; int seg_fd_w;
int *seg_fd_r; int *seg_fd_r; // One read-only fd per thread for mapping in portions of the
// segment metadata cache.
void *dbdata; void *dbdata;
} archive_config_t; } archive_config_t;

View file

@ -320,6 +320,16 @@ destroy_dedupe_context(dedupe_context_t *ctx)
#ifndef SSE_MODE #ifndef SSE_MODE
if (ctx->current_window_data) slab_free(NULL, ctx->current_window_data); if (ctx->current_window_data) slab_free(NULL, ctx->current_window_data);
#endif #endif
pthread_mutex_lock(&init_lock);
if (arc) {
if (arc->dedupe_mode == MODE_SIMPLE) {
destroy_global_db_s(arc);
}
}
arc = NULL;
pthread_mutex_unlock(&init_lock);
if (ctx->blocks) { if (ctx->blocks) {
for (i=0; i<ctx->blknum && ctx->blocks[i] != NULL; i++) { for (i=0; i<ctx->blknum && ctx->blocks[i] != NULL; i++) {
slab_free(NULL, ctx->blocks[i]); slab_free(NULL, ctx->blocks[i]);
@ -767,6 +777,10 @@ process_blocks:
goto dedupe_done; goto dedupe_done;
} }
/*
* Subsequent processing below is for per-segment Deduplication.
*/
/* /*
* Compute hash signature for each block. We do this in a separate loop to * Compute hash signature for each block. We do this in a separate loop to
* have a fast linear scan through the buffer. * have a fast linear scan through the buffer.
@ -1120,6 +1134,11 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size)
return; return;
} }
/*
* Handling for per-segment Deduplication.
* First pass re-create the rabin block array from the index metadata.
* Second pass copy over blocks to the target buffer to re-create the original segment.
*/
slab_cache_add(sizeof (rabin_blockentry_t)); slab_cache_add(sizeof (rabin_blockentry_t));
for (blk = 0; blk < blknum; blk++) { for (blk = 0; blk < blknum; blk++) {
if (ctx->blocks[blk] == 0) if (ctx->blocks[blk] == 0)