From 010f49f41232f3b235645a013f34989afcbfe9c9 Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Sun, 8 Jul 2012 21:44:08 +0530 Subject: [PATCH] Implement ability to partition chunks at the last rabin boundary instead of fixed size. --- main.c | 40 +++++++++++++++++++++++++++---- rabin/rabin_polynomial.c | 52 +++++++++++++++++++++++++++------------- rabin/rabin_polynomial.h | 2 +- utils.c | 33 +++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 22 deletions(-) diff --git a/main.c b/main.c index 8c74b30..faeae60 100644 --- a/main.c +++ b/main.c @@ -78,6 +78,7 @@ static int nthreads = 0; static int hide_mem_stats = 1; static int hide_cmp_stats = 1; static int enable_rabin_scan = 0; +static int enable_rabin_split = 1; static unsigned int chunk_num; static uint64_t largest_chunk, smallest_chunk, avg_chunk; static const char *exec_name; @@ -116,6 +117,7 @@ usage(void) " %s -p ...\n" "4) Attempt Rabin fingerprinting based deduplication on chunks:\n" " %s -D ...\n" + " %s -D -r ... - Do NOT split chunks at a rabin boundary. Default is to split.\n" "5) Number of threads can optionally be specified: -t <1 - 256 count>\n" "6) Pass '-M' to display memory allocator statistics\n" "7) Pass '-C' to display compression statistics\n\n", @@ -602,7 +604,7 @@ redo: rbytes = tdat->rbytes; reset_rabin_context(tdat->rctx); rctx->cbuf = tdat->uncompressed_chunk; - rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0); + rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0, NULL); if (!rctx->valid) { memcpy(tdat->uncompressed_chunk, tdat->cmp_seg, rbytes); tdat->rbytes = rbytes; @@ -753,7 +755,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) char tmpfile1[MAXPATHLEN]; char to_filename[MAXPATHLEN]; ssize_t compressed_chunksize; - ssize_t n_chunksize, rbytes; + ssize_t n_chunksize, rbytes, rabin_count; short version, flags; struct stat sbuf; int compfd = -1, uncompfd = -1, err; @@ -762,6 +764,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) struct cmp_data **dary = NULL, *tdat; pthread_t writer_thr; uchar_t *cread_buf, *pos; + rabin_context_t *rctx; /* * Compressed buffer size must include zlib scratch space and @@ -956,11 +959,17 @@ start_compress(const char *filename, uint64_t chunksize, int level) largest_chunk = 0; smallest_chunk = chunksize; avg_chunk = 0; + rabin_count = 0; /* * Read the first chunk into a spare buffer (a simple double-buffering). */ - rbytes = Read(uncompfd, cread_buf, chunksize); + if (enable_rabin_split) { + rctx = create_rabin_context(chunksize, 0, algo); + rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx); + } else { + rbytes = Read(uncompfd, cread_buf, chunksize); + } while (!bail) { uchar_t *tmp; @@ -990,6 +999,18 @@ start_compress(const char *filename, uint64_t chunksize, int level) tdat->cmp_seg = cread_buf; cread_buf = tmp; tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t); + + /* + * If there is data after the last rabin boundary in the chunk, then + * rabin_count will be non-zero. We carry over the data to the beginning + * of the next chunk. 
+ */ + if (rabin_count) { + memcpy(cread_buf, + tdat->cmp_seg + rabin_count, rbytes - rabin_count); + tdat->rbytes = rabin_count; + rabin_count = rbytes - rabin_count; + } } else { tmp = tdat->uncompressed_chunk; tdat->uncompressed_chunk = cread_buf; @@ -1014,7 +1035,11 @@ start_compress(const char *filename, uint64_t chunksize, int level) * Read the next buffer we want to process while previous * buffer is in progress. */ - rbytes = Read(uncompfd, cread_buf, chunksize); + if (enable_rabin_split) { + rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx); + } else { + rbytes = Read(uncompfd, cread_buf, chunksize); + } } } @@ -1088,6 +1113,7 @@ comp_done: } slab_free(NULL, dary); } + if (enable_rabin_split) destroy_rabin_context(rctx); slab_free(NULL, cread_buf); if (!pipe_mode) { if (compfd != -1) close(compfd); @@ -1177,7 +1203,7 @@ main(int argc, char *argv[]) level = 6; slab_init(); - while ((opt = getopt(argc, argv, "dc:s:l:pt:MCD")) != -1) { + while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDr")) != -1) { int ovr; switch (opt) { @@ -1233,6 +1259,10 @@ main(int argc, char *argv[]) enable_rabin_scan = 1; break; + case 'r': + enable_rabin_split = 0; + break; + case '?': default: usage(); diff --git a/rabin/rabin_polynomial.c b/rabin/rabin_polynomial.c index 305f5ab..1b4ada7 100755 --- a/rabin/rabin_polynomial.c +++ b/rabin/rabin_polynomial.c @@ -123,21 +123,27 @@ create_rabin_context(uint64_t chunksize, uint64_t real_chunksize, const char *al return (NULL); } current_window_data = slab_alloc(NULL, RAB_POLYNOMIAL_WIN_SIZE); - ctx->blocks = (rabin_blockentry_t *)slab_alloc(NULL, - blknum * ctx->rabin_poly_min_block_size); - if(ctx == NULL || current_window_data == NULL || ctx->blocks == NULL) { + ctx->blocks = NULL; + if (real_chunksize > 0) { + ctx->blocks = (rabin_blockentry_t *)slab_alloc(NULL, + blknum * ctx->rabin_poly_min_block_size); + } + if(ctx == NULL || current_window_data == NULL || (ctx->blocks == NULL && real_chunksize > 0)) { fprintf(stderr, "Could not allocate rabin polynomial context, out of memory\n"); destroy_rabin_context(ctx); return (NULL); } - lzma_init(&(ctx->lzma_data), &(ctx->level), chunksize); - if (!(ctx->lzma_data)) { - fprintf(stderr, - "Could not allocate rabin polynomial context, out of memory\n"); - destroy_rabin_context(ctx); - return (NULL); + ctx->lzma_data = NULL; + if (real_chunksize > 0) { + lzma_init(&(ctx->lzma_data), &(ctx->level), chunksize); + if (!(ctx->lzma_data)) { + fprintf(stderr, + "Could not allocate rabin polynomial context, out of memory\n"); + destroy_rabin_context(ctx); + return (NULL); + } } /* * We should compute the power for the window size. @@ -198,7 +204,7 @@ cmpblks(const void *a, const void *b) * the rolling checksum and dedup blocks vary in size from 4K-128K. */ uint32_t -rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) +rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, ssize_t *rabin_pos) { ssize_t i, last_offset,j; uint32_t blknum; @@ -211,6 +217,14 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) ctx->valid = 0; ctx->cur_checksum = 0; + /* + * If rabin_pos is non-zero then we are being asked to scan for the last rabin boundary + * in the chunk. We start scanning at chunk end - max rabin block size. We avoid doing + * a full chunk scan. 
+	 */
+	if (rabin_pos) {
+		offset = *size - RAB_POLYNOMIAL_MAX_BLOCK_SIZE;
+	}
 	if (*size < ctx->rabin_poly_avg_block_size) return;
 	for (i=offset; i<*size; i++) {
 		char cur_byte = buf1[i];
@@ -241,18 +255,24 @@
 		// If we hit our special value or reached the max block size update block offset
 		if ((ctx->cur_roll_checksum & ctx->rabin_avg_block_mask) == ctx->rabin_break_patt ||
 		    length >= rabin_polynomial_max_block_size) {
-			ctx->blocks[blknum].offset = last_offset;
-			ctx->blocks[blknum].index = blknum; // Need to store for sorting
-			ctx->blocks[blknum].cksum_n_offset = ctx->cur_checksum;
-			ctx->blocks[blknum].length = length;
-			ctx->blocks[blknum].refcount = 0;
-			blknum++;
+			if (rabin_pos == NULL) {
+				ctx->blocks[blknum].offset = last_offset;
+				ctx->blocks[blknum].index = blknum; // Need to store for sorting
+				ctx->blocks[blknum].cksum_n_offset = ctx->cur_checksum;
+				ctx->blocks[blknum].length = length;
+				ctx->blocks[blknum].refcount = 0;
+				blknum++;
+			}
 			ctx->cur_checksum = 0;
 			last_offset = i+1;
 			length = 0;
 		}
 	}
+	if (rabin_pos && last_offset < *size) {
+		*rabin_pos = last_offset;
+		return (0);
+	}
 	// If we found at least a few chunks, perform dedup.
 	if (blknum > 2) {
 		uint64_t prev_cksum;
diff --git a/rabin/rabin_polynomial.h b/rabin/rabin_polynomial.h
index 9488283..8c58122 100644
--- a/rabin/rabin_polynomial.h
+++ b/rabin/rabin_polynomial.h
@@ -144,7 +144,7 @@ extern rabin_context_t *create_rabin_context(uint64_t chunksize, uint64_t real_c
 	const char *algo);
 extern void destroy_rabin_context(rabin_context_t *ctx);
 extern unsigned int rabin_dedup(rabin_context_t *ctx, unsigned char *buf,
-	ssize_t *size, ssize_t offset);
+	ssize_t *size, ssize_t offset, ssize_t *rabin_pos);
 extern void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size);
 extern void rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_index_sz,
 	ssize_t *rabin_data_sz, ssize_t *rabin_index_sz_cmp,
diff --git a/utils.c b/utils.c
index 274da2f..442547d 100644
--- a/utils.c
+++ b/utils.c
@@ -194,6 +194,39 @@ Read(int fd, void *buf, size_t count)
 	return (count - rem);
 }
 
+/*
+ * Read the requested chunk and return the last rabin boundary in the chunk.
+ * This helps in splitting chunks at rabin boundaries rather than fixed points.
+ * The request buffer may have some data at the beginning carried over from
+ * after the previous rabin boundary.
+ */
+ssize_t
+Read_Adjusted(int fd, uchar_t *buf, size_t count, ssize_t *rabin_count, void *ctx)
+{
+	char *buf2;
+	ssize_t rcount;
+	rabin_context_t *rctx = (rabin_context_t *)ctx;
+
+	if (!ctx) return (Read(fd, buf, count));
+	buf2 = buf;
+	if (*rabin_count) {
+		buf2 = (char *)buf + *rabin_count;
+		count -= *rabin_count;
+	}
+	rcount = Read(fd, buf2, count);
+	if (rcount > 0) {
+		rcount += *rabin_count;
+		if (!(rcount < count))
+			rabin_dedup(rctx, buf, &rcount, *rabin_count, rabin_count);
+		else
+			*rabin_count = 0;
+	} else {
+		if (rcount == 0) rcount = *rabin_count;
+		*rabin_count = 0;
+	}
+	return (rcount);
+}
+
 ssize_t
 Write(int fd, const void *buf, size_t count)
 {
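
For readers tracing the new control flow, the sketch below (not part of the patch) shows the carry-over protocol that Read_Adjusted() and the compress loop implement: read a fixed-size buffer, find the last content-defined boundary near its end, emit only the data up to that boundary, and move the tail to the front of the buffer before the next read. find_last_boundary(), MAX_BLOCK and the newline break test are hypothetical stand-ins for rabin_dedup() called with a non-NULL rabin_pos, RAB_POLYNOMIAL_MAX_BLOCK_SIZE and the rolling-hash break pattern; like the patch, a short read near EOF is emitted as-is with nothing carried over.

/*
 * Standalone sketch (not part of the patch) of the carry-over protocol.
 * find_last_boundary(), MAX_BLOCK and the newline test are hypothetical
 * stand-ins for rabin_dedup() with a non-NULL rabin_pos, the max rabin
 * block size and the rolling-hash break pattern.
 */
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>

#define CHUNK		8192	/* fixed read size, like chunksize */
#define MAX_BLOCK	1024	/* stand-in for the max rabin block size */

/*
 * Return the offset just past the last "boundary" in buf, or len if none is
 * found.  Only the trailing MAX_BLOCK bytes are scanned, mirroring the
 * offset = *size - RAB_POLYNOMIAL_MAX_BLOCK_SIZE shortcut in the patch.
 */
static ssize_t
find_last_boundary(const unsigned char *buf, ssize_t len)
{
	ssize_t i, last = len;
	ssize_t start = (len > MAX_BLOCK) ? len - MAX_BLOCK : 0;

	for (i = start; i < len; i++)
		if (buf[i] == '\n')	/* toy break criterion */
			last = i + 1;
	return (last);
}

int
main(int argc, char **argv)
{
	unsigned char buf[CHUNK];
	ssize_t carried = 0, nread, total, split;
	int fd;

	if (argc < 2 || (fd = open(argv[1], O_RDONLY)) < 0)
		return (1);

	for (;;) {
		/* Top up the buffer after any bytes carried over last time. */
		nread = read(fd, buf + carried, CHUNK - carried);
		if (nread < 0)
			return (1);
		total = carried + nread;
		if (total == 0)
			break;

		if (nread > 0 && total == CHUNK)
			split = find_last_boundary(buf, total);	/* full buffer: split at last boundary */
		else
			split = total;	/* short read or EOF: emit everything */

		printf("chunk of %zd bytes\n", split);

		/* Carry the tail past the boundary to the front of the buffer. */
		carried = total - split;
		if (carried > 0)
			memmove(buf, buf + split, carried);
		if (nread == 0)
			break;
	}
	close(fd);
	return (0);
}

The same reasoning explains why scanning only the trailing max-block-size window is enough in rabin_dedup(): a break is forced whenever a block reaches the maximum block size, so the last boundary can never lie further back than that from the end of the buffer.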