Delay allocation of per-thread chunks for performance and memory efficiency.

Avoid allocating double-buffer for single-chunk files.
Introduce lzmaMt option to indicate multithreaded LZMA.
Update README.
This commit is contained in:
Moinak Ghosh 2012-08-18 22:00:14 +05:30
parent 9eac774eb1
commit 3851c9c6cc
4 changed files with 106 additions and 42 deletions

View file

@ -35,6 +35,9 @@ Usage
lz4 - Ultra fast, high-throughput algorithm reaching RAM B/W at level1.
zlib - The base Zlib format compression (not Gzip).
lzma - The LZMA (Lempel-Ziv Markov) algorithm from 7Zip.
lzmaMt - Multithreaded version of LZMA. This is a faster version but
uses more memory for the dictionary. Thread count is balanced
between chunk processing threads and algorithm threads.
bzip2 - Bzip2 Algorithm from libbzip2.
ppmd - The PPMd algorithm excellent for textual data. PPMd requires
at least 64MB X CPUs more memory than the other modes.
@ -44,6 +47,8 @@ Usage
adapt2 - Adaptive mode which includes ppmd and lzma. This requires
more memory than adapt mode, is slower and potentially gives
the best compression.
none - No compression. This is only meaningful with -D and -E so Dedupe
can be done for post-processing with an external utility.
<chunk_size> - This can be in bytes or can use the following suffixes:
g - Gigabyte, m - Megabyte, k - Kilobyte.
Larger chunks produce better compression at the cost of memory.
@ -73,7 +78,8 @@ Environment Variables
Set ALLOCATOR_BYPASS=1 in the environment to avoid using the the built-in
allocator. Due to the the way it rounds up an allocation request to the nearest
slab the built-in allocator can allocate extra unused memory.
slab the built-in allocator can allocate extra unused memory. In addition you
may want to use a different allocator in your environment.
Examples
========
@ -123,8 +129,8 @@ Adapt2 - Ultra slow synthetic mode. Both LZMA and PPMD are tried per chunk and
Since both LZMA and PPMD are used together memory requirements are
quite extensive especially if you are also using extreme levels above
10. For example with 64MB chunk, Level 14, 2 threads and with or without
dedupe, it uses upto 3.5GB physical RAM. So minimum requirement is 6GB
RAM *and* at least 4GB physical swap.
dedupe, it uses upto 3.5GB physical RAM and requires 6GB of virtual
memory space.
It is possible for a single chunk to span the entire file if enough RAM is
available. However for adaptive modes to be effective for large files, especially

View file

@ -47,13 +47,20 @@ lzma_stats(int show)
}
void
lzma_props(algo_props_t *data, int level, ssize_t chunksize) {
lzma_mt_props(algo_props_t *data, int level, ssize_t chunksize) {
data->compress_mt_capable = 1;
data->decompress_mt_capable = 0;
data->buf_extra = 0;
data->c_max_threads = 2;
}
void
lzma_props(algo_props_t *data, int level, ssize_t chunksize) {
data->compress_mt_capable = 0;
data->decompress_mt_capable = 0;
data->buf_extra = 0;
}
/*
* The two functions below are not thread-safe, by design.
*/

116
main.c
View file

@ -99,6 +99,9 @@ usage(void)
" lz4 - Ultra fast, high-throughput algorithm reaching RAM B/W at level1.\n"
" zlib - The base Zlib format compression (not Gzip).\n"
" lzma - The LZMA (Lempel-Ziv Markov) algorithm from 7Zip.\n"
" lzmaMt - Multithreaded version of LZMA. This is a faster version but\n"
" uses more memory for the dictionary. Thread count is balanced\n"
" between chunk processing threads and algorithm threads.\n"
" bzip2 - Bzip2 Algorithm from libbzip2.\n"
" ppmd - The PPMd algorithm excellent for textual data. PPMd requires\n"
" at least 64MB X CPUs more memory than the other modes.\n"
@ -108,6 +111,8 @@ usage(void)
" adapt2 - Adaptive mode which includes ppmd and lzma. This requires\n"
" more memory than adapt mode, is slower and potentially gives\n"
" the best compression.\n"
" none - No compression. This is only meaningful with -D and -E so Dedupe\n"
" can be done for post-processing with an external utility.\n"
" <chunk_size> - This can be in bytes or can use the following suffixes:\n"
" g - Gigabyte, m - Megabyte, k - Kilobyte.\n"
" Larger chunks produce better compression at the cost of memory.\n"
@ -424,8 +429,9 @@ start_decompress(const char *filename, const char *to_filename)
set_threadcounts(&props, &nthreads, nprocs, DECOMPRESS_THREADS);
fprintf(stderr, "Scaling to %d thread", nthreads * props.nthreads);
if (nprocs > 1) fprintf(stderr, "s");
if (nthreads * props.nthreads > 1) fprintf(stderr, "s");
fprintf(stderr, "\n");
nprocs = nthreads;
slab_cache_add(compressed_chunksize + CHDR_SZ);
slab_cache_add(chunksize);
slab_cache_add(sizeof (struct cmp_data));
@ -438,22 +444,7 @@ start_decompress(const char *filename, const char *to_filename)
UNCOMP_BAIL;
}
tdat = dary[i];
tdat->compressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
if (!tdat->compressed_chunk) {
fprintf(stderr, "Out of memory\n");
UNCOMP_BAIL;
}
if (enable_rabin_scan)
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
else
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
if (!tdat->uncompressed_chunk) {
fprintf(stderr, "Out of memory\n");
UNCOMP_BAIL;
}
tdat->cmp_seg = tdat->uncompressed_chunk;
tdat->compressed_chunk = NULL;
tdat->chunksize = chunksize;
tdat->compress = _compress_func;
tdat->decompress = _decompress_func;
@ -513,6 +504,28 @@ start_decompress(const char *filename, const char *to_filename)
if (main_cancel) break;
tdat->id = chunk_num;
/*
* Delayed allocation. Allocate chunks if not already done. The compressed
* file format does not provide any info on how many chunks are there in
* order to allow pipe mode operation. So delayed allocation during
* decompression allows to avoid allocating per-thread chunks which will
* never be used. This can happen if chunk count < thread count.
*/
if (!tdat->compressed_chunk) {
tdat->compressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
if (enable_rabin_scan)
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
else
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
if (!tdat->compressed_chunk || !tdat->uncompressed_chunk) {
fprintf(stderr, "Out of memory\n");
UNCOMP_BAIL;
}
tdat->cmp_seg = tdat->uncompressed_chunk;
}
/*
* First read length of compressed chunk.
*/
@ -591,7 +604,9 @@ uncomp_done:
perror("Chown ");
if (dary != NULL) {
for (i = 0; i < nprocs; i++) {
if (dary[i]->uncompressed_chunk)
slab_free(NULL, dary[i]->uncompressed_chunk);
if (dary[i]->compressed_chunk)
slab_free(NULL, dary[i]->compressed_chunk);
if (_deinit_func)
_deinit_func(&(dary[i]->data));
@ -820,7 +835,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
short version, flags;
struct stat sbuf;
int compfd = -1, uncompfd = -1, err;
int i, thread = 0, bail;
int i, thread, bail, single_chunk;
int nprocs, np, p;
struct cmp_data **dary = NULL, *tdat;
pthread_t writer_thr;
@ -862,6 +877,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
err = 0;
thread = 0;
single_chunk = 0;
slab_cache_add(chunksize);
slab_cache_add(compressed_chunksize + CHDR_SZ);
slab_cache_add(sizeof (struct cmp_data));
@ -900,6 +916,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
chunksize = sbuf.st_size;
enable_rabin_split = 0; // Do not split for whole files.
nthreads = 1;
single_chunk = 1;
} else {
if (nthreads == 0 || nthreads > sbuf.st_size / chunksize) {
nthreads = sbuf.st_size / chunksize;
@ -939,7 +956,8 @@ start_compress(const char *filename, uint64_t chunksize, int level)
set_threadcounts(&props, &nthreads, nprocs, COMPRESS_THREADS);
fprintf(stderr, "Scaling to %d thread", nthreads * props.nthreads);
if (nprocs > 1) fprintf(stderr, "s");
if (nthreads * props.nthreads > 1) fprintf(stderr, "s");
nprocs = nthreads;
fprintf(stderr, "\n");
dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *));
@ -959,20 +977,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
COMP_BAIL;
}
tdat = dary[i];
tdat->cmp_seg = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ);
tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t);
if (!tdat->compressed_chunk) {
fprintf(stderr, "Out of memory\n");
COMP_BAIL;
}
if (enable_rabin_scan)
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ);
else
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
if (!tdat->uncompressed_chunk) {
fprintf(stderr, "Out of memory\n");
COMP_BAIL;
}
tdat->cmp_seg = NULL;
tdat->chunksize = chunksize;
tdat->compress = _compress_func;
tdat->decompress = _decompress_func;
@ -1070,6 +1075,34 @@ start_compress(const char *filename, uint64_t chunksize, int level)
sem_wait(&tdat->write_done_sem);
if (main_cancel) break;
/*
* Delayed allocation. Allocate chunks if not already done.
*/
if (!tdat->cmp_seg) {
if (enable_rabin_scan) {
if (single_chunk)
tdat->cmp_seg = (uchar_t *)1;
else
tdat->cmp_seg = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
} else {
if (single_chunk)
tdat->uncompressed_chunk = (uchar_t *)1;
else
tdat->uncompressed_chunk =
(uchar_t *)slab_alloc(NULL, chunksize);
tdat->cmp_seg = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
}
tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t);
if (!tdat->cmp_seg || !tdat->uncompressed_chunk) {
fprintf(stderr, "Out of memory\n");
COMP_BAIL;
}
}
/*
* Once previous chunk is done swap already read buffer and
* it's size into the thread data.
@ -1118,6 +1151,11 @@ start_compress(const char *filename, uint64_t chunksize, int level)
sem_post(&tdat->start_sem);
chunk_num++;
if (single_chunk) {
rbytes = 0;
continue;
}
/*
* Read the next buffer we want to process while previous
* buffer is in progress.
@ -1193,7 +1231,9 @@ comp_done:
if (dary != NULL) {
for (i = 0; i < nprocs; i++) {
if (!dary[i]) continue;
if (dary[i]->uncompressed_chunk != (uchar_t *)1)
slab_free(NULL, dary[i]->uncompressed_chunk);
if (dary[i]->cmp_seg != (uchar_t *)1)
slab_free(NULL, dary[i]->cmp_seg);
if (enable_rabin_scan) {
destroy_rabin_context(dary[i]->rctx);
@ -1205,6 +1245,7 @@ comp_done:
slab_free(NULL, dary);
}
if (enable_rabin_split) destroy_rabin_context(rctx);
if (cread_buf != (uchar_t *)1)
slab_free(NULL, cread_buf);
if (!pipe_mode) {
if (compfd != -1) close(compfd);
@ -1236,6 +1277,15 @@ init_algo(const char *algo, int bail)
_stats_func = zlib_stats;
rv = 0;
} else if (memcmp(algorithm, "lzmaMt", 6) == 0) {
_compress_func = lzma_compress;
_decompress_func = lzma_decompress;
_init_func = lzma_init;
_deinit_func = lzma_deinit;
_stats_func = lzma_stats;
_props_func = lzma_mt_props;
rv = 0;
} else if (memcmp(algorithm, "lzma", 4) == 0) {
_compress_func = lzma_compress;
_decompress_func = lzma_decompress;

View file

@ -108,6 +108,7 @@ extern int lz4_init(void **data, int *level, int nthreads, ssize_t chunksize);
extern int none_init(void **data, int *level, int nthreads, ssize_t chunksize);
extern void lzma_props(algo_props_t *data, int level, ssize_t chunksize);
extern void lzma_mt_props(algo_props_t *data, int level, ssize_t chunksize);
extern void lz4_props(algo_props_t *data, int level, ssize_t chunksize);
extern int zlib_deinit(void **data);