From a43fdd7d2cefc83243154f5b78e5a40b1e429baa Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Sun, 23 Dec 2012 00:44:56 +0530 Subject: [PATCH] Improve Delta2 scanning speed and effectiveness. Add destination buffer overflow check in Delta2. Add rough speed computation. --- delta2/delta2.c | 33 ++++++++++++++++++++++++++++----- lzp/lzp.c | 7 +++++++ main.c | 21 +++++++++++++++++++++ rabin/rabin_dedup.c | 17 ++++++++++++----- utils/utils.c | 22 ++++++++++++++++++++++ utils/utils.h | 4 ++++ 6 files changed, 94 insertions(+), 10 deletions(-) diff --git a/delta2/delta2.c b/delta2/delta2.c index 302f745..8fa82df 100644 --- a/delta2/delta2.c +++ b/delta2/delta2.c @@ -110,6 +110,7 @@ delta2_encode(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen, int uchar_t *srcpos, *dstpos, *lastdst, *lastsrc, *dstend; uint64_t slen, sz, dsz, pending; int rem, lenc, transp_count, hdr_ovr; + DEBUG_STAT_EN(double strt, en); srcpos = src; dstpos = dst; @@ -123,6 +124,7 @@ delta2_encode(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen, int transp_count = 0; hdr_ovr = 0; + DEBUG_STAT_EN(strt = get_wtime_millis()); while (slen > 0) { if (slen > DELTA2_CHUNK) { sz = DELTA2_CHUNK; @@ -174,8 +176,10 @@ delta2_encode(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen, int memcpy(lastdst, lastsrc, pending); } *dstlen = dstpos - dst; + DEBUG_STAT_EN(en = get_wtime_millis()); DEBUG_STAT_EN(fprintf(stderr, "DELTA2: srclen: %" PRIu64 ", dstlen: %" PRIu64 "\n", srclen, *dstlen)); DEBUG_STAT_EN(fprintf(stderr, "DELTA2: transpositions: %d, header overhead: %d\n", transp_count, hdr_ovr)); + DEBUG_STAT_EN(fprintf(stderr, "DELTA2: Processed at %.3f MB/s\n", get_mb_s(srclen, strt, en))); } return (0); } @@ -195,12 +199,16 @@ delta2_encode_real(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen return (-1); gtot1 = ULL_MAX; stride = 0; + sval = 0; + val = 0; sz = sizeof (strides) / sizeof (strides[0]); /* * Estimate which stride length gives the max reduction given rle_thresh. */ for (st = 0; st < sz; st++) { + int gt; + snum = 0; gtot2 = MAIN_HDR + LIT_HDR; vl1 = 0; @@ -215,10 +223,9 @@ delta2_encode_real(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen vld2 = vl2 - vl1; if (vld1 != vld2) { if (snum > rle_thresh) { - if (tot > 0) { - gtot2 += LIT_HDR; - tot = 0; - } + gt = (tot > 0); + gtot2 += (LIT_HDR * gt); + tot = 0; gtot2 += DELTA_HDR; } else { gtot2 += snum; @@ -233,16 +240,32 @@ delta2_encode_real(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen } if (snum > rle_thresh) { gtot2 += DELTA_HDR; + /* + * If this ended partially into another table reset next scan + * point to before the table. + */ + val = cnt - snum; } else { gtot2 += snum; + /* + * If this ended partially into another table reset next scan + * point to before the table. + */ + if (snum >= st1 * 5) + val = cnt - snum; } if (gtot2 < gtot1) { gtot1 = gtot2; stride = st1; + sval = val; } } - if (!(gtot1 < srclen && srclen - gtot1 > (DELTA_HDR + LIT_HDR + MAIN_HDR))) { + if ( !(gtot1 < srclen && srclen - gtot1 > (DELTA_HDR + LIT_HDR + MAIN_HDR) && gtot1 < *dstlen) ) { + if (srclen >= DELTA2_CHUNK) { + if (sval > 0) + *dstlen = sval; + } return (-1); } diff --git a/lzp/lzp.c b/lzp/lzp.c index 45ea6ff..dd910c2 100644 --- a/lzp/lzp.c +++ b/lzp/lzp.c @@ -43,6 +43,8 @@ See also the bsc and libbsc web site: #include #include #include +#include +#include #include "lzp.h" @@ -245,7 +247,9 @@ int64_t bsc_lzp_compress_serial(const unsigned char * input, unsigned char * out int chunkSize; int blockId; int64_t outputPtr = 1 + 8 * nBlocks; + DEBUG_STAT_EN(double strt, en); + DEBUG_STAT_EN(strt = get_wtime_millis()); if (n > LZP_MAX_BLOCK) chunkSize = LZP_MAX_BLOCK; else @@ -270,7 +274,10 @@ int64_t bsc_lzp_compress_serial(const unsigned char * input, unsigned char * out outputPtr += result; } + DEBUG_STAT_EN(en = get_wtime_millis()); + DEBUG_STAT_EN(fprintf(stderr, "LZP: Insize: %" PRId64 ", Outsize: %" PRId64 "\n", n, outputPtr)); + DEBUG_STAT_EN(fprintf(stderr, "LZP: Processed at %.3f MB/s\n", get_mb_s(n, strt, en))); return outputPtr; } diff --git a/main.c b/main.c index 3a09809..a422933 100644 --- a/main.c +++ b/main.c @@ -199,6 +199,7 @@ preproc_compress(compress_func_ptr cmp_func, void *src, uint64_t srclen, void *d { uchar_t *dest = (uchar_t *)dst, type = 0; int64_t result, _dstlen; + DEBUG_STAT_EN(double strt, en); _dstlen = *dstlen; if (lzp_preprocess) { @@ -233,10 +234,14 @@ preproc_compress(compress_func_ptr cmp_func, void *src, uint64_t srclen, void *d *dest = type; *((int64_t *)(dest + 1)) = htonll(srclen); _dstlen = srclen; + DEBUG_STAT_EN(strt = get_wtime_millis()); result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, data); + DEBUG_STAT_EN(en = get_wtime_millis()); + if (result > -1 && _dstlen < srclen) { *dest |= PREPROC_COMPRESSED; *dstlen = _dstlen + 9; + DEBUG_STAT_EN(fprintf(stderr, "Chunk compression speed %.3f MB/s\n", get_mb_s(srclen, strt, en))); } else { memcpy(dest+1, src, srclen); *dstlen = srclen + 1; @@ -253,6 +258,7 @@ preproc_decompress(compress_func_ptr dec_func, void *src, uint64_t srclen, void uchar_t *sorc = (uchar_t *)src, type; int64_t result; uint64_t _dstlen = *dstlen; + DEBUG_STAT_EN(double strt, en); type = *sorc; sorc++; @@ -261,8 +267,11 @@ preproc_decompress(compress_func_ptr dec_func, void *src, uint64_t srclen, void *dstlen = ntohll(*((int64_t *)(sorc))); sorc += 8; srclen -= 8; + DEBUG_STAT_EN(strt = get_wtime_millis()); result = dec_func(sorc, srclen, dst, dstlen, level, chdr, data); + DEBUG_STAT_EN(en = get_wtime_millis()); if (result < 0) return (result); + DEBUG_STAT_EN(fprintf(stderr, "Chunk decompression speed %.3f MB/s\n", get_mb_s(srclen, strt, en))); memcpy(src, dst, *dstlen); srclen = *dstlen; } @@ -445,8 +454,14 @@ redo: rv = preproc_decompress(tdat->decompress, cmpbuf, dedupe_data_sz_cmp, ubuf, &_chunksize, tdat->level, HDR, tdat->data, tdat->props); } else { + DEBUG_STAT_EN(double strt, en); + + DEBUG_STAT_EN(strt = get_wtime_millis()); rv = tdat->decompress(cmpbuf, dedupe_data_sz_cmp, ubuf, &_chunksize, tdat->level, HDR, tdat->data); + DEBUG_STAT_EN(en = get_wtime_millis()); + DEBUG_STAT_EN(fprintf(stderr, "Chunk decompression speed %.3f MB/s\n", + get_mb_s(_chunksize, strt, en))); } if (rv == -1) { tdat->len_cmp = 0; @@ -1208,8 +1223,14 @@ plain_index: _chunksize, compressed_chunk + index_size_cmp, &_chunksize, tdat->level, 0, tdat->data, tdat->props); } else { + DEBUG_STAT_EN(double strt, en); + + DEBUG_STAT_EN(strt = get_wtime_millis()); rv = tdat->compress(tdat->uncompressed_chunk + dedupe_index_sz, _chunksize, compressed_chunk + index_size_cmp, &_chunksize, tdat->level, 0, tdat->data); + DEBUG_STAT_EN(en = get_wtime_millis()); + DEBUG_STAT_EN(fprintf(stderr, "Chunk compression speed %.3f MB/s\n", + get_mb_s(_chunksize, strt, en))); } /* Can't compress data just retain as-is. */ diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index e7d051b..4ab43b9 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -291,12 +291,14 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, int64_t *size, int64_t offs heap_t heap; DEBUG_STAT_EN(uint32_t max_count); DEBUG_STAT_EN(max_count = 0); + DEBUG_STAT_EN(double strt, en); length = offset; last_offset = 0; blknum = 0; ctx->valid = 0; cur_roll_checksum = 0; + DEBUG_STAT_EN(strt = get_wtime_millis()); if (ctx->fixed_flag) { blknum = *size / ctx->rabin_poly_avg_block_size; @@ -484,8 +486,8 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, int64_t *size, int64_t offs process_blocks: // If we found at least a few chunks, perform dedup. - DEBUG_STAT_EN(printf("Original size: %" PRId64 ", blknum: %u\n", *size, blknum)); - DEBUG_STAT_EN(printf("Number of maxlen blocks: %u\n", max_count)); + DEBUG_STAT_EN(fprintf(stderr, "Original size: %" PRId64 ", blknum: %u\n", *size, blknum)); + DEBUG_STAT_EN(fprintf(stderr, "Number of maxlen blocks: %u\n", max_count)); if (blknum > 2) { int64_t pos, matchlen, pos1; int valid = 1; @@ -605,10 +607,11 @@ process_blocks: } } } - DEBUG_STAT_EN(printf("Total Hashtable bucket collisions: %u\n", hash_collisions)); + DEBUG_STAT_EN(fprintf(stderr, "Total Hashtable bucket collisions: %u\n", hash_collisions)); dedupe_index_sz = (int64_t)blknum * RABIN_ENTRY_SIZE; if (matchlen < dedupe_index_sz) { + DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.\n")); ctx->valid = 0; return; } @@ -639,7 +642,7 @@ process_blocks: i++; } } - DEBUG_STAT_EN(printf("Merge count: %u\n", merge_count)); + DEBUG_STAT_EN(fprintf(stderr, "Merge count: %u\n", merge_count)); /* * Final pass update dedupe index and copy data. @@ -688,7 +691,9 @@ cont: if (valid) { uchar_t *cbuf = ctx->cbuf; int64_t *entries; + DEBUG_STAT_EN(int64_t sz); + DEBUG_STAT_EN(sz = *size); *((uint32_t *)cbuf) = htonl(blknum); cbuf += sizeof (uint32_t); entries = (int64_t *)cbuf; @@ -697,8 +702,10 @@ cont: entries[2] = htonll(pos1 - dedupe_index_sz - RABIN_HDR_SIZE); *size = pos1; ctx->valid = 1; - DEBUG_STAT_EN(printf("Deduped size: %" PRId64 ", blknum: %u, delta_calls: %u, delta_fails: %u\n", + DEBUG_STAT_EN(en = get_wtime_millis()); + DEBUG_STAT_EN(fprintf(stderr, "Deduped size: %" PRId64 ", blknum: %u, delta_calls: %u, delta_fails: %u\n", *size, blknum, delta_calls, delta_fails)); + DEBUG_STAT_EN(fprintf(stderr, "Dedupe speed %.3f MB/s\n", get_mb_s(sz, strt, en))); /* * Remaining header entries: size of compressed index and size of * compressed data are inserted later via rabin_update_hdr, after actual compression! diff --git a/utils/utils.c b/utils/utils.c index ffa2590..afc9f76 100644 --- a/utils/utils.c +++ b/utils/utils.c @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -308,3 +309,24 @@ get_total_ram() phys_pages = sysconf(_SC_PHYS_PAGES); return (phys_pages * page_size); } + +double +get_wtime_millis(void) +{ + struct timespec ts; + int rv; + + rv = clock_gettime(CLOCK_MONOTONIC, &ts); + if (rv == 0) + return (ts.tv_sec * 1000 + ((double)ts.tv_nsec) / 1000000L); + return (1); +} + +double +get_mb_s(uint64_t bytes, double strt, double en) +{ + double bytes_sec; + + bytes_sec = ((double)bytes / (en - strt)) * 1000; + return (BYTES_TO_MB(bytes_sec)); +} diff --git a/utils/utils.h b/utils/utils.h index f96777f..fe6892d 100644 --- a/utils/utils.h +++ b/utils/utils.h @@ -102,6 +102,8 @@ typedef int64_t bsize_t; #define DEBUG_STAT_EN(...) #endif +#define BYTES_TO_MB(x) ((x) / (1024 * 1024)) + typedef struct { uint32_t buf_extra; int compress_mt_capable; @@ -143,6 +145,8 @@ extern int64_t Write(int fd, const void *buf, uint64_t count); extern void set_threadcounts(algo_props_t *props, int *nthreads, int nprocs, algo_threads_type_t typ); extern uint64_t get_total_ram(); +extern double get_wtime_millis(void); +extern double get_mb_s(uint64_t bytes, double strt, double en); /* Pointer type for compress and decompress functions. */ typedef int (*compress_func_ptr)(void *src, uint64_t srclen, void *dst,