From 2e5f2d8aab80db00139b2f0a6924b4f18a050868 Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Sat, 20 Sep 2014 21:49:06 +0530 Subject: [PATCH] Make DICT filter useful. Improve data analysis in adaptive_compress. --- adaptive_compress.c | 6 +++-- archive/pc_archive.c | 3 +++ filters/analyzer/analyzer.c | 54 ++++++++++++++----------------------- filters/analyzer/analyzer.h | 10 ++++++- filters/dict/DictFilter.cpp | 49 +++++++++++++++++++++++++-------- pcompress.c | 36 ++++++++++--------------- pcompress.h | 2 ++ 7 files changed, 90 insertions(+), 70 deletions(-) diff --git a/adaptive_compress.c b/adaptive_compress.c index 26d19c4..bbce293 100644 --- a/adaptive_compress.c +++ b/adaptive_compress.c @@ -251,7 +251,7 @@ adapt_compress(void *src, uint64_t srclen, void *dst, int stype = PC_SUBTYPE(btype); if (btype == TYPE_UNKNOWN || stype == TYPE_ARCHIVE_TAR) { - uint64_t i, tot8b, tag1, tag2, tag3; + uint64_t i, tot8b, tag1, tag2, tag3, lbytes; double tagcnt, pct_tag; uchar_t cur_byte, prev_byte; /* @@ -261,10 +261,12 @@ adapt_compress(void *src, uint64_t srclen, void *dst, tag1 = 0; tag2 = 0; tag3 = 0; + lbytes = 0; prev_byte = cur_byte = 0; for (i = 0; i < srclen; i++) { cur_byte = src1[i]; tot8b += (cur_byte & 0x80); // This way for possible auto-vectorization + lbytes += (cur_byte < 32); tag1 += (cur_byte == '<'); tag2 += (cur_byte == '>'); tag3 += ((prev_byte == '<') & (cur_byte == '/')); @@ -276,7 +278,7 @@ adapt_compress(void *src, uint64_t srclen, void *dst, /* * Heuristics for detecting BINARY vs generic TEXT vs XML data. */ - tot8b /= 0x80; + tot8b = tot8b / 0x80 + lbytes; tagcnt = tag1 + tag2 + tag3; pct_tag = tagcnt / (double)srclen; if (adat->adapt_mode == 2 && tot8b > FORTY_PCT(srclen)) { diff --git a/archive/pc_archive.c b/archive/pc_archive.c index 506c7d4..88edf59 100644 --- a/archive/pc_archive.c +++ b/archive/pc_archive.c @@ -173,6 +173,8 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len if (pctx->btype != pctx->ctype) { if (pctx->btype == TYPE_UNKNOWN || pctx->arc_buf_pos == 0) { pctx->btype = pctx->ctype; + if (pctx->arc_buf_pos != 0) + pctx->interesting = 1; } else { if (pctx->arc_buf_pos < pctx->min_chunk) { int diff = pctx->min_chunk - (int)(pctx->arc_buf_pos); @@ -180,6 +182,7 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len pctx->btype = pctx->ctype; else pctx->ctype = pctx->btype; + pctx->interesting = 1; } else { pctx->arc_writing = 0; Sem_Post(&(pctx->read_sem)); diff --git a/filters/analyzer/analyzer.c b/filters/analyzer/analyzer.c index 1dce8c4..a52824a 100644 --- a/filters/analyzer/analyzer.c +++ b/filters/analyzer/analyzer.c @@ -25,44 +25,30 @@ #include "utils.h" int -analyze_buffer(void *src, uint64_t srclen, int btype, int adapt_mode) +analyze_buffer(void *src, uint64_t srclen) { uchar_t *src1 = (uchar_t *)src; - int stype = PC_SUBTYPE(btype); + uint64_t i, tot8b, lbytes; + uchar_t cur_byte; + int btype = TYPE_UNKNOWN; - if (btype == TYPE_UNKNOWN || stype == TYPE_ARCHIVE_TAR) { - uint32_t freq[256], freq0x80[2] = {0}; - uint64_t i, alphabetNum = 0, tot8b = 0; - uchar_t cur_byte; + /* + * Count number of 8-bit binary bytes in source + */ + tot8b = 0; + lbytes = 0; + for (i = 0; i < srclen; i++) { + cur_byte = src1[i]; + tot8b += (cur_byte & 0x80); // This way for possible auto-vectorization + lbytes += (cur_byte < 32); + } - /* - * Count number of 8-bit binary bytes and XML tags in source. - */ - tot8b = 0; - for (i = 0; i < srclen; i++) { - cur_byte = src1[i]; - tot8b += (cur_byte & 0x80); // This way for possible auto-vectorization - freq[cur_byte]++; - } - - for (i = 0; i < 256; i++) - freq0x80[i>>7]+=freq[i]; - - for(i = 'a'; i <= 'z'; i++) - alphabetNum+=freq[i]; - - /* - * Heuristics for detecting BINARY vs generic TEXT - */ - tot8b /= 0x80; - if (tot8b < (srclen>>2 + srclen>>3)) { - btype = TYPE_TEXT; - if (freq0x80[1]<(srclen>>3) && (freq[' ']>(srclen>>7)) - && (freq['a']+freq['e']+freq['t']>(srclen>>4)) - && alphabetNum>(srclen>>2)) { - btype |= TYPE_ENGLISH; - } - } + /* + * Heuristics for detecting BINARY vs generic TEXT + */ + tot8b /= 0x80; + if (tot8b == 0 && lbytes < ((srclen>>1) + (srclen>>2) + (srclen>>3))) { + btype = TYPE_TEXT; } return (btype); diff --git a/filters/analyzer/analyzer.h b/filters/analyzer/analyzer.h index 922b596..682111d 100644 --- a/filters/analyzer/analyzer.h +++ b/filters/analyzer/analyzer.h @@ -25,6 +25,14 @@ #ifndef _ANALYZER_H #define _ANALYZER_H -int analyze_buffer(void *src, uint64_t srclen, int btype, int adapt_mode); +#ifdef __cplusplus +extern "C" { +#endif + +extern int analyze_buffer(void *src, uint64_t srclen); + +#ifdef __cplusplus +} +#endif #endif diff --git a/filters/dict/DictFilter.cpp b/filters/dict/DictFilter.cpp index bb0d151..759bd73 100644 --- a/filters/dict/DictFilter.cpp +++ b/filters/dict/DictFilter.cpp @@ -31,8 +31,14 @@ #include #include #include +#include #include "DictFilter.h" #include "Common.h" +#include "utils.h" + +extern "C" { +extern int analyze_buffer(void *src, uint64_t srclen); +} class DictFilter { @@ -264,17 +270,27 @@ dict_encode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64 DictFilter *df = static_cast(dict_ctx); u32 fl = fromlen; u32 dl = *dstlen; - u8 *dst; + int atype; + uchar_t *dst; + DEBUG_STAT_EN(double strt, en); - if (fromlen > UINT32_MAX) - return (-1); - U32_P(to) = LE32(fromlen); - dst = to + 4; - dl -= 4; - if (df->Forward_Dict(from, fl, dst, &dl)) { - *dstlen = dl + 4; - return (0); + DEBUG_STAT_EN(strt = get_wtime_millis()); + atype = analyze_buffer(from, fromlen); + if (PC_TYPE(atype) == TYPE_TEXT) { + U32_P(to) = LE32(fl); + dst = to + 4; + dl -= 4; + if (df->Forward_Dict(from, fl, dst, &dl)) { + *dstlen = dl + 8; + DEBUG_STAT_EN(en = get_wtime_millis()); + DEBUG_STAT_EN(fprintf(stderr, "DICT: fromlen: %" PRIu64 ", dstlen: %" PRIu64 "\n", + fromlen, *dstlen)); + DEBUG_STAT_EN(fprintf(stderr, "DICT: Processed at %.3f MB/s\n", + get_mb_s(fromlen, strt, en))); + return (1); + } } + DEBUG_STAT_EN(fprintf(stderr, "No DICT\n")); return (-1); } @@ -285,10 +301,13 @@ dict_decode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64 u32 fl = fromlen; u32 dl; u8 *src; + DEBUG_STAT_EN(double strt, en); + DEBUG_STAT_EN(strt = get_wtime_millis()); dl = U32_P(from); if (dl > *dstlen) { - log_msg(LOG_ERR, 0, "Destination overflow in dict_decode."); + log_msg(LOG_ERR, 0, "Destination overflow in dict_decode. Need: %" PRIu64 ", Got: %" PRIu64 "\n", + dl, *dstlen); return (-1); } *dstlen = dl; @@ -296,8 +315,16 @@ dict_decode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64 fl -= 4; df->Inverse_Dict(src, fl, to, &dl); - if (dl < *dstlen) + if (dl < *dstlen) { + log_msg(LOG_ERR, 0, "dict_decode: Expected: %" PRIu64 ", Got: %" PRIu64 "\n", + *dstlen, dl); return (-1); + } + DEBUG_STAT_EN(en = get_wtime_millis()); + DEBUG_STAT_EN(fprintf(stderr, "DICT: fromlen: %" PRIu64 ", dstlen: %" PRIu64 "\n", + fromlen, *dstlen)); + DEBUG_STAT_EN(fprintf(stderr, "DICT: Processed at %.3f MB/s\n", + get_mb_s(fromlen, strt, en))); return (0); } diff --git a/pcompress.c b/pcompress.c index d6b9f2a..e05a2b9 100644 --- a/pcompress.c +++ b/pcompress.c @@ -56,7 +56,6 @@ #include #include #include -#include "analyzer.h" #include "filters/dict/DictFilter.h" /* @@ -204,13 +203,13 @@ show_compression_stats(pc_ctx_t *pctx) static int preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t srclen, void *dst, uint64_t *dstlen, int level, uchar_t chdr, int btype, void *data, - algo_props_t *props) + algo_props_t *props, int interesting) { - uchar_t *dest = (uchar_t *)dst, type = 0, atype; + uchar_t *dest = (uchar_t *)dst, type = 0; int64_t result; uint64_t _dstlen, fromlen; uchar_t *from, *to; - int stype; + int stype, dict; DEBUG_STAT_EN(double strt, en); _dstlen = *dstlen; @@ -219,6 +218,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t fromlen = srclen; result = 0; stype = PC_SUBTYPE(btype); + dict = 0; /* * If Dispack is enabled it has to be done first since Dispack analyses the @@ -240,25 +240,12 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t } } - /* - * The analyzer is run below only for non-archive mode. When archiving the - * archiver thread runs analyzer on incremental blocks and sets the type - * accordingly. - */ - atype = btype; - /* - * Run an analyzer on the data. At present the analyzer only tries - * to detect if this is text for running the dict filter. - */ - if (pctx->enable_analyzer) { - atype = analyze_buffer(src, srclen, btype, pctx->adapt_mode); - } - /* * Enabling LZP also enables the DICT filter since we are dealing with text * in any case. */ - if (pctx->lzp_preprocess && PC_TYPE(atype) == TYPE_TEXT) { + if (pctx->lzp_preprocess && (PC_TYPE(btype) == TYPE_UNKNOWN || + PC_TYPE(btype) == TYPE_TEXT || interesting)) { void *dct = new_dict_context(); _dstlen = fromlen; result = dict_encode(dct, from, fromlen, to, &_dstlen); @@ -270,8 +257,10 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t to = tmp; fromlen = _dstlen; type |= PREPROC_TYPE_DICT; + dict = result; } } + #ifndef _MPLV2_LICENSE_ if (pctx->lzp_preprocess && stype != TYPE_BMP && stype != TYPE_TIFF) { int hashsize; @@ -321,7 +310,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t U64_P(dest + 1) = htonll(srclen); _dstlen = srclen; DEBUG_STAT_EN(strt = get_wtime_millis()); - result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, btype, data); + result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, (dict?TYPE_TEXT:btype), data); DEBUG_STAT_EN(en = get_wtime_millis()); if (result > -1 && _dstlen < srclen) { @@ -1690,7 +1679,7 @@ plain_index: rv = preproc_compress(pctx, tdat->compress, tdat->uncompressed_chunk + dedupe_index_sz, _chunksize, compressed_chunk + index_size_cmp, &_chunksize, tdat->level, 0, - tdat->btype, tdat->data, tdat->props); + tdat->btype, tdat->data, tdat->props, tdat->interesting); } else { DEBUG_STAT_EN(double strt, en); @@ -1718,7 +1707,7 @@ plain_index: if (pctx->preprocess_mode) { rv = preproc_compress(pctx, tdat->compress, tdat->uncompressed_chunk, tdat->rbytes, compressed_chunk, &_chunksize, tdat->level, 0, - tdat->btype, tdat->data, tdat->props); + tdat->btype, tdat->data, tdat->props, tdat->interesting); } else { DEBUG_STAT_EN(double strt, en); @@ -2449,6 +2438,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev * Read the first chunk into a spare buffer (a simple double-buffering). */ file_offset = 0; + pctx->interesting = 0; if (pctx->enable_rabin_split) { rctx = create_dedupe_context(chunksize, 0, pctx->rab_blk_size, pctx->algo, &props, pctx->enable_delta_encode, pctx->enable_fixed_scan, VERSION, COMPRESS, 0, NULL, @@ -2520,6 +2510,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev */ tdat->id = pctx->chunk_num; tdat->rbytes = rbytes; + tdat->interesting = pctx->interesting; tdat->btype = pctx->btype; // Have to copy btype for this buffer as pctx->btype will change if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global)) { tmp = tdat->cmp_seg; @@ -2568,6 +2559,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev * Read the next buffer we want to process while previous * buffer is in progress. */ + pctx->interesting = 0; if (pctx->enable_rabin_split) { if (pctx->archive_mode) rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, diff --git a/pcompress.h b/pcompress.h index 9787f6c..5546463 100644 --- a/pcompress.h +++ b/pcompress.h @@ -245,6 +245,7 @@ typedef struct pc_ctx { uint64_t arc_buf_size, arc_buf_pos; int arc_closed, arc_writing; int btype, ctype; + int interesting; int min_chunk; int enable_packjpg; int enable_wavpack; @@ -286,6 +287,7 @@ struct cmp_data { compress_func_ptr compress; compress_func_ptr decompress; int cancel; + int interesting; Sem_t start_sem; Sem_t cmp_done_sem; Sem_t write_done_sem;