Remove unneeded archive writing function.

Improve filter scratch buffer handling.
Improve memory accounting.
Remove delayed allocation when compressing. Allows better memory estimation.
Some cstyle fixes.
This commit is contained in:
Moinak Ghosh 2014-09-24 21:54:36 +05:30
parent 6ba9c4b721
commit e3c32ed6d6
9 changed files with 124 additions and 114 deletions

View file

@ -37,18 +37,9 @@
#include <utils.h> #include <utils.h>
#include <sys/mman.h> #include <sys/mman.h>
#include <ctype.h> #include <ctype.h>
#include <archive.h>
#include <archive_entry.h>
#include "pc_arc_filter.h" #include "pc_arc_filter.h"
#include "pc_archive.h" #include "pc_archive.h"
#ifndef _MPLV2_LICENSE_
# define HELPER_DEF_BUFSIZ (512 * 1024)
# define FILE_SIZE_LIMIT (32 * 1024 * 1024)
# define PJG_APPVERSION1 (25)
# define PJG_APPVERSION2 (25)
#endif
struct scratch_buffer { struct scratch_buffer {
uchar_t *in_buff; uchar_t *in_buff;
size_t in_bufflen; size_t in_bufflen;
@ -164,36 +155,6 @@ copy_archive_data(struct archive *ar, uchar_t *out_buf)
return (tot); return (tot);
} }
/*
 * Push len bytes starting at out_buf into the archive writer aw, handing
 * them over in pieces of at most block_size bytes at increasing offsets.
 *
 * Returns len once every piece has been accepted. If a piece is not
 * accepted with ARCHIVE_OK, the libarchive status is returned instead;
 * any status lower than ARCHIVE_WARN is reported as ARCHIVE_WARN.
 *
 * NOTE: block_size is expected to be positive. With a block_size of zero
 * the loop below never consumes any input.
 */
static ssize_t
write_archive_data(struct archive *aw, uchar_t *out_buf, size_t len, int block_size)
{
	uchar_t *cursor = out_buf;
	int64_t pos = 0;
	size_t remaining = len;

	for (; remaining > 0; ) {
		int rc;

		/* The final piece may be shorter than a full block. */
		if (remaining < block_size)
			block_size = remaining;

		rc = (int)archive_write_data_block(aw, cursor, block_size, pos);
		/* Collapse anything worse than a warning into ARCHIVE_WARN. */
		if (rc < ARCHIVE_WARN)
			rc = ARCHIVE_WARN;
		if (rc != ARCHIVE_OK)
			return (rc);

		pos += block_size;
		remaining -= block_size;
		cursor += block_size;
	}
	return (len);
}
#ifndef _MPLV2_LICENSE_ #ifndef _MPLV2_LICENSE_
int int
pjg_version_supported(char ver) pjg_version_supported(char ver)
@ -211,7 +172,7 @@ packjpg_filter(struct filter_info *fi, void *filter_private)
len = archive_entry_size(fi->entry); len = archive_entry_size(fi->entry);
len1 = len; len1 = len;
if (len > FILE_SIZE_LIMIT) // Bork on massive JPEGs if (len > PJG_FILE_SIZE_LIMIT) // Bork on massive JPEGs
return (FILTER_RETURN_SKIP); return (FILTER_RETURN_SKIP);
if (fi->compressing) { if (fi->compressing) {
@ -264,8 +225,7 @@ packjpg_filter(struct filter_info *fi, void *filter_private)
* version number. We also check if it is supported. * version number. We also check if it is supported.
*/ */
if (mapbuf[0] != 'J' || mapbuf[1] != 'S' || !pjg_version_supported(mapbuf[2])) { if (mapbuf[0] != 'J' || mapbuf[1] != 'S' || !pjg_version_supported(mapbuf[2])) {
return (write_archive_data(fi->target_arc, sdat->in_buff, return (archive_write_data(fi->target_arc, sdat->in_buff, len));
len, fi->block_size));
} }
} }
@ -301,11 +261,11 @@ packjpg_filter(struct filter_info *fi, void *filter_private)
* soft error to continue the archive extraction. * soft error to continue the archive extraction.
*/ */
free(out); free(out);
if (write_archive_data(fi->target_arc, mapbuf, len1, fi->block_size) < len1) if (archive_write_data(fi->target_arc, mapbuf, len1) < len1)
return (FILTER_RETURN_ERROR); return (FILTER_RETURN_ERROR);
return (FILTER_RETURN_SOFT_ERROR); return (FILTER_RETURN_SOFT_ERROR);
} }
rv = write_archive_data(fi->target_arc, out, len, fi->block_size); rv = archive_write_data(fi->target_arc, out, len);
free(out); free(out);
return (rv); return (rv);
} }
@ -320,7 +280,7 @@ packpnm_filter(struct filter_info *fi, void *filter_private)
len = archive_entry_size(fi->entry); len = archive_entry_size(fi->entry);
len1 = len; len1 = len;
if (len > FILE_SIZE_LIMIT) // Bork on massive JPEGs if (len > PJG_FILE_SIZE_LIMIT) // Bork on massive JPEGs
return (FILTER_RETURN_SKIP); return (FILTER_RETURN_SKIP);
if (fi->compressing) { if (fi->compressing) {
@ -367,8 +327,7 @@ packpnm_filter(struct filter_info *fi, void *filter_private)
* Write the raw data and skip. * Write the raw data and skip.
*/ */
if (identify_pnm_type(mapbuf, len - 8) != 2) { if (identify_pnm_type(mapbuf, len - 8) != 2) {
return (write_archive_data(fi->target_arc, sdat->in_buff, return (archive_write_data(fi->target_arc, sdat->in_buff, len));
len, fi->block_size));
} }
} }
@ -404,11 +363,11 @@ packpnm_filter(struct filter_info *fi, void *filter_private)
* soft error to continue the archive extraction. * soft error to continue the archive extraction.
*/ */
free(out); free(out);
if (write_archive_data(fi->target_arc, mapbuf, len1, fi->block_size) < len1) if (archive_write_data(fi->target_arc, mapbuf, len1) < len1)
return (FILTER_RETURN_ERROR); return (FILTER_RETURN_ERROR);
return (FILTER_RETURN_SOFT_ERROR); return (FILTER_RETURN_SOFT_ERROR);
} }
rv = write_archive_data(fi->target_arc, out, len, fi->block_size); rv = archive_write_data(fi->target_arc, out, len);
free(out); free(out);
return (rv); return (rv);
} }
@ -425,7 +384,7 @@ wavpack_filter(struct filter_info *fi, void *filter_private)
len = archive_entry_size(fi->entry); len = archive_entry_size(fi->entry);
len1 = len; len1 = len;
if (len > FILE_SIZE_LIMIT) // Bork on massive JPEGs if (len > WVPK_FILE_SIZE_LIMIT)
return (FILTER_RETURN_SKIP); return (FILTER_RETURN_SKIP);
if (fi->compressing) { if (fi->compressing) {
@ -474,9 +433,8 @@ wavpack_filter(struct filter_info *fi, void *filter_private)
* Write the raw data and skip. * Write the raw data and skip.
*/ */
wpkstr = (char *)mapbuf; wpkstr = (char *)mapbuf;
if (strncmp(wpkstr, "wvpk", 4) == 0) { if (strncmp(wpkstr, "wvpk", 4) != 0) {
return (write_archive_data(fi->target_arc, sdat->in_buff, return (archive_write_data(fi->target_arc, sdat->in_buff, len));
len, fi->block_size));
} }
} }
@ -512,11 +470,11 @@ wavpack_filter(struct filter_info *fi, void *filter_private)
* soft error to continue the archive extraction. * soft error to continue the archive extraction.
*/ */
free(out); free(out);
if (write_archive_data(fi->target_arc, mapbuf, len1, fi->block_size) < len1) if (archive_write_data(fi->target_arc, mapbuf, len1) < len1)
return (FILTER_RETURN_ERROR); return (FILTER_RETURN_ERROR);
return (FILTER_RETURN_SOFT_ERROR); return (FILTER_RETURN_SOFT_ERROR);
} }
rv = write_archive_data(fi->target_arc, out, len, fi->block_size); rv = archive_write_data(fi->target_arc, out, len);
free(out); free(out);
return (rv); return (rv);
} }

View file

@ -42,6 +42,21 @@ extern "C" {
#define FILTER_RETURN_SOFT_ERROR (-2) #define FILTER_RETURN_SOFT_ERROR (-2)
#define FILTER_XATTR_ENTRY "_._pc_filter_xattr" #define FILTER_XATTR_ENTRY "_._pc_filter_xattr"
/* Default helper buffer size: 512 KiB. NOTE(review): users are not visible here - confirm. */
#define HELPER_DEF_BUFSIZ (512 * 1024)
/* Entries larger than this (18 MiB) are skipped by the WavPack filter. */
#define WVPK_FILE_SIZE_LIMIT (18 * 1024 * 1024)
/*
 * The biggest scratch buffer required by the filter routines.
 * Currently this is the WavPack filter buffer. The compression
 * start-up code subtracts this amount from free RAM when the
 * advanced filters are enabled.
 */
#define FILTER_SCRATCH_SIZE_MAX WVPK_FILE_SIZE_LIMIT
#ifndef _MPLV2_LICENSE_
/* Entries larger than this (8 MiB) are skipped by the PackJPG and PackPNM filters. */
# define PJG_FILE_SIZE_LIMIT (8 * 1024 * 1024)
/* NOTE(review): presumably the PackJPG stream versions accepted by pjg_version_supported() - confirm. */
# define PJG_APPVERSION1 (25)
# define PJG_APPVERSION2 (25)
#endif
struct filter_info { struct filter_info {
struct archive *source_arc; struct archive *source_arc;
struct archive *target_arc; struct archive *target_arc;
@ -50,6 +65,8 @@ struct filter_info {
int compressing, block_size; int compressing, block_size;
int *type_ptr; int *type_ptr;
int cmp_level; int cmp_level;
uchar_t scratch_buffer;
size_t scratch_buffer_size;
}; };
struct filter_flags { struct filter_flags {

View file

@ -1561,6 +1561,23 @@ init_filters(struct filter_flags *ff)
pthread_mutex_unlock(&init_mutex); pthread_mutex_unlock(&init_mutex);
} }
/*
 * Switch off the advanced (PackJPG / WavPack) filters.
 *
 * If the filter table has not been set up yet, it is populated through
 * add_filters_by_type() with every filter flag cleared and then marked as
 * initialized, so a later initialization attempt will not re-enable them.
 * If it was already set up, the whole type table is wiped instead.
 *
 * The work is done under init_mutex, the same lock used by init_filters().
 */
void
disable_all_filters(void)
{
	struct filter_flags ff;

	/*
	 * Clear the complete structure first so that any flag not named
	 * explicitly below is also seen as disabled rather than holding
	 * indeterminate stack contents.
	 */
	memset(&ff, 0, sizeof (ff));

	pthread_mutex_lock(&init_mutex);
	if (!filters_inited) {
		ff.enable_packjpg = 0;
		ff.enable_wavpack = 0;
		add_filters_by_type(typetab, &ff);
		filters_inited = 1;
	} else {
		memset(typetab, 0, sizeof (typetab));
	}
	pthread_mutex_unlock(&init_mutex);
}
/* /*
* Identify file type based on extension. Lookup is fast as we have a perfect hash function. * Identify file type based on extension. Lookup is fast as we have a perfect hash function.
* If the given extension maps to a slot which has a different extension or maps to a slot * If the given extension maps to a slot which has a different extension or maps to a slot

View file

@ -55,6 +55,8 @@ int archiver_close(void *ctx);
int init_archive_mod(); int init_archive_mod();
int insert_filter_data(filter_func_ptr func, void *filter_private, const char *ext); int insert_filter_data(filter_func_ptr func, void *filter_private, const char *ext);
void init_filters(struct filter_flags *ff); void init_filters(struct filter_flags *ff);
void disable_all_filters();
#ifdef __cplusplus #ifdef __cplusplus
} }

View file

@ -1328,10 +1328,14 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
UNCOMP_BAIL; UNCOMP_BAIL;
} }
} }
/*
* The last parameter is freeram. It is not needed during decompression.
*/
if (pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global) { if (pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global) {
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, pctx->rab_blk_size, tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, pctx->rab_blk_size,
pctx->algo, &props, pctx->enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0, pctx->algo, &props, pctx->enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0,
NULL, pctx->pipe_mode, nprocs); NULL, pctx->pipe_mode, nprocs, 0);
if (tdat->rctx == NULL) { if (tdat->rctx == NULL) {
UNCOMP_BAIL; UNCOMP_BAIL;
} }
@ -1925,6 +1929,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
uchar_t *cread_buf, *pos; uchar_t *cread_buf, *pos;
dedupe_context_t *rctx; dedupe_context_t *rctx;
algo_props_t props; algo_props_t props;
my_sysinfo msys_info;
init_algo_props(&props); init_algo_props(&props);
props.cksum = pctx->cksum; props.cksum = pctx->cksum;
@ -2007,7 +2012,15 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
single_chunk = 0; single_chunk = 0;
rctx = NULL; rctx = NULL;
/*
* Get number of lCPUs. When archiving with advanced filters, we use one less
* lCPU to reduce threads due to increased memory requirements.
*/
nprocs = sysconf(_SC_NPROCESSORS_ONLN); nprocs = sysconf(_SC_NPROCESSORS_ONLN);
if (pctx->archive_mode && (pctx->enable_packjpg || pctx->enable_wavpack)) {
nprocs = nprocs > 1 ? nprocs-1:nprocs;
}
if (pctx->nthreads > 0 && pctx->nthreads < nprocs) if (pctx->nthreads > 0 && pctx->nthreads < nprocs)
nprocs = pctx->nthreads; nprocs = pctx->nthreads;
else else
@ -2254,6 +2267,26 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
tdat->compress = pctx->_compress_func; tdat->compress = pctx->_compress_func;
tdat->decompress = pctx->_decompress_func; tdat->decompress = pctx->_decompress_func;
tdat->uncompressed_chunk = (uchar_t *)1; tdat->uncompressed_chunk = (uchar_t *)1;
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) {
if (single_chunk)
tdat->cmp_seg = (uchar_t *)1;
else
tdat->cmp_seg = (uchar_t *)slab_alloc(NULL, compressed_chunksize);
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize);
} else {
if (single_chunk)
tdat->uncompressed_chunk = (uchar_t *)1;
else
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
tdat->cmp_seg = (uchar_t *)slab_alloc(NULL, compressed_chunksize);
}
tdat->compressed_chunk = tdat->cmp_seg + COMPRESSED_CHUNKSZ +
pctx->cksum_bytes + pctx->mac_bytes;
if (!tdat->cmp_seg || !tdat->uncompressed_chunk) {
log_msg(LOG_ERR, 0, "5: Out of memory");
COMP_BAIL;
}
tdat->cancel = 0; tdat->cancel = 0;
tdat->decompressing = 0; tdat->decompressing = 0;
if (single_chunk) if (single_chunk)
@ -2270,8 +2303,8 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
Sem_Init(&(tdat->index_sem), 0, 0); Sem_Init(&(tdat->index_sem), 0, 0);
if (pctx->_init_func) { if (pctx->_init_func) {
if (pctx->_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, if (pctx->_init_func(&(tdat->data), &(tdat->level), props.nthreads,
VERSION, COMPRESS) != 0) { chunksize, VERSION, COMPRESS) != 0) {
COMP_BAIL; COMP_BAIL;
} }
} }
@ -2291,15 +2324,29 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
thread = 1; thread = 1;
/* /*
* initialize Dedupe Context here after all other allocations so that index size can be correctly * initialize Dedupe Context here after all other allocations so that index size can be
* computed based on free memory. * correctly computed based on free memory. The freeram got here is adjusted amount.
* When archiving, filter scratch buffer is taken into account.
*/ */
get_sys_limits(&msys_info);
if (pctx->enable_packjpg || pctx->enable_wavpack) {
if (FILTER_SCRATCH_SIZE_MAX >= msys_info.freeram ||
msys_info.freeram - FILTER_SCRATCH_SIZE_MAX < FILTER_SCRATCH_SIZE_MAX) {
log_msg(LOG_WARN, 0, "Not enough memory. Disabling advanced filters.");
disable_all_filters();
} else {
msys_info.freeram -= FILTER_SCRATCH_SIZE_MAX;
}
}
if (pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global) { if (pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global) {
for (i = 0; i < nprocs; i++) { for (i = 0; i < nprocs; i++) {
tdat = dary[i]; tdat = dary[i];
tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, pctx->rab_blk_size, tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize,
pctx->algo, &props, pctx->enable_delta_encode, dedupe_flag, VERSION, COMPRESS, sbuf.st_size, pctx->rab_blk_size, pctx->algo, &props, pctx->enable_delta_encode,
tmpdir, pctx->pipe_mode, nprocs); dedupe_flag, VERSION, COMPRESS, sbuf.st_size, tmpdir,
pctx->pipe_mode, nprocs, msys_info.freeram);
if (tdat->rctx == NULL) { if (tdat->rctx == NULL) {
COMP_BAIL; COMP_BAIL;
} }
@ -2442,7 +2489,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
if (pctx->enable_rabin_split) { if (pctx->enable_rabin_split) {
rctx = create_dedupe_context(chunksize, 0, pctx->rab_blk_size, pctx->algo, &props, rctx = create_dedupe_context(chunksize, 0, pctx->rab_blk_size, pctx->algo, &props,
pctx->enable_delta_encode, pctx->enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pctx->enable_delta_encode, pctx->enable_fixed_scan, VERSION, COMPRESS, 0, NULL,
pctx->pipe_mode, nprocs); pctx->pipe_mode, nprocs, msys_info.freeram);
if (pctx->archive_mode) if (pctx->archive_mode)
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx, pctx); rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx, pctx);
else else
@ -2470,34 +2517,6 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
bail = 1; bail = 1;
break; break;
} }
/*
* Delayed allocation. Allocate chunks if not already done.
*/
if (!tdat->cmp_seg) {
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) {
if (single_chunk)
tdat->cmp_seg = (uchar_t *)1;
else
tdat->cmp_seg = (uchar_t *)slab_alloc(NULL,
compressed_chunksize);
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize);
} else {
if (single_chunk)
tdat->uncompressed_chunk = (uchar_t *)1;
else
tdat->uncompressed_chunk =
(uchar_t *)slab_alloc(NULL, chunksize);
tdat->cmp_seg = (uchar_t *)slab_alloc(NULL,
compressed_chunksize);
}
tdat->compressed_chunk = tdat->cmp_seg + COMPRESSED_CHUNKSZ +
pctx->cksum_bytes + pctx->mac_bytes;
if (!tdat->cmp_seg || !tdat->uncompressed_chunk) {
log_msg(LOG_ERR, 0, "5: Out of memory");
COMP_BAIL;
}
}
/* /*
* Once previous chunk is done swap already read buffer and * Once previous chunk is done swap already read buffer and

View file

@ -165,7 +165,7 @@ dedupe_context_t *
create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz, create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_sz,
const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int file_version, compress_op_t op, uint64_t file_size, char *tmppath,
int pipe_mode, int nthreads) { int pipe_mode, int nthreads, size_t freeram) {
dedupe_context_t *ctx; dedupe_context_t *ctx;
uint32_t i; uint32_t i;
@ -213,14 +213,9 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
* chunk matching. * chunk matching.
*/ */
if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz >= 0) { if (dedupe_flag == RABIN_DEDUPE_FILE_GLOBAL && op == COMPRESS && rab_blk_sz >= 0) {
my_sysinfo msys_info;
int pct_interval, chunk_cksum, cksum_bytes, mac_bytes; int pct_interval, chunk_cksum, cksum_bytes, mac_bytes;
char *ck; char *ck;
/*
* Get amount of memory to use. The freeram got here is adjusted amount.
*/
get_sys_limits(&msys_info);
pct_interval = 0; pct_interval = 0;
if (pipe_mode) if (pipe_mode)
pct_interval = DEFAULT_PCT_INTERVAL; pct_interval = DEFAULT_PCT_INTERVAL;
@ -245,7 +240,7 @@ create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, int rab_blk_s
} }
arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, pct_interval, arc = init_global_db_s(NULL, tmppath, rab_blk_sz, chunksize, pct_interval,
algo, chunk_cksum, GLOBAL_SIM_CKSUM, file_size, algo, chunk_cksum, GLOBAL_SIM_CKSUM, file_size,
msys_info.freeram, nthreads); freeram, nthreads);
if (arc == NULL) { if (arc == NULL) {
pthread_mutex_unlock(&init_lock); pthread_mutex_unlock(&init_lock);
return (NULL); return (NULL);

View file

@ -193,7 +193,8 @@ typedef struct {
extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize, extern dedupe_context_t *create_dedupe_context(uint64_t chunksize, uint64_t real_chunksize,
int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag, int rab_blk_sz, const char *algo, const algo_props_t *props, int delta_flag, int dedupe_flag,
int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode, int nthreads); int file_version, compress_op_t op, uint64_t file_size, char *tmppath, int pipe_mode,
int nthreads, size_t freeram);
extern void destroy_dedupe_context(dedupe_context_t *ctx); extern void destroy_dedupe_context(dedupe_context_t *ctx);
extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf, extern unsigned int dedupe_compress(dedupe_context_t *ctx, unsigned char *buf,
uint64_t *size, uint64_t offset, uint64_t *rabin_pos, int mt); uint64_t *size, uint64_t offset, uint64_t *rabin_pos, int mt);

View file

@ -43,7 +43,8 @@
#include <rabin_dedup.h> #include <rabin_dedup.h>
#include <cpuid.h> #include <cpuid.h>
#include <xxhash.h> #include <xxhash.h>
#include <pc_archive.h> #include "archive/pc_archive.h"
#include "archive/pc_arc_filter.h"
#ifdef __APPLE__ #ifdef __APPLE__
#include <sys/sysctl.h> #include <sys/sysctl.h>