Update chunk size computation to reduce memory usage.

Implement runtime bypass of custom allocator.
Update README.
This commit is contained in:
Moinak Ghosh 2012-07-27 22:03:24 +05:30
parent 9c3423530c
commit c7cc7b469c
7 changed files with 122 additions and 12 deletions

View file

@ -1,4 +1,82 @@
pcompress Pcompress
========= =========
A Parallel Compression/Decompression utility Copyright (C) 2012 Moinak Ghosh. All rights reserved.
Use is subject to license terms.
Pcompress is a utility to do compression and decompression in parallel by
splitting input data into chunks. It has a modular structure and includes
support for multiple algorithms like LZMA, Bzip2, PPMD, etc., with CRC64
chunk checksums. SSE optimizations for the bundled LZMA are included. It
also implements chunk-level Content-Aware Deduplication and Delta
Compression features based on a Semi-Rabin Fingerprinting scheme. Delta
Compression is implemented via the widely popular bsdiff algorithm.
Similarity is detected using a custom hashing of maximal features of a
block. When doing chunk-level dedupe it attempts to merge adjacent
non-duplicate blocks index entries into a single larger entry to reduce
metadata. In addition to all these it can internally split chunks at
rabin boundaries to help dedupe and compression.
It has low metadata overhead and overlaps I/O and compression to achieve
maximum parallelism. It also bundles a simple slab allocator to speed
repeated allocation of similar chunks. It can work in pipe mode, reading
from stdin and writing to stdout. It also provides some adaptive compression
modes in which multiple algorithms are tried per chunk to determine the best
one for the given chunk. Finally it support 14 compression levels to allow
for ultra compression modes in some algorithms.
Usage
=====
To compress a file:
pcompress -c <algorithm> [-l <compress level>] [-s <chunk size>] <file>
Where <algorithm> can be the folowing:
lzfx - Very fast and small algorithm based on LZF.
lz4 - Ultra fast, high-throughput algorithm reaching RAM B/W at level1.
zlib - The base Zlib format compression (not Gzip).
lzma - The LZMA (Lempel-Ziv Markov) algorithm from 7Zip.
bzip2 - Bzip2 Algorithm from libbzip2.
ppmd - The PPMd algorithm excellent for textual data. PPMd requires
at least 64MB X CPUs more memory than the other modes.
adapt - Adaptive mode where ppmd or bzip2 will be used per chunk,
depending on which one produces better compression. This mode
is obviously fairly slow and requires lots of memory.
adapt2 - Adaptive mode which includes ppmd and lzma. This requires
more memory than adapt mode, is slower and potentially gives
the best compression.
<chunk_size> - This can be in bytes or can use the following suffixes:
g - Gigabyte, m - Megabyte, k - Kilobyte.
Larger chunks produce better compression at the cost of memory.
<compress_level> - Can be a number from 0 meaning minimum and 14 meaning
maximum compression.
To decompress a file compressed using above command:
pcompress -d <compressed file> <target file>
To operate as a pipe, read from stdin and write to stdout:
pcompress -p ...
Attempt Rabin fingerprinting based deduplication on chunks:
pcompress -D ...
pcompress -D -r ... - Do NOT split chunks at a rabin boundary. Default is to split.
Perform Delta Encoding in addition to Exact Dedup:
pcompress -E ... - This also implies '-D'.
Number of threads can optionally be specified: -t <1 - 256 count>
Pass '-M' to display memory allocator statistics
Pass '-C' to display compression statistics
Examples
========
Compress "file.tar" using bzip2 level 6, 64MB chunk size and use 4 threads. In
addition perform exact deduplication and delta compression prior to compression.
pcompress -D -E -c bzip2 -l6 -s64m -t4 file.tar
Compress "file.tar" using extreme compression mode of LZMA and a chunk size of
of 1GB. Allow pcompress to detect the number of CPU cores and use as many threads.
pcompress -c lzma -l14 -s1g file.tar

View file

@ -96,7 +96,7 @@ static struct bufentry **htable;
static pthread_mutex_t *hbucket_locks; static pthread_mutex_t *hbucket_locks;
static pthread_mutex_t htable_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t htable_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t slab_table_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t slab_table_lock = PTHREAD_MUTEX_INITIALIZER;
static int inited = 0; static int inited = 0, bypass = 0;
static uint64_t total_allocs, oversize_allocs, hash_collisions, hash_entries; static uint64_t total_allocs, oversize_allocs, hash_collisions, hash_entries;
@ -124,6 +124,12 @@ slab_init()
size_t slab_sz; size_t slab_sz;
int nprocs; int nprocs;
/* Check bypass env variable. */
if (getenv("ALLOCATOR_BYPASS") != NULL) {
bypass = 1;
return;
}
/* Initialize first NUM_POW2 power of 2 slots. */ /* Initialize first NUM_POW2 power of 2 slots. */
slab_sz = SLAB_START_SZ; slab_sz = SLAB_START_SZ;
for (i = 0; i < NUM_POW2; i++) { for (i = 0; i < NUM_POW2; i++) {
@ -177,6 +183,7 @@ slab_cleanup(int quiet)
uint64_t nonfreed_oversize; uint64_t nonfreed_oversize;
if (!inited) return; if (!inited) return;
if (bypass) return;
if (!quiet) { if (!quiet) {
fprintf(stderr, "Slab Allocation Stats\n"); fprintf(stderr, "Slab Allocation Stats\n");
@ -276,6 +283,7 @@ void *
slab_calloc(void *p, size_t items, size_t size) { slab_calloc(void *p, size_t items, size_t size) {
void *ptr; void *ptr;
if (bypass) return(calloc(items, size));
ptr = slab_alloc(p, items * size); ptr = slab_alloc(p, items * size);
memset(ptr, 0, items * size); memset(ptr, 0, items * size);
return (ptr); return (ptr);
@ -338,6 +346,7 @@ slab_cache_add(size_t size)
{ {
uint32_t sindx; uint32_t sindx;
struct slabentry *slab; struct slabentry *slab;
if (bypass) return (0);
if (try_dynamic_slab(size)) return (0); /* Already added. */ if (try_dynamic_slab(size)) return (0); /* Already added. */
/* Locate the hash slot for the size. */ /* Locate the hash slot for the size. */
@ -375,6 +384,7 @@ slab_alloc(void *p, size_t size)
void *ptr; void *ptr;
struct slabentry *slab; struct slabentry *slab;
if (bypass) return (malloc(size));
ATOMIC_ADD(total_allocs, 1); ATOMIC_ADD(total_allocs, 1);
slab = NULL; slab = NULL;
@ -444,6 +454,7 @@ slab_free(void *p, void *address)
uint32_t hindx; uint32_t hindx;
if (!address) return; if (!address) return;
if (bypass) { free(address); return; }
hindx = hash6432shift((uint64_t)(address)) & (HTABLE_SZ - 1); hindx = hash6432shift((uint64_t)(address)) & (HTABLE_SZ - 1);
pthread_mutex_lock(&hbucket_locks[hindx]); pthread_mutex_lock(&hbucket_locks[hindx]);

21
main.c
View file

@ -45,6 +45,7 @@
#include <pcompress.h> #include <pcompress.h>
#include <allocator.h> #include <allocator.h>
#include <rabin_polynomial.h> #include <rabin_polynomial.h>
#include <zlib.h>
/* Needed for CLzmaEncprops. */ /* Needed for CLzmaEncprops. */
#include <LzmaEnc.h> #include <LzmaEnc.h>
@ -788,7 +789,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
rabin_context_t *rctx; rabin_context_t *rctx;
/* /*
* Compressed buffer size must include zlib scratch space and * Compressed buffer size must include zlib/dedup scratch space and
* chunk header space. * chunk header space.
* See http://www.zlib.net/manual.html#compress2 * See http://www.zlib.net/manual.html#compress2
* *
@ -799,19 +800,23 @@ start_compress(const char *filename, uint64_t chunksize, int level)
* See start_decompress() routine for details of chunk header. * See start_decompress() routine for details of chunk header.
* We also keep extra 8-byte space for the last chunk's size. * We also keep extra 8-byte space for the last chunk's size.
*/ */
compressed_chunksize = chunksize + (chunksize >> 6) + compressed_chunksize = chunksize + sizeof (chunksize) +
sizeof (chunksize) + sizeof (uint64_t) + sizeof (chunksize); sizeof (uint64_t) + sizeof (chunksize) + zlib_buf_extra(chunksize);
err = 0;
flags = 0; flags = 0;
if (enable_rabin_scan) {
flags |= FLAG_DEDUP;
/* Additional scratch space for dedup arrays. */
compressed_chunksize += (rabin_buf_extra(chunksize) -
(compressed_chunksize - chunksize));
}
err = 0;
thread = 0; thread = 0;
slab_cache_add(chunksize); slab_cache_add(chunksize);
slab_cache_add(compressed_chunksize + CHDR_SZ); slab_cache_add(compressed_chunksize + CHDR_SZ);
slab_cache_add(sizeof (struct cmp_data)); slab_cache_add(sizeof (struct cmp_data));
if (enable_rabin_scan) {
flags |= FLAG_DEDUP;
}
/* A host of sanity checks. */ /* A host of sanity checks. */
if (!pipe_mode) { if (!pipe_mode) {
if ((uncompfd = open(filename, O_RDWR, 0)) == -1) if ((uncompfd = open(filename, O_RDWR, 0)) == -1)

View file

@ -54,6 +54,7 @@ extern "C" {
extern uint64_t lzma_crc64(const uint8_t *buf, size_t size, uint64_t crc); extern uint64_t lzma_crc64(const uint8_t *buf, size_t size, uint64_t crc);
extern uint64_t lzma_crc64_8bchk(const uint8_t *buf, size_t size, extern uint64_t lzma_crc64_8bchk(const uint8_t *buf, size_t size,
uint64_t crc, uint64_t *cnt); uint64_t crc, uint64_t *cnt);
extern uint32_t zlib_buf_extra(ssize_t buflen);
extern int zlib_compress(void *src, size_t srclen, void *dst, extern int zlib_compress(void *src, size_t srclen, void *dst,
size_t *destlen, int level, uchar_t chdr, void *data); size_t *destlen, int level, uchar_t chdr, void *data);

View file

@ -81,6 +81,12 @@ extern int bspatch(u_char *pbuf, u_char *old, bsize_t oldsize, u_char *new,
uint32_t rabin_polynomial_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE; uint32_t rabin_polynomial_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE;
uint32_t
rabin_buf_extra(uint64_t chunksize)
{
return ((chunksize / RAB_POLYNOMIAL_MIN_BLOCK_SIZE2) * sizeof (uint32_t));
}
/* /*
* Initialize the algorithm with the default params. * Initialize the algorithm with the default params.
*/ */

View file

@ -168,5 +168,6 @@ extern void rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_i
extern void rabin_update_hdr(uchar_t *buf, ssize_t rabin_index_sz_cmp, extern void rabin_update_hdr(uchar_t *buf, ssize_t rabin_index_sz_cmp,
ssize_t rabin_data_sz_cmp); ssize_t rabin_data_sz_cmp);
extern void reset_rabin_context(rabin_context_t *ctx); extern void reset_rabin_context(rabin_context_t *ctx);
extern uint32_t rabin_buf_extra(uint64_t chunksize);
#endif /* _RABIN_POLY_H_ */ #endif /* _RABIN_0POLY_H_ */

View file

@ -43,6 +43,14 @@ slab_alloc_ui(void *p, unsigned int items, unsigned int size) {
return (ptr); return (ptr);
} }
uint32_t
zlib_buf_extra(ssize_t buflen)
{
if (buflen > SINGLE_CALL_MAX)
buflen = SINGLE_CALL_MAX;
return (compressBound(buflen) - buflen);
}
int int
zlib_init(void **data, int *level, ssize_t chunksize) zlib_init(void **data, int *level, ssize_t chunksize)
{ {