Make DICT filter useful.

Improve data analysis in adaptive_compress.
This commit is contained in:
Moinak Ghosh 2014-09-20 21:49:06 +05:30
parent 071a9e2b26
commit 2e5f2d8aab
7 changed files with 90 additions and 70 deletions

View file

@ -251,7 +251,7 @@ adapt_compress(void *src, uint64_t srclen, void *dst,
int stype = PC_SUBTYPE(btype); int stype = PC_SUBTYPE(btype);
if (btype == TYPE_UNKNOWN || stype == TYPE_ARCHIVE_TAR) { if (btype == TYPE_UNKNOWN || stype == TYPE_ARCHIVE_TAR) {
uint64_t i, tot8b, tag1, tag2, tag3; uint64_t i, tot8b, tag1, tag2, tag3, lbytes;
double tagcnt, pct_tag; double tagcnt, pct_tag;
uchar_t cur_byte, prev_byte; uchar_t cur_byte, prev_byte;
/* /*
@ -261,10 +261,12 @@ adapt_compress(void *src, uint64_t srclen, void *dst,
tag1 = 0; tag1 = 0;
tag2 = 0; tag2 = 0;
tag3 = 0; tag3 = 0;
lbytes = 0;
prev_byte = cur_byte = 0; prev_byte = cur_byte = 0;
for (i = 0; i < srclen; i++) { for (i = 0; i < srclen; i++) {
cur_byte = src1[i]; cur_byte = src1[i];
tot8b += (cur_byte & 0x80); // This way for possible auto-vectorization tot8b += (cur_byte & 0x80); // This way for possible auto-vectorization
lbytes += (cur_byte < 32);
tag1 += (cur_byte == '<'); tag1 += (cur_byte == '<');
tag2 += (cur_byte == '>'); tag2 += (cur_byte == '>');
tag3 += ((prev_byte == '<') & (cur_byte == '/')); tag3 += ((prev_byte == '<') & (cur_byte == '/'));
@ -276,7 +278,7 @@ adapt_compress(void *src, uint64_t srclen, void *dst,
/* /*
* Heuristics for detecting BINARY vs generic TEXT vs XML data. * Heuristics for detecting BINARY vs generic TEXT vs XML data.
*/ */
tot8b /= 0x80; tot8b = tot8b / 0x80 + lbytes;
tagcnt = tag1 + tag2 + tag3; tagcnt = tag1 + tag2 + tag3;
pct_tag = tagcnt / (double)srclen; pct_tag = tagcnt / (double)srclen;
if (adat->adapt_mode == 2 && tot8b > FORTY_PCT(srclen)) { if (adat->adapt_mode == 2 && tot8b > FORTY_PCT(srclen)) {

View file

@ -173,6 +173,8 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len
if (pctx->btype != pctx->ctype) { if (pctx->btype != pctx->ctype) {
if (pctx->btype == TYPE_UNKNOWN || pctx->arc_buf_pos == 0) { if (pctx->btype == TYPE_UNKNOWN || pctx->arc_buf_pos == 0) {
pctx->btype = pctx->ctype; pctx->btype = pctx->ctype;
if (pctx->arc_buf_pos != 0)
pctx->interesting = 1;
} else { } else {
if (pctx->arc_buf_pos < pctx->min_chunk) { if (pctx->arc_buf_pos < pctx->min_chunk) {
int diff = pctx->min_chunk - (int)(pctx->arc_buf_pos); int diff = pctx->min_chunk - (int)(pctx->arc_buf_pos);
@ -180,6 +182,7 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len
pctx->btype = pctx->ctype; pctx->btype = pctx->ctype;
else else
pctx->ctype = pctx->btype; pctx->ctype = pctx->btype;
pctx->interesting = 1;
} else { } else {
pctx->arc_writing = 0; pctx->arc_writing = 0;
Sem_Post(&(pctx->read_sem)); Sem_Post(&(pctx->read_sem));

View file

@ -25,44 +25,30 @@
#include "utils.h" #include "utils.h"
int int
analyze_buffer(void *src, uint64_t srclen, int btype, int adapt_mode) analyze_buffer(void *src, uint64_t srclen)
{ {
uchar_t *src1 = (uchar_t *)src; uchar_t *src1 = (uchar_t *)src;
int stype = PC_SUBTYPE(btype); uint64_t i, tot8b, lbytes;
if (btype == TYPE_UNKNOWN || stype == TYPE_ARCHIVE_TAR) {
uint32_t freq[256], freq0x80[2] = {0};
uint64_t i, alphabetNum = 0, tot8b = 0;
uchar_t cur_byte; uchar_t cur_byte;
int btype = TYPE_UNKNOWN;
/* /*
* Count number of 8-bit binary bytes and XML tags in source. * Count number of 8-bit binary bytes in source
*/ */
tot8b = 0; tot8b = 0;
lbytes = 0;
for (i = 0; i < srclen; i++) { for (i = 0; i < srclen; i++) {
cur_byte = src1[i]; cur_byte = src1[i];
tot8b += (cur_byte & 0x80); // This way for possible auto-vectorization tot8b += (cur_byte & 0x80); // This way for possible auto-vectorization
freq[cur_byte]++; lbytes += (cur_byte < 32);
} }
for (i = 0; i < 256; i++)
freq0x80[i>>7]+=freq[i];
for(i = 'a'; i <= 'z'; i++)
alphabetNum+=freq[i];
/* /*
* Heuristics for detecting BINARY vs generic TEXT * Heuristics for detecting BINARY vs generic TEXT
*/ */
tot8b /= 0x80; tot8b /= 0x80;
if (tot8b < (srclen>>2 + srclen>>3)) { if (tot8b == 0 && lbytes < ((srclen>>1) + (srclen>>2) + (srclen>>3))) {
btype = TYPE_TEXT; btype = TYPE_TEXT;
if (freq0x80[1]<(srclen>>3) && (freq[' ']>(srclen>>7))
&& (freq['a']+freq['e']+freq['t']>(srclen>>4))
&& alphabetNum>(srclen>>2)) {
btype |= TYPE_ENGLISH;
}
}
} }
return (btype); return (btype);

View file

@ -25,6 +25,14 @@
#ifndef _ANALYZER_H #ifndef _ANALYZER_H
#define _ANALYZER_H #define _ANALYZER_H
int analyze_buffer(void *src, uint64_t srclen, int btype, int adapt_mode); #ifdef __cplusplus
extern "C" {
#endif
extern int analyze_buffer(void *src, uint64_t srclen);
#ifdef __cplusplus
}
#endif
#endif #endif

View file

@ -31,8 +31,14 @@
#include <string.h> #include <string.h>
#include <strings.h> #include <strings.h>
#include <stdint.h> #include <stdint.h>
#include <stdio.h>
#include "DictFilter.h" #include "DictFilter.h"
#include "Common.h" #include "Common.h"
#include "utils.h"
extern "C" {
extern int analyze_buffer(void *src, uint64_t srclen);
}
class DictFilter class DictFilter
{ {
@ -264,17 +270,27 @@ dict_encode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64
DictFilter *df = static_cast<DictFilter *>(dict_ctx); DictFilter *df = static_cast<DictFilter *>(dict_ctx);
u32 fl = fromlen; u32 fl = fromlen;
u32 dl = *dstlen; u32 dl = *dstlen;
u8 *dst; int atype;
uchar_t *dst;
DEBUG_STAT_EN(double strt, en);
if (fromlen > UINT32_MAX) DEBUG_STAT_EN(strt = get_wtime_millis());
return (-1); atype = analyze_buffer(from, fromlen);
U32_P(to) = LE32(fromlen); if (PC_TYPE(atype) == TYPE_TEXT) {
U32_P(to) = LE32(fl);
dst = to + 4; dst = to + 4;
dl -= 4; dl -= 4;
if (df->Forward_Dict(from, fl, dst, &dl)) { if (df->Forward_Dict(from, fl, dst, &dl)) {
*dstlen = dl + 4; *dstlen = dl + 8;
return (0); DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "DICT: fromlen: %" PRIu64 ", dstlen: %" PRIu64 "\n",
fromlen, *dstlen));
DEBUG_STAT_EN(fprintf(stderr, "DICT: Processed at %.3f MB/s\n",
get_mb_s(fromlen, strt, en)));
return (1);
} }
}
DEBUG_STAT_EN(fprintf(stderr, "No DICT\n"));
return (-1); return (-1);
} }
@ -285,10 +301,13 @@ dict_decode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64
u32 fl = fromlen; u32 fl = fromlen;
u32 dl; u32 dl;
u8 *src; u8 *src;
DEBUG_STAT_EN(double strt, en);
DEBUG_STAT_EN(strt = get_wtime_millis());
dl = U32_P(from); dl = U32_P(from);
if (dl > *dstlen) { if (dl > *dstlen) {
log_msg(LOG_ERR, 0, "Destination overflow in dict_decode."); log_msg(LOG_ERR, 0, "Destination overflow in dict_decode. Need: %" PRIu64 ", Got: %" PRIu64 "\n",
dl, *dstlen);
return (-1); return (-1);
} }
*dstlen = dl; *dstlen = dl;
@ -296,8 +315,16 @@ dict_decode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64
fl -= 4; fl -= 4;
df->Inverse_Dict(src, fl, to, &dl); df->Inverse_Dict(src, fl, to, &dl);
if (dl < *dstlen) if (dl < *dstlen) {
log_msg(LOG_ERR, 0, "dict_decode: Expected: %" PRIu64 ", Got: %" PRIu64 "\n",
*dstlen, dl);
return (-1); return (-1);
}
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "DICT: fromlen: %" PRIu64 ", dstlen: %" PRIu64 "\n",
fromlen, *dstlen));
DEBUG_STAT_EN(fprintf(stderr, "DICT: Processed at %.3f MB/s\n",
get_mb_s(fromlen, strt, en)));
return (0); return (0);
} }

View file

@ -56,7 +56,6 @@
#include <errno.h> #include <errno.h>
#include <pc_archive.h> #include <pc_archive.h>
#include <filters/dispack/dis.hpp> #include <filters/dispack/dis.hpp>
#include "analyzer.h"
#include "filters/dict/DictFilter.h" #include "filters/dict/DictFilter.h"
/* /*
@ -204,13 +203,13 @@ show_compression_stats(pc_ctx_t *pctx)
static int static int
preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t srclen, preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t srclen,
void *dst, uint64_t *dstlen, int level, uchar_t chdr, int btype, void *data, void *dst, uint64_t *dstlen, int level, uchar_t chdr, int btype, void *data,
algo_props_t *props) algo_props_t *props, int interesting)
{ {
uchar_t *dest = (uchar_t *)dst, type = 0, atype; uchar_t *dest = (uchar_t *)dst, type = 0;
int64_t result; int64_t result;
uint64_t _dstlen, fromlen; uint64_t _dstlen, fromlen;
uchar_t *from, *to; uchar_t *from, *to;
int stype; int stype, dict;
DEBUG_STAT_EN(double strt, en); DEBUG_STAT_EN(double strt, en);
_dstlen = *dstlen; _dstlen = *dstlen;
@ -219,6 +218,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t
fromlen = srclen; fromlen = srclen;
result = 0; result = 0;
stype = PC_SUBTYPE(btype); stype = PC_SUBTYPE(btype);
dict = 0;
/* /*
* If Dispack is enabled it has to be done first since Dispack analyses the * If Dispack is enabled it has to be done first since Dispack analyses the
@ -240,25 +240,12 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t
} }
} }
/*
* The analyzer is run below only for non-archive mode. When archiving the
* archiver thread runs analyzer on incremental blocks and sets the type
* accordingly.
*/
atype = btype;
/*
* Run an analyzer on the data. At present the analyzer only tries
* to detect if this is text for running the dict filter.
*/
if (pctx->enable_analyzer) {
atype = analyze_buffer(src, srclen, btype, pctx->adapt_mode);
}
/* /*
* Enabling LZP also enables the DICT filter since we are dealing with text * Enabling LZP also enables the DICT filter since we are dealing with text
* in any case. * in any case.
*/ */
if (pctx->lzp_preprocess && PC_TYPE(atype) == TYPE_TEXT) { if (pctx->lzp_preprocess && (PC_TYPE(btype) == TYPE_UNKNOWN ||
PC_TYPE(btype) == TYPE_TEXT || interesting)) {
void *dct = new_dict_context(); void *dct = new_dict_context();
_dstlen = fromlen; _dstlen = fromlen;
result = dict_encode(dct, from, fromlen, to, &_dstlen); result = dict_encode(dct, from, fromlen, to, &_dstlen);
@ -270,8 +257,10 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t
to = tmp; to = tmp;
fromlen = _dstlen; fromlen = _dstlen;
type |= PREPROC_TYPE_DICT; type |= PREPROC_TYPE_DICT;
dict = result;
} }
} }
#ifndef _MPLV2_LICENSE_ #ifndef _MPLV2_LICENSE_
if (pctx->lzp_preprocess && stype != TYPE_BMP && stype != TYPE_TIFF) { if (pctx->lzp_preprocess && stype != TYPE_BMP && stype != TYPE_TIFF) {
int hashsize; int hashsize;
@ -321,7 +310,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t
U64_P(dest + 1) = htonll(srclen); U64_P(dest + 1) = htonll(srclen);
_dstlen = srclen; _dstlen = srclen;
DEBUG_STAT_EN(strt = get_wtime_millis()); DEBUG_STAT_EN(strt = get_wtime_millis());
result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, btype, data); result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, (dict?TYPE_TEXT:btype), data);
DEBUG_STAT_EN(en = get_wtime_millis()); DEBUG_STAT_EN(en = get_wtime_millis());
if (result > -1 && _dstlen < srclen) { if (result > -1 && _dstlen < srclen) {
@ -1690,7 +1679,7 @@ plain_index:
rv = preproc_compress(pctx, tdat->compress, rv = preproc_compress(pctx, tdat->compress,
tdat->uncompressed_chunk + dedupe_index_sz, _chunksize, tdat->uncompressed_chunk + dedupe_index_sz, _chunksize,
compressed_chunk + index_size_cmp, &_chunksize, tdat->level, 0, compressed_chunk + index_size_cmp, &_chunksize, tdat->level, 0,
tdat->btype, tdat->data, tdat->props); tdat->btype, tdat->data, tdat->props, tdat->interesting);
} else { } else {
DEBUG_STAT_EN(double strt, en); DEBUG_STAT_EN(double strt, en);
@ -1718,7 +1707,7 @@ plain_index:
if (pctx->preprocess_mode) { if (pctx->preprocess_mode) {
rv = preproc_compress(pctx, tdat->compress, tdat->uncompressed_chunk, rv = preproc_compress(pctx, tdat->compress, tdat->uncompressed_chunk,
tdat->rbytes, compressed_chunk, &_chunksize, tdat->level, 0, tdat->rbytes, compressed_chunk, &_chunksize, tdat->level, 0,
tdat->btype, tdat->data, tdat->props); tdat->btype, tdat->data, tdat->props, tdat->interesting);
} else { } else {
DEBUG_STAT_EN(double strt, en); DEBUG_STAT_EN(double strt, en);
@ -2449,6 +2438,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
* Read the first chunk into a spare buffer (a simple double-buffering). * Read the first chunk into a spare buffer (a simple double-buffering).
*/ */
file_offset = 0; file_offset = 0;
pctx->interesting = 0;
if (pctx->enable_rabin_split) { if (pctx->enable_rabin_split) {
rctx = create_dedupe_context(chunksize, 0, pctx->rab_blk_size, pctx->algo, &props, rctx = create_dedupe_context(chunksize, 0, pctx->rab_blk_size, pctx->algo, &props,
pctx->enable_delta_encode, pctx->enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pctx->enable_delta_encode, pctx->enable_fixed_scan, VERSION, COMPRESS, 0, NULL,
@ -2520,6 +2510,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
*/ */
tdat->id = pctx->chunk_num; tdat->id = pctx->chunk_num;
tdat->rbytes = rbytes; tdat->rbytes = rbytes;
tdat->interesting = pctx->interesting;
tdat->btype = pctx->btype; // Have to copy btype for this buffer as pctx->btype will change tdat->btype = pctx->btype; // Have to copy btype for this buffer as pctx->btype will change
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global)) { if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global)) {
tmp = tdat->cmp_seg; tmp = tdat->cmp_seg;
@ -2568,6 +2559,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
* Read the next buffer we want to process while previous * Read the next buffer we want to process while previous
* buffer is in progress. * buffer is in progress.
*/ */
pctx->interesting = 0;
if (pctx->enable_rabin_split) { if (pctx->enable_rabin_split) {
if (pctx->archive_mode) if (pctx->archive_mode)
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize,

View file

@ -245,6 +245,7 @@ typedef struct pc_ctx {
uint64_t arc_buf_size, arc_buf_pos; uint64_t arc_buf_size, arc_buf_pos;
int arc_closed, arc_writing; int arc_closed, arc_writing;
int btype, ctype; int btype, ctype;
int interesting;
int min_chunk; int min_chunk;
int enable_packjpg; int enable_packjpg;
int enable_wavpack; int enable_wavpack;
@ -286,6 +287,7 @@ struct cmp_data {
compress_func_ptr compress; compress_func_ptr compress;
compress_func_ptr decompress; compress_func_ptr decompress;
int cancel; int cancel;
int interesting;
Sem_t start_sem; Sem_t start_sem;
Sem_t cmp_done_sem; Sem_t cmp_done_sem;
Sem_t write_done_sem; Sem_t write_done_sem;