From e3c32ed6d67eff4016e1302bc87b2b28b05f68b7 Mon Sep 17 00:00:00 2001
From: Moinak Ghosh
Date: Wed, 24 Sep 2014 21:54:36 +0530
Subject: [PATCH] Remove unneeded archive writing function.

Improve filter scratch buffer handling.
Improve memory accounting.
Remove delayed allocation when compressing. Allows better memory estimation.
Some cstyle fixes.
---
 archive/pc_arc_filter.c  | 68 ++++++----------------
 archive/pc_arc_filter.h  | 17 +++++++
 archive/pc_archive.c     | 17 +++++++
 archive/pc_archive.h     |  2 +
 archive/wavpack_helper.c | 24 +++++-----
 pcompress.c              | 95 ++++++++++++++++++++++++----------
 rabin/rabin_dedup.c      |  9 +---
 rabin/rabin_dedup.h      |  3 +-
 utils/utils.c            |  3 +-
 9 files changed, 124 insertions(+), 114 deletions(-)

diff --git a/archive/pc_arc_filter.c b/archive/pc_arc_filter.c
index 0a4d226..e9fd7af 100644
--- a/archive/pc_arc_filter.c
+++ b/archive/pc_arc_filter.c
@@ -37,18 +37,9 @@
 #include
 #include
 #include
-#include
-#include
 #include "pc_arc_filter.h"
 #include "pc_archive.h"
 
-#ifndef _MPLV2_LICENSE_
-#	define HELPER_DEF_BUFSIZ (512 * 1024)
-#	define FILE_SIZE_LIMIT (32 * 1024 * 1024)
-#	define PJG_APPVERSION1 (25)
-#	define PJG_APPVERSION2 (25)
-#endif
-
 struct scratch_buffer {
 	uchar_t *in_buff;
 	size_t in_bufflen;
@@ -164,36 +155,6 @@ copy_archive_data(struct archive *ar, uchar_t *out_buf)
 	return (tot);
 }
 
-/*
- * Copy the given buffer into the archive stream.
- */
-static ssize_t
-write_archive_data(struct archive *aw, uchar_t *out_buf, size_t len, int block_size)
-{
-	int64_t offset;
-	uchar_t *buff;
-	int r;
-	size_t tot;
-
-	buff = out_buf;
-	offset = 0;
-	tot = len;
-	while (len > 0) {
-		if (len < block_size)
-			block_size = len;
-		r = (int)archive_write_data_block(aw, buff, block_size, offset);
-		if (r < ARCHIVE_WARN)
-			r = ARCHIVE_WARN;
-		if (r != ARCHIVE_OK) {
-			return (r);
-		}
-		offset += block_size;
-		len -= block_size;
-		buff += block_size;
-	}
-	return (tot);
-}
-
 #ifndef _MPLV2_LICENSE_
 int
 pjg_version_supported(char ver)
@@ -211,7 +172,7 @@ packjpg_filter(struct filter_info *fi, void *filter_private)
 
 	len = archive_entry_size(fi->entry);
 	len1 = len;
-	if (len > FILE_SIZE_LIMIT) // Bork on massive JPEGs
+	if (len > PJG_FILE_SIZE_LIMIT) // Bork on massive JPEGs
 		return (FILTER_RETURN_SKIP);
 
 	if (fi->compressing) {
@@ -264,8 +225,7 @@ packjpg_filter(struct filter_info *fi, void *filter_private)
 		 * version number. We also check if it is supported.
 		 */
 		if (mapbuf[0] != 'J' || mapbuf[1] != 'S' || !pjg_version_supported(mapbuf[2])) {
-			return (write_archive_data(fi->target_arc, sdat->in_buff,
-			    len, fi->block_size));
+			return (archive_write_data(fi->target_arc, sdat->in_buff, len));
 		}
 	}
 
@@ -301,11 +261,11 @@ packjpg_filter(struct filter_info *fi, void *filter_private)
 		 * soft error to continue the archive extraction.
 		 */
		free(out);
-		if (write_archive_data(fi->target_arc, mapbuf, len1, fi->block_size) < len1)
+		if (archive_write_data(fi->target_arc, mapbuf, len1) < len1)
 			return (FILTER_RETURN_ERROR);
 		return (FILTER_RETURN_SOFT_ERROR);
 	}
-	rv = write_archive_data(fi->target_arc, out, len, fi->block_size);
+	rv = archive_write_data(fi->target_arc, out, len);
 	free(out);
 	return (rv);
 }
@@ -320,7 +280,7 @@ packpnm_filter(struct filter_info *fi, void *filter_private)
 
 	len = archive_entry_size(fi->entry);
 	len1 = len;
-	if (len > FILE_SIZE_LIMIT) // Bork on massive JPEGs
+	if (len > PJG_FILE_SIZE_LIMIT) // Bork on massive JPEGs
 		return (FILTER_RETURN_SKIP);
 
 	if (fi->compressing) {
@@ -367,8 +327,7 @@ packpnm_filter(struct filter_info *fi, void *filter_private)
 		 * Write the raw data and skip.
 		 */
 		if (identify_pnm_type(mapbuf, len - 8) != 2) {
-			return (write_archive_data(fi->target_arc, sdat->in_buff,
-			    len, fi->block_size));
+			return (archive_write_data(fi->target_arc, sdat->in_buff, len));
 		}
 	}
 
@@ -404,11 +363,11 @@ packpnm_filter(struct filter_info *fi, void *filter_private)
 		 * soft error to continue the archive extraction.
 		 */
 		free(out);
-		if (write_archive_data(fi->target_arc, mapbuf, len1, fi->block_size) < len1)
+		if (archive_write_data(fi->target_arc, mapbuf, len1) < len1)
 			return (FILTER_RETURN_ERROR);
 		return (FILTER_RETURN_SOFT_ERROR);
 	}
-	rv = write_archive_data(fi->target_arc, out, len, fi->block_size);
+	rv = archive_write_data(fi->target_arc, out, len);
 	free(out);
 	return (rv);
 }
@@ -425,7 +384,7 @@ wavpack_filter(struct filter_info *fi, void *filter_private)
 
 	len = archive_entry_size(fi->entry);
 	len1 = len;
-	if (len > FILE_SIZE_LIMIT) // Bork on massive JPEGs
+	if (len > WVPK_FILE_SIZE_LIMIT)
 		return (FILTER_RETURN_SKIP);
 
 	if (fi->compressing) {
@@ -474,9 +433,8 @@ wavpack_filter(struct filter_info *fi, void *filter_private)
 		 * Write the raw data and skip.
 		 */
 		wpkstr = (char *)mapbuf;
-		if (strncmp(wpkstr, "wvpk", 4) == 0) {
-			return (write_archive_data(fi->target_arc, sdat->in_buff,
-			    len, fi->block_size));
+		if (strncmp(wpkstr, "wvpk", 4) != 0) {
+			return (archive_write_data(fi->target_arc, sdat->in_buff, len));
 		}
 	}
 
@@ -512,11 +470,11 @@ wavpack_filter(struct filter_info *fi, void *filter_private)
 		 * soft error to continue the archive extraction.
 		 */
 		free(out);
-		if (write_archive_data(fi->target_arc, mapbuf, len1, fi->block_size) < len1)
+		if (archive_write_data(fi->target_arc, mapbuf, len1) < len1)
 			return (FILTER_RETURN_ERROR);
 		return (FILTER_RETURN_SOFT_ERROR);
 	}
-	rv = write_archive_data(fi->target_arc, out, len, fi->block_size);
+	rv = archive_write_data(fi->target_arc, out, len);
 	free(out);
 	return (rv);
 }
diff --git a/archive/pc_arc_filter.h b/archive/pc_arc_filter.h
index 00e80f9..0d5e9e7 100644
--- a/archive/pc_arc_filter.h
+++ b/archive/pc_arc_filter.h
@@ -42,6 +42,21 @@ extern "C" {
 #define FILTER_RETURN_SOFT_ERROR (-2)
 #define FILTER_XATTR_ENTRY "_._pc_filter_xattr"
 
+#define HELPER_DEF_BUFSIZ (512 * 1024)
+#define WVPK_FILE_SIZE_LIMIT (18 * 1024 * 1024)
+
+/*
+ * The biggest scratch buffer reqd by filter routines.
+ * Currently this is the WavPack filter buffer.
+ */
+#define FILTER_SCRATCH_SIZE_MAX WVPK_FILE_SIZE_LIMIT
+
+#ifndef _MPLV2_LICENSE_
+#	define PJG_FILE_SIZE_LIMIT (8 * 1024 * 1024)
+#	define PJG_APPVERSION1 (25)
+#	define PJG_APPVERSION2 (25)
+#endif
+
 struct filter_info {
 	struct archive *source_arc;
 	struct archive *target_arc;
@@ -50,6 +65,8 @@ struct filter_info {
 	int compressing, block_size;
 	int *type_ptr;
 	int cmp_level;
+	uchar_t scratch_buffer;
+	size_t scratch_buffer_size;
 };
 
 struct filter_flags {
diff --git a/archive/pc_archive.c b/archive/pc_archive.c
index 88edf59..cd527f4 100644
--- a/archive/pc_archive.c
+++ b/archive/pc_archive.c
@@ -1561,6 +1561,23 @@ init_filters(struct filter_flags *ff)
 	pthread_mutex_unlock(&init_mutex);
 }
 
+void
+disable_all_filters()
+{
+	struct filter_flags ff;
+
+	pthread_mutex_lock(&init_mutex);
+	if (!filters_inited) {
+		ff.enable_packjpg = 0;
+		ff.enable_wavpack = 0;
+		add_filters_by_type(typetab, &ff);
+		filters_inited = 1;
+	} else {
+		memset(typetab, 0, sizeof (typetab));
+	}
+	pthread_mutex_unlock(&init_mutex);
+}
+
 /*
  * Identify file type based on extension. Lookup is fast as we have a perfect hash function.
  * If the given extension maps to a slot which has a different extension or maps to a slot
diff --git a/archive/pc_archive.h b/archive/pc_archive.h
index d79224a..dfc6636 100644
--- a/archive/pc_archive.h
+++ b/archive/pc_archive.h
@@ -55,6 +55,8 @@ int archiver_close(void *ctx);
 int init_archive_mod();
 int insert_filter_data(filter_func_ptr func, void *filter_private, const char *ext);
 void init_filters(struct filter_flags *ff);
+void disable_all_filters();
+
 
 #ifdef __cplusplus
 }
diff --git a/archive/wavpack_helper.c b/archive/wavpack_helper.c
index 3ea4a99..81f32fa 100644
--- a/archive/wavpack_helper.c
+++ b/archive/wavpack_helper.c
@@ -228,13 +228,13 @@ pack_audio(WavpackContext *wpc, read_data *rdat)
 
 	// don't use an absurd amount of memory just because we have an absurd number of channels
 
-	while (input_samples * sizeof (int32_t) * WavpackGetNumChannels (wpc) > 2048*1024)
+	while (input_samples * sizeof (int32_t) * WavpackGetNumChannels(wpc) > 2048*1024)
 		input_samples >>= 1;
 
 	WavpackPackInit(wpc);
-	bytes_per_sample = WavpackGetBytesPerSample (wpc) * WavpackGetNumChannels (wpc);
-	sample_buffer = malloc(input_samples * sizeof (int32_t) * WavpackGetNumChannels (wpc));
-	samples_remaining = WavpackGetNumSamples (wpc);
+	bytes_per_sample = WavpackGetBytesPerSample(wpc) * WavpackGetNumChannels(wpc);
+	sample_buffer = malloc(input_samples * sizeof (int32_t) * WavpackGetNumChannels(wpc));
+	samples_remaining = WavpackGetNumSamples(wpc);
 
 	while (1) {
 		uint32_t bytes_to_read, bytes_read = 0;
@@ -256,7 +256,7 @@ pack_audio(WavpackContext *wpc, read_data *rdat)
 			unsigned char *sptr = input_buffer;
 			int32_t *dptr = sample_buffer;
 
-			switch (WavpackGetBytesPerSample (wpc)) {
+			switch (WavpackGetBytesPerSample(wpc)) {
 			case 1:
 				while (cnt--)
 					*dptr++ = *sptr++ - 128;
@@ -264,24 +264,24 @@ pack_audio(WavpackContext *wpc, read_data *rdat)
 
 			case 2:
 				while (cnt--) {
-					*dptr++ = sptr [0] | ((int32_t)(signed char) sptr [1] << 8);
+					*dptr++ = sptr[0] | ((int32_t)(signed char) sptr[1] << 8);
 					sptr += 2;
 				}
 				break;
 
 			case 3:
 				while (cnt--) {
-					*dptr++ = sptr [0] | ((int32_t) sptr [1] << 8) |
-					    ((int32_t)(signed char) sptr [2] << 16);
+					*dptr++ = sptr[0] | ((int32_t) sptr[1] << 8) |
+					    ((int32_t)(signed char) sptr[2] << 16);
 					sptr += 3;
 				}
 				break;
 
 			case 4:
 				while (cnt--) {
-					*dptr++ = sptr [0] | ((int32_t) sptr [1] << 8) |
-					    ((int32_t) sptr [2] << 16) |
-					    ((int32_t)(signed char) sptr [3] << 24);
+					*dptr++ = sptr[0] | ((int32_t) sptr[1] << 8) |
+					    ((int32_t) sptr[2] << 16) |
+					    ((int32_t)(signed char) sptr[3] << 24);
 					sptr += 4;
 				}
 				break;
@@ -295,7 +295,7 @@ pack_audio(WavpackContext *wpc, read_data *rdat)
 	}
 
 	free(sample_buffer);
-	if (!WavpackFlushSamples (wpc)) {
+	if (!WavpackFlushSamples(wpc)) {
 		return (0);
 	}
 
diff --git a/pcompress.c b/pcompress.c
index e05a2b9..ea05d12 100644
--- a/pcompress.c
+++ b/pcompress.c
@@ -1328,10 +1328,14 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
 				UNCOMP_BAIL;
 			}
 		}
+
+		/*
+		 * The last parameter is freeram. It is not needed during decompression.
+		 */
 		if (pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global) {
 			tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, pctx->rab_blk_size,
 			    pctx->algo, &props, pctx->enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0,
-			    NULL, pctx->pipe_mode, nprocs);
+			    NULL, pctx->pipe_mode, nprocs, 0);
 			if (tdat->rctx == NULL) {
 				UNCOMP_BAIL;
 			}
@@ -1842,7 +1846,7 @@ plain_index:
 			    ORIGINAL_CHUNKSZ, crc);
 			U32_P(mac_ptr) = htonl(crc);
 		}
-	
+
 		Sem_Post(&tdat->cmp_done_sem);
 		goto redo;
 	}
@@ -1925,6 +1929,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
 	uchar_t *cread_buf, *pos;
 	dedupe_context_t *rctx;
 	algo_props_t props;
+	my_sysinfo msys_info;
 
 	init_algo_props(&props);
 	props.cksum = pctx->cksum;
@@ -2007,7 +2012,15 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
 	single_chunk = 0;
 	rctx = NULL;
 
+	/*
+	 * Get number of lCPUs. When archiving with advanced filters, we use one less
+	 * lCPU to reduce threads due to increased memory requirements.
+	 */
 	nprocs = sysconf(_SC_NPROCESSORS_ONLN);
+	if (pctx->archive_mode && (pctx->enable_packjpg || pctx->enable_wavpack)) {
+		nprocs = nprocs > 1 ? nprocs-1:nprocs;
+	}
+
 	if (pctx->nthreads > 0 && pctx->nthreads < nprocs)
 		nprocs = pctx->nthreads;
 	else
@@ -2254,6 +2267,26 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
 		tdat->compress = pctx->_compress_func;
 		tdat->decompress = pctx->_decompress_func;
 		tdat->uncompressed_chunk = (uchar_t *)1;
+		if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) {
+			if (single_chunk)
+				tdat->cmp_seg = (uchar_t *)1;
+			else
+				tdat->cmp_seg = (uchar_t *)slab_alloc(NULL, compressed_chunksize);
+			tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
+			    compressed_chunksize);
+		} else {
+			if (single_chunk)
+				tdat->uncompressed_chunk = (uchar_t *)1;
+			else
+				tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
+			tdat->cmp_seg = (uchar_t *)slab_alloc(NULL, compressed_chunksize);
+		}
+		tdat->compressed_chunk = tdat->cmp_seg + COMPRESSED_CHUNKSZ +
+		    pctx->cksum_bytes + pctx->mac_bytes;
+		if (!tdat->cmp_seg || !tdat->uncompressed_chunk) {
+			log_msg(LOG_ERR, 0, "5: Out of memory");
+			COMP_BAIL;
+		}
 		tdat->cancel = 0;
 		tdat->decompressing = 0;
 		if (single_chunk)
@@ -2270,8 +2303,8 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
 		Sem_Init(&(tdat->index_sem), 0, 0);
 
 		if (pctx->_init_func) {
-			if (pctx->_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize,
-			    VERSION, COMPRESS) != 0) {
+			if (pctx->_init_func(&(tdat->data), &(tdat->level), props.nthreads,
+			    chunksize, VERSION, COMPRESS) != 0) {
 				COMP_BAIL;
 			}
 		}
@@ -2291,15 +2324,29 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
 	thread = 1;
 
 	/*
-	 * initialize Dedupe Context here after all other allocations so that index size can be correctly
-	 * computed based on free memory.
+	 * initialize Dedupe Context here after all other allocations so that index size can be
+	 * correctly computed based on free memory. The freeram got here is adjusted amount.
+	 * When archiving, filter scratch buffer is taken into account.
 	 */
+	get_sys_limits(&msys_info);
+
+	if (pctx->enable_packjpg || pctx->enable_wavpack) {
+		if (FILTER_SCRATCH_SIZE_MAX >= msys_info.freeram ||
+		    msys_info.freeram - FILTER_SCRATCH_SIZE_MAX < FILTER_SCRATCH_SIZE_MAX) {
+			log_msg(LOG_WARN, 0, "Not enough memory. Disabling advanced filters.");
+			disable_all_filters();
+		} else {
+			msys_info.freeram -= FILTER_SCRATCH_SIZE_MAX;
+		}
+	}
+
 	if (pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global) {
 		for (i = 0; i < nprocs; i++) {
 			tdat = dary[i];
-			tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, pctx->rab_blk_size,
-			    pctx->algo, &props, pctx->enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size,
-			    tmpdir, pctx->pipe_mode, nprocs);
+			tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize,
+			    pctx->rab_blk_size, pctx->algo, &props, pctx->enable_delta_encode,
+			    dedupe_flag, VERSION, COMPRESS, sbuf.st_size, tmpdir,
+			    pctx->pipe_mode, nprocs, msys_info.freeram);
 			if (tdat->rctx == NULL) {
 				COMP_BAIL;
 			}
@@ -2442,7 +2489,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
 	if (pctx->enable_rabin_split) {
 		rctx = create_dedupe_context(chunksize, 0, pctx->rab_blk_size, pctx->algo, &props,
 		    pctx->enable_delta_encode, pctx->enable_fixed_scan, VERSION, COMPRESS, 0, NULL,
-		    pctx->pipe_mode, nprocs);
+		    pctx->pipe_mode, nprocs, msys_info.freeram);
 		if (pctx->archive_mode)
 			rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx, pctx);
 		else
@@ -2470,34 +2517,6 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
 			bail = 1;
 			break;
 		}
-		/*
-		 * Delayed allocation. Allocate chunks if not already done.
-		 */
-		if (!tdat->cmp_seg) {
-			if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) {
-				if (single_chunk)
-					tdat->cmp_seg = (uchar_t *)1;
-				else
-					tdat->cmp_seg = (uchar_t *)slab_alloc(NULL,
-					    compressed_chunksize);
-				tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
-				    compressed_chunksize);
-			} else {
-				if (single_chunk)
-					tdat->uncompressed_chunk = (uchar_t *)1;
-				else
-					tdat->uncompressed_chunk =
-					    (uchar_t *)slab_alloc(NULL, chunksize);
-				tdat->cmp_seg = (uchar_t *)slab_alloc(NULL,
-				    compressed_chunksize);
-			}
-			tdat->compressed_chunk = tdat->cmp_seg + COMPRESSED_CHUNKSZ +
-			    pctx->cksum_bytes + pctx->mac_bytes;
-			if (!tdat->cmp_seg || !tdat->uncompressed_chunk) {
-				log_msg(LOG_ERR, 0, "5: Out of memory");
-				COMP_BAIL;
-			}
-		}
 
 		/*
 		 * Once previous chunk is done swap already read buffer and
diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c
index b7b8efb..a039e99 100755
--- a/rabin/rabin_dedup.c
+++ b/rabin/rabin_dedup.c
@@ -165,7 +165,7 @@ dedupe_context_t *
 create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz,
     const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
     int file_version, compress_op_t op, uint64_t file_size, char *tmppath,
-    int pipe_mode, int nthreads) {
+    int pipe_mode, int nthreads, size_t freeram) {
 	dedupe_context_t *ctx;
 	uint32_t i;
 
@@ -213,14 +213,9 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
 	 * chunk matching.
 	 */
 	if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz >= 0) {
-		my_sysinfo msys_info;
 		int pct_interval, chunk_cksum, cksum_bytes, mac_bytes;
 		char *ck;
 
-		/*
-		 * Get amount of memory to use. The freeram got here is adjusted amount.
-		 */
-		get_sys_limits(&msys_info);
 		pct_interval = 0;
 		if (pipe_mode)
 			pct_interval = DEFAULT_PCT_INTERVAL;
@@ -245,7 +240,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
 		}
 		arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, pct_interval,
 		    algo, chunk_cksum, GLOBAL_SIM_CKSUM, file_size,
-		    msys_info.freeram, nthreads);
+		    freeram, nthreads);
 		if (arc == NULL) {
 			pthread_mutex_unlock(&init_lock);
 			return (NULL);
diff --git a/rabin/rabin_dedup.h b/rabin/rabin_dedup.h
index 1d9f82e..ab05d25 100644
--- a/rabin/rabin_dedup.h
+++ b/rabin/rabin_dedup.h
@@ -193,7 +193,8 @@ typedef struct {
 
 extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize,
     int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
-    int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode, int nthreads);
+    int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode,
+    int nthreads, size_t freeram);
 extern void destroy_dedupe_context(dedupe_context_t *ctx);
 extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf, uint64_t *size,
     uint64_t offset, uint64_t *rabin_pos, int mt);
diff --git a/utils/utils.c b/utils/utils.c
index 08ab227..f444bd7 100644
--- a/utils/utils.c
+++ b/utils/utils.c
@@ -43,7 +43,8 @@
 #include
 #include
 #include
-#include
+#include "archive/pc_archive.h"
+#include "archive/pc_arc_filter.h"
 #ifdef __APPLE__
 #include
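
Reviewer note (editorial addition, not part of the diff above): the memory-accounting
change moves the free-RAM query out of create_dedupe_context() and into start_compress().
The compressor now reserves the worst-case filter scratch buffer (FILTER_SCRATCH_SIZE_MAX)
up front, or disables the advanced filters when less than twice that amount is free, and
passes the adjusted figure down through the new freeram parameter so the global dedupe
index is sized against memory that is actually available. The small C program below is
only a sketch of that ordering; query_free_ram() and size_dedupe_index() are hypothetical
stand-ins for the pcompress internals (get_sys_limits()/my_sysinfo and the index sizing
inside init_global_db_s()), and the constant mirrors WVPK_FILE_SIZE_LIMIT from
pc_arc_filter.h.

/* Illustrative sketch only -- mirrors the ordering introduced in start_compress(). */
#include <stdio.h>
#include <stddef.h>

#define FILTER_SCRATCH_SIZE_MAX	(18 * 1024 * 1024)	/* == WVPK_FILE_SIZE_LIMIT */

/* Hypothetical stand-in for get_sys_limits()/my_sysinfo.freeram. */
static size_t query_free_ram(void) { return ((size_t)256 * 1024 * 1024); }

/* Hypothetical stand-in for the dedupe index sizing that consumes the freeram hint. */
static size_t size_dedupe_index(size_t freeram) { return (freeram / 4); }

int
main(void)
{
	size_t freeram = query_free_ram();
	int filters_enabled = 1;

	/*
	 * Same test as the patch: keep the advanced filters only when at least
	 * twice the scratch size is free, otherwise fall back to plain archiving.
	 */
	if (filters_enabled) {
		if (FILTER_SCRATCH_SIZE_MAX >= freeram ||
		    freeram - FILTER_SCRATCH_SIZE_MAX < FILTER_SCRATCH_SIZE_MAX) {
			fprintf(stderr, "Not enough memory. Disabling advanced filters.\n");
			filters_enabled = 0;
		} else {
			freeram -= FILTER_SCRATCH_SIZE_MAX;	/* reserve scratch up front */
		}
	}

	/* The adjusted figure is what create_dedupe_context() now receives. */
	printf("dedupe index budget: %zu bytes\n", size_dedupe_index(freeram));
	return (0);
}

Keeping a single adjusted snapshot in the caller also lets the decompress path simply pass
0 for freeram, matching the new comment in start_decompress() that the value is not needed
during decompression.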