From 3851c9c6cc037545030e2033f7943151429fc11f Mon Sep 17 00:00:00 2001
From: Moinak Ghosh
Date: Sat, 18 Aug 2012 22:00:14 +0530
Subject: [PATCH] Delay allocation of per-thread chunks for performance and
 memory efficiency.

Avoid allocating double-buffer for single-chunk files.
Introduce lzmaMt option to indicate multithreaded LZMA.
Update README.

---
 README.md       |  12 +++--
 lzma_compress.c |   9 +++-
 main.c          | 126 +++++++++++++++++++++++++++++++++---------------
 pcompress.h     |   1 +
 4 files changed, 106 insertions(+), 42 deletions(-)

diff --git a/README.md b/README.md
index d3c860a..cc29002 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,9 @@ Usage
         lz4    - Ultra fast, high-throughput algorithm reaching RAM B/W at level1.
         zlib   - The base Zlib format compression (not Gzip).
         lzma   - The LZMA (Lempel-Ziv Markov) algorithm from 7Zip.
+        lzmaMt - Multithreaded version of LZMA. This is a faster version but
+                 uses more memory for the dictionary. Thread count is balanced
+                 between chunk processing threads and algorithm threads.
         bzip2  - Bzip2 Algorithm from libbzip2.
         ppmd   - The PPMd algorithm excellent for textual data. PPMd requires
                  at least 64MB X CPUs more memory than the other modes.
@@ -44,6 +47,8 @@ Usage
         adapt2 - Adaptive mode which includes ppmd and lzma. This requires
                  more memory than adapt mode, is slower and potentially gives
                  the best compression.
+        none   - No compression. This is only meaningful with -D and -E so Dedupe
+                 can be done for post-processing with an external utility.
    <chunk_size> - This can be in bytes or can use the following suffixes:
            g - Gigabyte, m - Megabyte, k - Kilobyte.
            Larger chunks produce better compression at the cost of memory.
@@ -73,7 +78,8 @@ Environment Variables

 Set ALLOCATOR_BYPASS=1 in the environment to avoid using the built-in
 allocator. Due to the way it rounds up an allocation request to the nearest
-slab the built-in allocator can allocate extra unused memory.
+slab the built-in allocator can allocate extra unused memory. In addition you
+may want to use a different allocator in your environment.

 Examples
 ========
@@ -123,8 +129,8 @@ Adapt2 - Ultra slow synthetic mode. Both LZMA and PPMD are tried per chunk and
        Since both LZMA and PPMD are used together memory requirements are
        quite extensive especially if you are also using extreme levels above
        10. For example with 64MB chunk, Level 14, 2 threads and with or without
-       dedupe, it uses upto 3.5GB physical RAM. So minimum requirement is 6GB
-       RAM *and* at least 4GB physical swap.
+       dedupe, it uses up to 3.5GB physical RAM and requires 6GB of virtual
+       memory space.

 It is possible for a single chunk to span the entire file if enough RAM is
 available. However for adaptive modes to be effective for large files, especially
diff --git a/lzma_compress.c b/lzma_compress.c
index e0b204a..c1dc970 100644
--- a/lzma_compress.c
+++ b/lzma_compress.c
@@ -47,13 +47,20 @@ lzma_stats(int show)
 }

 void
-lzma_props(algo_props_t *data, int level, ssize_t chunksize) {
+lzma_mt_props(algo_props_t *data, int level, ssize_t chunksize) {
 	data->compress_mt_capable = 1;
 	data->decompress_mt_capable = 0;
 	data->buf_extra = 0;
 	data->c_max_threads = 2;
 }

+void
+lzma_props(algo_props_t *data, int level, ssize_t chunksize) {
+	data->compress_mt_capable = 0;
+	data->decompress_mt_capable = 0;
+	data->buf_extra = 0;
+}
+
 /*
  * The two functions below are not thread-safe, by design.
  */
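A note on the thread balancing the README text describes: lzma_mt_props() advertises compress_mt_capable with c_max_threads = 2, which set_threadcounts() (not part of this patch) uses to split the available CPUs between chunk-processing threads and LZMA's internal threads. The following is a minimal, self-contained sketch of one plausible split; the field names mirror this patch, but the exact policy inside the real set_threadcounts() is an assumption here:

    #include <stdio.h>

    /* Minimal stand-in for pcompress's algo_props_t; only the fields
     * this sketch needs. */
    typedef struct {
    	int compress_mt_capable;
    	int c_max_threads;
    	int nthreads;	/* algorithm-internal threads per chunk thread */
    } props_sketch_t;

    /* Hypothetical balancing policy: cap algorithm threads per chunk
     * at c_max_threads and divide the CPUs among chunk threads. */
    static void
    balance_threads(props_sketch_t *props, int *nthreads, int nprocs)
    {
    	if (props->compress_mt_capable && props->c_max_threads > 1) {
    		props->nthreads = props->c_max_threads;
    		*nthreads = nprocs / props->c_max_threads;
    		if (*nthreads < 1)
    			*nthreads = 1;
    	} else {
    		props->nthreads = 1;
    		*nthreads = nprocs;
    	}
    }

    int
    main(void)
    {
    	/* lzmaMt advertises compress_mt_capable, c_max_threads = 2. */
    	props_sketch_t props = { 1, 2, 1 };
    	int nthreads, nprocs = 8;

    	balance_threads(&props, &nthreads, nprocs);
    	printf("Scaling to %d threads\n", nthreads * props.nthreads);
    	return (0);
    }

Under this assumed policy, 8 CPUs yield 4 chunk threads of 2 LZMA threads each, which matches how the "Scaling to %d thread" message below is computed as nthreads * props.nthreads.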
diff --git a/main.c b/main.c
index ef11cb9..3c2213d 100644
--- a/main.c
+++ b/main.c
@@ -99,6 +99,9 @@ usage(void)
 	    "	lz4    - Ultra fast, high-throughput algorithm reaching RAM B/W at level1.\n"
 	    "	zlib   - The base Zlib format compression (not Gzip).\n"
 	    "	lzma   - The LZMA (Lempel-Ziv Markov) algorithm from 7Zip.\n"
+	    "	lzmaMt - Multithreaded version of LZMA. This is a faster version but\n"
+	    "	         uses more memory for the dictionary. Thread count is balanced\n"
+	    "	         between chunk processing threads and algorithm threads.\n"
 	    "	bzip2  - Bzip2 Algorithm from libbzip2.\n"
 	    "	ppmd   - The PPMd algorithm excellent for textual data. PPMd requires\n"
 	    "	         at least 64MB X CPUs more memory than the other modes.\n"
@@ -108,6 +111,8 @@ usage(void)
 	    "	adapt2 - Adaptive mode which includes ppmd and lzma. This requires\n"
 	    "	         more memory than adapt mode, is slower and potentially gives\n"
 	    "	         the best compression.\n"
+	    "	none   - No compression. This is only meaningful with -D and -E so Dedupe\n"
+	    "	         can be done for post-processing with an external utility.\n"
 	    "	<chunk_size> - This can be in bytes or can use the following suffixes:\n"
 	    "	       g - Gigabyte, m - Megabyte, k - Kilobyte.\n"
 	    "	       Larger chunks produce better compression at the cost of memory.\n"
@@ -424,8 +429,9 @@ start_decompress(const char *filename, const char *to_filename)

 	set_threadcounts(&props, &nthreads, nprocs, DECOMPRESS_THREADS);
 	fprintf(stderr, "Scaling to %d thread", nthreads * props.nthreads);
-	if (nprocs > 1) fprintf(stderr, "s");
+	if (nthreads * props.nthreads > 1) fprintf(stderr, "s");
 	fprintf(stderr, "\n");
+	nprocs = nthreads;
 	slab_cache_add(compressed_chunksize + CHDR_SZ);
 	slab_cache_add(chunksize);
 	slab_cache_add(sizeof (struct cmp_data));
@@ -438,22 +444,7 @@ start_decompress(const char *filename, const char *to_filename)
 			UNCOMP_BAIL;
 		}
 		tdat = dary[i];
-		tdat->compressed_chunk = (uchar_t *)slab_alloc(NULL,
-		    compressed_chunksize + CHDR_SZ);
-		if (!tdat->compressed_chunk) {
-			fprintf(stderr, "Out of memory\n");
-			UNCOMP_BAIL;
-		}
-		if (enable_rabin_scan)
-			tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
-			    compressed_chunksize + CHDR_SZ);
-		else
-			tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
-		if (!tdat->uncompressed_chunk) {
-			fprintf(stderr, "Out of memory\n");
-			UNCOMP_BAIL;
-		}
-		tdat->cmp_seg = tdat->uncompressed_chunk;
+		tdat->compressed_chunk = NULL;
 		tdat->chunksize = chunksize;
 		tdat->compress = _compress_func;
 		tdat->decompress = _decompress_func;
@@ -513,6 +504,28 @@ start_decompress(const char *filename, const char *to_filename)
 		if (main_cancel) break;

 		tdat->id = chunk_num;
+		/*
+		 * Delayed allocation. Allocate chunks if not already done.
+		 * The compressed file format does not record how many chunks
+		 * there are, in order to allow pipe mode operation. So delayed
+		 * allocation during decompression avoids allocating per-thread
+		 * chunks that will never be used. This can happen if
+		 * chunk count < thread count.
+		 */
+		if (!tdat->compressed_chunk) {
+			tdat->compressed_chunk = (uchar_t *)slab_alloc(NULL,
+			    compressed_chunksize + CHDR_SZ);
+			if (enable_rabin_scan)
+				tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
+				    compressed_chunksize + CHDR_SZ);
+			else
+				tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
+			if (!tdat->compressed_chunk || !tdat->uncompressed_chunk) {
+				fprintf(stderr, "Out of memory\n");
+				UNCOMP_BAIL;
+			}
+			tdat->cmp_seg = tdat->uncompressed_chunk;
+		}
+
 		/*
 		 * First read length of compressed chunk.
 		 */
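The delayed-allocation hunk above is the heart of this change on the decompression side: because the file format carries no chunk count, buffers are allocated on first use rather than up front. Stripped of the pcompress specifics, the pattern is allocate-on-first-use. A standalone sketch using plain malloc instead of slab_alloc, with illustrative (non-pcompress) names:

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct {
    	unsigned char *buf;	/* NULL until this thread gets real work */
    	size_t bufsize;
    } thread_buf_t;

    /* Allocate a thread's buffer on first use.  Threads that never
     * receive a chunk (chunk count < thread count) never allocate. */
    static unsigned char *
    get_buf(thread_buf_t *t, size_t size)
    {
    	if (t->buf == NULL) {
    		t->buf = malloc(size);
    		if (t->buf == NULL) {
    			fprintf(stderr, "Out of memory\n");
    			return (NULL);
    		}
    		t->bufsize = size;
    	}
    	return (t->buf);
    }

    int
    main(void)
    {
    	thread_buf_t t = { NULL, 0 };

    	if (get_buf(&t, 1 << 20) == NULL)	/* first use: allocate 1MB */
    		return (1);
    	free(t.buf);
    	return (0);
    }

A thread that never dequeues work never calls get_buf(), so its buffer is never paid for, which is exactly why the cleanup hunks below must tolerate NULL pointers.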
@@ -591,8 +604,10 @@ uncomp_done:
 			perror("Chown ");
 	if (dary != NULL) {
 		for (i = 0; i < nprocs; i++) {
-			slab_free(NULL, dary[i]->uncompressed_chunk);
-			slab_free(NULL, dary[i]->compressed_chunk);
+			if (dary[i]->uncompressed_chunk)
+				slab_free(NULL, dary[i]->uncompressed_chunk);
+			if (dary[i]->compressed_chunk)
+				slab_free(NULL, dary[i]->compressed_chunk);
 			if (_deinit_func)
 				_deinit_func(&(dary[i]->data));
 			if (enable_rabin_scan) {
@@ -820,7 +835,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
 	short version, flags;
 	struct stat sbuf;
 	int compfd = -1, uncompfd = -1, err;
-	int i, thread = 0, bail;
+	int i, thread, bail, single_chunk;
 	int nprocs, np, p;
 	struct cmp_data **dary = NULL, *tdat;
 	pthread_t writer_thr;
@@ -862,6 +877,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)

 	err = 0;
 	thread = 0;
+	single_chunk = 0;
 	slab_cache_add(chunksize);
 	slab_cache_add(compressed_chunksize + CHDR_SZ);
 	slab_cache_add(sizeof (struct cmp_data));
@@ -900,6 +916,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
 			chunksize = sbuf.st_size;
 			enable_rabin_split = 0; // Do not split for whole files.
 			nthreads = 1;
+			single_chunk = 1;
 		} else {
 			if (nthreads == 0 || nthreads > sbuf.st_size / chunksize) {
 				nthreads = sbuf.st_size / chunksize;
@@ -939,7 +956,8 @@ start_compress(const char *filename, uint64_t chunksize, int level)

 	set_threadcounts(&props, &nthreads, nprocs, COMPRESS_THREADS);
 	fprintf(stderr, "Scaling to %d thread", nthreads * props.nthreads);
-	if (nprocs > 1) fprintf(stderr, "s");
+	if (nthreads * props.nthreads > 1) fprintf(stderr, "s");
+	nprocs = nthreads;
 	fprintf(stderr, "\n");
 	dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *));
@@ -959,20 +977,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
 			COMP_BAIL;
 		}
 		tdat = dary[i];
-		tdat->cmp_seg = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ);
-		tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t);
-		if (!tdat->compressed_chunk) {
-			fprintf(stderr, "Out of memory\n");
-			COMP_BAIL;
-		}
-		if (enable_rabin_scan)
-			tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ);
-		else
-			tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
-		if (!tdat->uncompressed_chunk) {
-			fprintf(stderr, "Out of memory\n");
-			COMP_BAIL;
-		}
+		tdat->cmp_seg = NULL;
 		tdat->chunksize = chunksize;
 		tdat->compress = _compress_func;
 		tdat->decompress = _decompress_func;
@@ -1070,6 +1075,34 @@ start_compress(const char *filename, uint64_t chunksize, int level)
 		sem_wait(&tdat->write_done_sem);
 		if (main_cancel) break;

+		/*
+		 * Delayed allocation. Allocate chunks if not already done.
+		 */
+		if (!tdat->cmp_seg) {
+			if (enable_rabin_scan) {
+				if (single_chunk)
+					tdat->cmp_seg = (uchar_t *)1;
+				else
+					tdat->cmp_seg = (uchar_t *)slab_alloc(NULL,
+					    compressed_chunksize + CHDR_SZ);
+				tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
+				    compressed_chunksize + CHDR_SZ);
+			} else {
+				if (single_chunk)
+					tdat->uncompressed_chunk = (uchar_t *)1;
+				else
+					tdat->uncompressed_chunk =
+					    (uchar_t *)slab_alloc(NULL, chunksize);
+				tdat->cmp_seg = (uchar_t *)slab_alloc(NULL,
+				    compressed_chunksize + CHDR_SZ);
+			}
+			tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t);
+			if (!tdat->cmp_seg || !tdat->uncompressed_chunk) {
+				fprintf(stderr, "Out of memory\n");
+				COMP_BAIL;
+			}
+		}
+
 		/*
 		 * Once previous chunk is done swap already read buffer and
 		 * its size into the thread data.
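The (uchar_t *)1 stores in the hunk above act as a sentinel: for a single-chunk file one of the two per-thread buffers is never needed, so instead of allocating it the code parks a non-NULL marker in the pointer. That keeps the out-of-memory check (which tests for NULL) working, and lets the cleanup paths below tell "intentionally never allocated" apart from a real allocation. A self-contained sketch of the idiom, assuming a hypothetical BUF_UNUSED macro in place of the bare literal:

    #include <stdio.h>
    #include <stdlib.h>

    /* Non-NULL marker meaning "intentionally never allocated", so an
     * out-of-memory check of the form (!ptr) still passes and cleanup
     * can distinguish a skipped buffer from a real allocation. */
    #define BUF_UNUSED ((unsigned char *)1)

    int
    main(void)
    {
    	int single_chunk = 1;
    	unsigned char *cmp_seg;

    	/* Single-chunk files don't need the double-buffer. */
    	if (single_chunk)
    		cmp_seg = BUF_UNUSED;
    	else
    		cmp_seg = malloc(1024);

    	if (!cmp_seg) {
    		fprintf(stderr, "Out of memory\n");
    		return (1);
    	}

    	/* Cleanup: free only real allocations. */
    	if (cmp_seg != BUF_UNUSED)
    		free(cmp_seg);
    	return (0);
    }

The free paths in the next hunks perform exactly this sentinel test before calling slab_free().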
@@ -1118,6 +1151,11 @@ start_compress(const char *filename, uint64_t chunksize, int level)
 		sem_post(&tdat->start_sem);
 		chunk_num++;

+		if (single_chunk) {
+			rbytes = 0;
+			continue;
+		}
+
 		/*
 		 * Read the next buffer we want to process while previous
 		 * buffer is in progress.
@@ -1193,8 +1231,10 @@ comp_done:
 	if (dary != NULL) {
 		for (i = 0; i < nprocs; i++) {
 			if (!dary[i]) continue;
-			slab_free(NULL, dary[i]->uncompressed_chunk);
-			slab_free(NULL, dary[i]->cmp_seg);
+			if (dary[i]->uncompressed_chunk != (uchar_t *)1)
+				slab_free(NULL, dary[i]->uncompressed_chunk);
+			if (dary[i]->cmp_seg != (uchar_t *)1)
+				slab_free(NULL, dary[i]->cmp_seg);
 			if (enable_rabin_scan) {
 				destroy_rabin_context(dary[i]->rctx);
 			}
@@ -1205,7 +1245,8 @@ comp_done:
 		slab_free(NULL, dary);
 	}
 	if (enable_rabin_split) destroy_rabin_context(rctx);
-	slab_free(NULL, cread_buf);
+	if (cread_buf != (uchar_t *)1)
+		slab_free(NULL, cread_buf);
 	if (!pipe_mode) {
 		if (compfd != -1) close(compfd);
 		if (uncompfd != -1) close(uncompfd);
@@ -1236,6 +1277,15 @@ init_algo(const char *algo, int bail)
 		_stats_func = zlib_stats;
 		rv = 0;

+	} else if (memcmp(algorithm, "lzmaMt", 6) == 0) {
+		_compress_func = lzma_compress;
+		_decompress_func = lzma_decompress;
+		_init_func = lzma_init;
+		_deinit_func = lzma_deinit;
+		_stats_func = lzma_stats;
+		_props_func = lzma_mt_props;
+		rv = 0;
+
 	} else if (memcmp(algorithm, "lzma", 4) == 0) {
 		_compress_func = lzma_compress;
 		_decompress_func = lzma_decompress;
diff --git a/pcompress.h b/pcompress.h
index ab088ee..2842dbf 100644
--- a/pcompress.h
+++ b/pcompress.h
@@ -108,6 +108,7 @@ extern int lz4_init(void **data, int *level, int nthreads, ssize_t chunksize);
 extern int none_init(void **data, int *level, int nthreads, ssize_t chunksize);

 extern void lzma_props(algo_props_t *data, int level, ssize_t chunksize);
+extern void lzma_mt_props(algo_props_t *data, int level, ssize_t chunksize);
 extern void lz4_props(algo_props_t *data, int level, ssize_t chunksize);

 extern int zlib_deinit(void **data);
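One ordering detail in the init_algo() hunk above is worth noting: the comparisons are prefix matches, and memcmp(algorithm, "lzma", 4) also matches "lzmaMt", so the new lzmaMt branch must be tested before the plain lzma branch, exactly as the patch places it. A small standalone demonstration of why the order matters:

    #include <stdio.h>
    #include <string.h>

    /* Prefix matching with memcmp(): "lzma" is a 4-byte prefix of
     * "lzmaMt", so the longer name must be tested first or the
     * multithreaded variant would silently fall into the lzma case. */
    int
    main(void)
    {
    	const char *algorithm = "lzmaMt";

    	if (memcmp(algorithm, "lzmaMt", 6) == 0)
    		printf("multithreaded LZMA selected\n");
    	else if (memcmp(algorithm, "lzma", 4) == 0)
    		printf("single-threaded LZMA selected\n");
    	return (0);
    }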