Improve Delta2 scanning speed and effectiveness.

Add destination buffer overflow check in Delta2.
Add rough speed computation.
This commit is contained in:
Moinak Ghosh 2012-12-23 00:44:56 +05:30
parent 765b014018
commit a43fdd7d2c
6 changed files with 94 additions and 10 deletions

View file

@ -110,6 +110,7 @@ delta2_encode(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen, int
uchar_t *srcpos, *dstpos, *lastdst, *lastsrc, *dstend;
uint64_t slen, sz, dsz, pending;
int rem, lenc, transp_count, hdr_ovr;
DEBUG_STAT_EN(double strt, en);
srcpos = src;
dstpos = dst;
@ -123,6 +124,7 @@ delta2_encode(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen, int
transp_count = 0;
hdr_ovr = 0;
DEBUG_STAT_EN(strt = get_wtime_millis());
while (slen > 0) {
if (slen > DELTA2_CHUNK) {
sz = DELTA2_CHUNK;
@ -174,8 +176,10 @@ delta2_encode(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen, int
memcpy(lastdst, lastsrc, pending);
}
*dstlen = dstpos - dst;
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "DELTA2: srclen: %" PRIu64 ", dstlen: %" PRIu64 "\n", srclen, *dstlen));
DEBUG_STAT_EN(fprintf(stderr, "DELTA2: transpositions: %d, header overhead: %d\n", transp_count, hdr_ovr));
DEBUG_STAT_EN(fprintf(stderr, "DELTA2: Processed at %.3f MB/s\n", get_mb_s(srclen, strt, en)));
}
return (0);
}
@ -195,12 +199,16 @@ delta2_encode_real(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen
return (-1);
gtot1 = ULL_MAX;
stride = 0;
sval = 0;
val = 0;
sz = sizeof (strides) / sizeof (strides[0]);
/*
* Estimate which stride length gives the max reduction given rle_thresh.
*/
for (st = 0; st < sz; st++) {
int gt;
snum = 0;
gtot2 = MAIN_HDR + LIT_HDR;
vl1 = 0;
@ -215,10 +223,9 @@ delta2_encode_real(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen
vld2 = vl2 - vl1;
if (vld1 != vld2) {
if (snum > rle_thresh) {
if (tot > 0) {
gtot2 += LIT_HDR;
gt = (tot > 0);
gtot2 += (LIT_HDR * gt);
tot = 0;
}
gtot2 += DELTA_HDR;
} else {
gtot2 += snum;
@ -233,16 +240,32 @@ delta2_encode_real(uchar_t *src, uint64_t srclen, uchar_t *dst, uint64_t *dstlen
}
if (snum > rle_thresh) {
gtot2 += DELTA_HDR;
/*
* If this ended partially into another table, reset the next scan
* point to before the table.
*/
val = cnt - snum;
} else {
gtot2 += snum;
/*
* If this ended partially into another table, reset the next scan
* point to before the table.
*/
if (snum >= st1 * 5)
val = cnt - snum;
}
if (gtot2 < gtot1) {
gtot1 = gtot2;
stride = st1;
sval = val;
}
}
if (!(gtot1 < srclen && srclen - gtot1 > (DELTA_HDR + LIT_HDR + MAIN_HDR))) {
if ( !(gtot1 < srclen && srclen - gtot1 > (DELTA_HDR + LIT_HDR + MAIN_HDR) && gtot1 < *dstlen) ) {
if (srclen >= DELTA2_CHUNK) {
if (sval > 0)
*dstlen = sval;
}
return (-1);
}

View file

@ -43,6 +43,8 @@ See also the bsc and libbsc web site:
#include <string.h>
#include <allocator.h>
#include <sys/types.h>
#include <stdio.h>
#include <utils.h>
#include "lzp.h"
@ -245,7 +247,9 @@ int64_t bsc_lzp_compress_serial(const unsigned char * input, unsigned char * out
int chunkSize;
int blockId;
int64_t outputPtr = 1 + 8 * nBlocks;
DEBUG_STAT_EN(double strt, en);
DEBUG_STAT_EN(strt = get_wtime_millis());
if (n > LZP_MAX_BLOCK)
chunkSize = LZP_MAX_BLOCK;
else
@ -270,7 +274,10 @@ int64_t bsc_lzp_compress_serial(const unsigned char * input, unsigned char * out
outputPtr += result;
}
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "LZP: Insize: %" PRId64 ", Outsize: %" PRId64 "\n", n, outputPtr));
DEBUG_STAT_EN(fprintf(stderr, "LZP: Processed at %.3f MB/s\n", get_mb_s(n, strt, en)));
return outputPtr;
}

21
main.c
View file

@ -199,6 +199,7 @@ preproc_compress(compress_func_ptr cmp_func, void *src, uint64_t srclen, void *d
{
uchar_t *dest = (uchar_t *)dst, type = 0;
int64_t result, _dstlen;
DEBUG_STAT_EN(double strt, en);
_dstlen = *dstlen;
if (lzp_preprocess) {
@ -233,10 +234,14 @@ preproc_compress(compress_func_ptr cmp_func, void *src, uint64_t srclen, void *d
*dest = type;
*((int64_t *)(dest + 1)) = htonll(srclen);
_dstlen = srclen;
DEBUG_STAT_EN(strt = get_wtime_millis());
result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, data);
DEBUG_STAT_EN(en = get_wtime_millis());
if (result > -1 && _dstlen < srclen) {
*dest |= PREPROC_COMPRESSED;
*dstlen = _dstlen + 9;
DEBUG_STAT_EN(fprintf(stderr, "Chunk compression speed %.3f MB/s\n", get_mb_s(srclen, strt, en)));
} else {
memcpy(dest+1, src, srclen);
*dstlen = srclen + 1;
@ -253,6 +258,7 @@ preproc_decompress(compress_func_ptr dec_func, void *src, uint64_t srclen, void
uchar_t *sorc = (uchar_t *)src, type;
int64_t result;
uint64_t _dstlen = *dstlen;
DEBUG_STAT_EN(double strt, en);
type = *sorc;
sorc++;
@ -261,8 +267,11 @@ preproc_decompress(compress_func_ptr dec_func, void *src, uint64_t srclen, void
*dstlen = ntohll(*((int64_t *)(sorc)));
sorc += 8;
srclen -= 8;
DEBUG_STAT_EN(strt = get_wtime_millis());
result = dec_func(sorc, srclen, dst, dstlen, level, chdr, data);
DEBUG_STAT_EN(en = get_wtime_millis());
if (result < 0) return (result);
DEBUG_STAT_EN(fprintf(stderr, "Chunk decompression speed %.3f MB/s\n", get_mb_s(srclen, strt, en)));
memcpy(src, dst, *dstlen);
srclen = *dstlen;
}
@ -445,8 +454,14 @@ redo:
rv = preproc_decompress(tdat->decompress, cmpbuf, dedupe_data_sz_cmp,
ubuf, &_chunksize, tdat->level, HDR, tdat->data, tdat->props);
} else {
DEBUG_STAT_EN(double strt, en);
DEBUG_STAT_EN(strt = get_wtime_millis());
rv = tdat->decompress(cmpbuf, dedupe_data_sz_cmp, ubuf, &_chunksize,
tdat->level, HDR, tdat->data);
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Chunk decompression speed %.3f MB/s\n",
get_mb_s(_chunksize, strt, en)));
}
if (rv == -1) {
tdat->len_cmp = 0;
@ -1208,8 +1223,14 @@ plain_index:
_chunksize, compressed_chunk + index_size_cmp, &_chunksize,
tdat->level, 0, tdat->data, tdat->props);
} else {
DEBUG_STAT_EN(double strt, en);
DEBUG_STAT_EN(strt = get_wtime_millis());
rv = tdat->compress(tdat->uncompressed_chunk + dedupe_index_sz, _chunksize,
compressed_chunk + index_size_cmp, &_chunksize, tdat->level, 0, tdat->data);
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Chunk compression speed %.3f MB/s\n",
get_mb_s(_chunksize, strt, en)));
}
/* Can't compress the data; just retain it as-is. */

View file

@ -291,12 +291,14 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, int64_t *size, int64_t offs
heap_t heap;
DEBUG_STAT_EN(uint32_t max_count);
DEBUG_STAT_EN(max_count = 0);
DEBUG_STAT_EN(double strt, en);
length = offset;
last_offset = 0;
blknum = 0;
ctx->valid = 0;
cur_roll_checksum = 0;
DEBUG_STAT_EN(strt = get_wtime_millis());
if (ctx->fixed_flag) {
blknum = *size / ctx->rabin_poly_avg_block_size;
@ -484,8 +486,8 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, int64_t *size, int64_t offs
process_blocks:
// If we found at least a few chunks, perform dedup.
DEBUG_STAT_EN(printf("Original size: %" PRId64 ", blknum: %u\n", *size, blknum));
DEBUG_STAT_EN(printf("Number of maxlen blocks: %u\n", max_count));
DEBUG_STAT_EN(fprintf(stderr, "Original size: %" PRId64 ", blknum: %u\n", *size, blknum));
DEBUG_STAT_EN(fprintf(stderr, "Number of maxlen blocks: %u\n", max_count));
if (blknum > 2) {
int64_t pos, matchlen, pos1;
int valid = 1;
@ -605,10 +607,11 @@ process_blocks:
}
}
}
DEBUG_STAT_EN(printf("Total Hashtable bucket collisions: %u\n", hash_collisions));
DEBUG_STAT_EN(fprintf(stderr, "Total Hashtable bucket collisions: %u\n", hash_collisions));
dedupe_index_sz = (int64_t)blknum * RABIN_ENTRY_SIZE;
if (matchlen < dedupe_index_sz) {
DEBUG_STAT_EN(fprintf(stderr, "No Dedupe possible.\n"));
ctx->valid = 0;
return;
}
@ -639,7 +642,7 @@ process_blocks:
i++;
}
}
DEBUG_STAT_EN(printf("Merge count: %u\n", merge_count));
DEBUG_STAT_EN(fprintf(stderr, "Merge count: %u\n", merge_count));
/*
* Final pass update dedupe index and copy data.
@ -688,7 +691,9 @@ cont:
if (valid) {
uchar_t *cbuf = ctx->cbuf;
int64_t *entries;
DEBUG_STAT_EN(int64_t sz);
DEBUG_STAT_EN(sz = *size);
*((uint32_t *)cbuf) = htonl(blknum);
cbuf += sizeof (uint32_t);
entries = (int64_t *)cbuf;
@ -697,8 +702,10 @@ cont:
entries[2] = htonll(pos1 - dedupe_index_sz - RABIN_HDR_SIZE);
*size = pos1;
ctx->valid = 1;
DEBUG_STAT_EN(printf("Deduped size: %" PRId64 ", blknum: %u, delta_calls: %u, delta_fails: %u\n",
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "Deduped size: %" PRId64 ", blknum: %u, delta_calls: %u, delta_fails: %u\n",
*size, blknum, delta_calls, delta_fails));
DEBUG_STAT_EN(fprintf(stderr, "Dedupe speed %.3f MB/s\n", get_mb_s(sz, strt, en)));
/*
* Remaining header entries: size of compressed index and size of
* compressed data are inserted later via rabin_update_hdr, after actual compression!

View file

@ -23,6 +23,7 @@
#include <sys/types.h>
#include <sys/param.h>
#include <sys/time.h>
#include <fcntl.h>
#include <time.h>
#include <libgen.h>
@ -308,3 +309,24 @@ get_total_ram()
phys_pages = sysconf(_SC_PHYS_PAGES);
return (phys_pages * page_size);
}
/*
 * Return the current CLOCK_MONOTONIC reading expressed in milliseconds.
 *
 * The result is intended to be used as a difference between two calls
 * (see get_mb_s()). If clock_gettime() fails, the constant 1 is returned,
 * so two failed readings produce a zero-length interval.
 */
double
get_wtime_millis(void)
{
	struct timespec ts;

	if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0)
		return (1);

	/*
	 * Convert to double before scaling. Multiplying tv_sec by 1000 in
	 * time_t arithmetic overflows on platforms where time_t is 32 bits.
	 */
	return ((double)ts.tv_sec * 1000.0 + (double)ts.tv_nsec / 1000000.0);
}
/*
 * Rough throughput, in MB per second, for `bytes` bytes handled between
 * two timestamps.
 *
 * bytes - number of bytes processed.
 * strt  - start timestamp in milliseconds (as from get_wtime_millis()).
 * en    - end timestamp in milliseconds.
 *
 * Returns 0 when the interval is not positive (or is NaN), instead of
 * dividing by zero and reporting inf/NaN. This happens when both timestamps
 * are equal, e.g. when get_wtime_millis() returned its failure value twice.
 */
double
get_mb_s(uint64_t bytes, double strt, double en)
{
	double elapsed_ms = en - strt;
	double bytes_sec;

	/* Written as a negated comparison so that a NaN interval is rejected too. */
	if (!(elapsed_ms > 0))
		return (0);

	/* bytes per millisecond scaled up to bytes per second. */
	bytes_sec = ((double)bytes / elapsed_ms) * 1000;
	return (BYTES_TO_MB(bytes_sec));
}

View file

@ -102,6 +102,8 @@ typedef int64_t bsize_t;
#define DEBUG_STAT_EN(...)
#endif
#define BYTES_TO_MB(x) ((x) / (1024 * 1024))
typedef struct {
uint32_t buf_extra;
int compress_mt_capable;
@ -143,6 +145,8 @@ extern int64_t Write(int fd, const void *buf, uint64_t count);
extern void set_threadcounts(algo_props_t *props, int *nthreads, int nprocs,
algo_threads_type_t typ);
extern uint64_t get_total_ram();
extern double get_wtime_millis(void);
extern double get_mb_s(uint64_t bytes, double strt, double en);
/* Pointer type for compress and decompress functions. */
typedef int (*compress_func_ptr)(void *src, uint64_t srclen, void *dst,