From 19b304f30cf0c077e929c7094145ab7fc918eaa2 Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Mon, 25 Mar 2013 21:04:16 +0530 Subject: [PATCH] Add global index cleanup function. Fix location of sem_wait(). More comments. --- main.c | 19 +++++++++++-------- rabin/global/db.c | 20 ++++++++++++++++++++ rabin/global/db.h | 1 + rabin/global/dedupe_config.h | 6 ++++-- rabin/rabin_dedup.c | 19 +++++++++++++++++++ 5 files changed, 55 insertions(+), 10 deletions(-) diff --git a/main.c b/main.c index fdafbe6..5278cee 100644 --- a/main.c +++ b/main.c @@ -551,13 +551,6 @@ redo: memcpy(ubuf, cmpbuf, dedupe_index_sz); } else { - /* - * This chunk was not deduplicated, however we still need to down the - * semaphore in order to maintain proper thread coordination. - */ - if (enable_rabin_global) { - sem_wait(tdat->rctx->index_sem); - } if (HDR & COMPRESSED) { if (HDR & CHUNK_FLAG_PREPROC) { rv = preproc_decompress(tdat->decompress, cseg, tdat->len_cmp, @@ -606,6 +599,16 @@ redo: tdat->uncompressed_chunk = tdat->compressed_chunk; tdat->compressed_chunk = tmp; tdat->cmp_seg = tdat->uncompressed_chunk; + } else { + /* + * This chunk was not deduplicated, however we still need to down the + * semaphore in order to maintain proper thread coordination. We do this after + * decompression to achieve better concurrency. Decompression does not need + * to wait for the previous thread's dedupe recovery to complete. 
+ */ + if (enable_rabin_global) { + sem_wait(tdat->rctx->index_sem); + } } if (!encrypt_type) { @@ -2575,7 +2578,7 @@ main(int argc, char *argv[]) } if (enable_rabin_global && enable_delta_encode) { - fprintf(stderr, "Global Deduplication does not support Delta Compression yet.\n"); + fprintf(stderr, "Global Deduplication does not support Delta Compression.\n"); exit(1); } diff --git a/rabin/global/db.c b/rabin/global/db.c index ac6e6f6..5b1c186 100644 --- a/rabin/global/db.c +++ b/rabin/global/db.c @@ -102,6 +102,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch if (cfg->dedupe_mode == MODE_SIMPLE) { pct_interval = 0; + cfg->pct_interval = 0; } if (path != NULL) { @@ -153,6 +154,7 @@ init_global_db_s(char *path, char *tmppath, uint32_t chunksize, uint64_t user_ch indx->hash_entry_size = hash_entry_size; indx->intervals = intervals; indx->hash_slots = hash_slots / intervals; + cfg->nthreads = nthreads; for (i = 0; i < intervals; i++) { indx->list[i].tab = (hash_entry_t **)calloc(hash_slots / intervals, @@ -264,3 +266,21 @@ db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, } return (NULL); } + +void +destroy_global_db_s(archive_config_t *cfg) +{ + int i; + index_t *indx = (index_t *)(cfg->dbdata); + + cleanup_indx(indx); + if (cfg->pct_interval > 0) { + for (i = 0; i < cfg->nthreads; i++) { + close(cfg->seg_fd_r[i]); + } + free(cfg->seg_fd_r); + close(cfg->seg_fd_w); + unlink(cfg->rootdir); + } +} + diff --git a/rabin/global/db.h b/rabin/global/db.h index 228b953..58285fb 100644 --- a/rabin/global/db.h +++ b/rabin/global/db.h @@ -48,6 +48,7 @@ archive_config_t *init_global_db_s(char *path, char *tmppath, uint32_t chunksize int nthreads); hash_entry_t *db_lookup_insert_s(archive_config_t *cfg, uchar_t *sim_cksum, int interval, uint64_t item_offset, uint32_t item_size, int do_insert); +void destroy_global_db_s(archive_config_t *cfg); #ifdef __cplusplus } diff --git a/rabin/global/dedupe_config.h 
b/rabin/global/dedupe_config.h index 14529b7..996e1f8 100644 --- a/rabin/global/dedupe_config.h +++ b/rabin/global/dedupe_config.h @@ -73,8 +73,10 @@ typedef struct { int directory_fanout; // Number of subdirectories in a directory int directory_levels; // Levels of nested directories int num_containers; // Number of containers in a directory - int seg_fd_w; - int *seg_fd_r; + int nthreads; // Number of threads processing data segments in parallel + int seg_fd_w; + int *seg_fd_r; // One read-only fd per thread for mapping in portions of the + // segment metadata cache. void *dbdata; } archive_config_t; diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index 0b06d45..842de2d 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -320,6 +320,16 @@ destroy_dedupe_context(dedupe_context_t *ctx) #ifndef SSE_MODE if (ctx->current_window_data) slab_free(NULL, ctx->current_window_data); #endif + + pthread_mutex_lock(&init_lock); + if (arc) { + if (arc->dedupe_mode == MODE_SIMPLE) { + destroy_global_db_s(arc); + } + } + arc = NULL; + pthread_mutex_unlock(&init_lock); + if (ctx->blocks) { for (i=0; i<ctx->blknum && ctx->blocks[i] != NULL; i++) { slab_free(NULL, ctx->blocks[i]); @@ -767,6 +777,10 @@ process_blocks: goto dedupe_done; } + /* + * Subsequent processing below is for per-segment Deduplication. + */ + /* * Compute hash signature for each block. We do this in a separate loop to * have a fast linear scan through the buffer. @@ -1120,6 +1134,11 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size) return; } + /* + * Handling for per-segment Deduplication. + * First pass re-create the rabin block array from the index metadata. + * Second pass copy over blocks to the target buffer to re-create the original segment. + */ slab_cache_add(sizeof (rabin_blockentry_t)); for (blk = 0; blk < blknum; blk++) { if (ctx->blocks[blk] == 0)