pcompress/main.c
Moinak Ghosh a873f92e41 Fix crash when decompressing deduped archive.
Ensure correct level is passed to lzma.
Avoid branch when wrapping rabin window position and check for rabin window size to be power of 2.
Update rabin parameters check for adaptive modes.
Add detection of 7-bit text/8-bit binary data for later use.
2012-07-10 20:14:23 +05:30

1351 lines
36 KiB
C

/*
* This file is a part of Pcompress, a chunked parallel multi-
* algorithm lossless compression and decompression program.
*
* Copyright (C) 2012 Moinak Ghosh. All rights reserved.
* Use is subject to license terms.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* moinakg@belenix.org, http://moinakg.wordpress.com/
*
* This program includes partly-modified public domain/LGPL source
* code from the LZMA SDK: http://www.7-zip.org/sdk.html
*/
/*
* pcompress - Do a chunked parallel compression/decompression of a file.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <strings.h>
#include <limits.h>
#include <unistd.h>
#if defined(sun) || defined(__sun)
#include <sys/byteorder.h>
#else
#include <byteswap.h>
#endif
#include <libgen.h>
#include <utils.h>
#include <pcompress.h>
#include <allocator.h>
#include <rabin_polynomial.h>
/* Needed for CLzmaEncprops. */
#include <LzmaEnc.h>
/*
* We use 5MB chunks by default.
*/
#define DEFAULT_CHUNKSIZE (5 * 1024 * 1024)
struct wdata {
struct cmp_data **dary;
int wfd;
int nprocs;
ssize_t chunksize;
};
static void * writer_thread(void *dat);
static int init_algo(const char *algo, int bail);
static compress_func_ptr _compress_func;
static compress_func_ptr _decompress_func;
static init_func_ptr _init_func;
static deinit_func_ptr _deinit_func;
static stats_func_ptr _stats_func;
static int main_cancel;
static int adapt_mode = 0;
static int pipe_mode = 0;
static int nthreads = 0;
static int hide_mem_stats = 1;
static int hide_cmp_stats = 1;
static int enable_rabin_scan = 0;
static int enable_rabin_split = 1;
static unsigned int chunk_num;
static uint64_t largest_chunk, smallest_chunk, avg_chunk;
static const char *exec_name;
static const char *algo = NULL;
static int do_compress = 0;
static int do_uncompress = 0;
static rabin_context_t *rctx;
static void
usage(void)
{
fprintf(stderr,
"Usage:\n"
"1) To compress a file:\n"
" %s -c <algorithm> [-l <compress level>] [-s <chunk size>] <file>\n"
" Where <algorithm> can be the folowing:\n"
" zlib - The base Zlib format compression (not Gzip).\n"
" lzma - The LZMA (Lempel-Ziv Markov) algorithm from 7Zip.\n"
" bzip2 - Bzip2 Algorithm from libbzip2.\n"
" ppmd - The PPMd algorithm excellent for textual data. PPMd requires\n"
" at least 64MB X CPUs more memory than the other modes.\n"
" adapt - Adaptive mode where ppmd or bzip2 will be used per chunk,\n"
" depending on which one produces better compression. This mode\n"
" is obviously fairly slow and requires lots of memory.\n"
" adapt2 - Adaptive mode which includes ppmd and lzma. This requires\n"
" more memory than adapt mode, is slower and potentially gives\n"
" the best compression.\n"
" <chunk_size> - This can be in bytes or can use the following suffixes:\n"
" g - Gigabyte, m - Megabyte, k - Kilobyte.\n"
" Larger chunks produce better compression at the cost of memory.\n"
" <compress_level> - Can be a number from 0 meaning minimum and 14 meaning\n"
" maximum compression.\n\n"
"2) To decompress a file compressed using above command:\n"
" %s -d <compressed file> <target file>\n"
"3) To operate as a pipe, read from stdin and write to stdout:\n"
" %s -p ...\n"
"4) Attempt Rabin fingerprinting based deduplication on chunks:\n"
" %s -D ...\n"
" %s -D -r ... - Do NOT split chunks at a rabin boundary. Default is to split.\n"
"5) Number of threads can optionally be specified: -t <1 - 256 count>\n"
"6) Pass '-M' to display memory allocator statistics\n"
"7) Pass '-C' to display compression statistics\n\n",
exec_name, exec_name, exec_name, exec_name);
}
void
show_compression_stats(uint64_t chunksize)
{
fprintf(stderr, "\nCompression Statistics\n");
fprintf(stderr, "======================\n");
fprintf(stderr, "Total chunks : %u\n", chunk_num);
fprintf(stderr, "Best compressed chunk : %s(%.2f%%)\n",
bytes_to_size(smallest_chunk), (double)smallest_chunk/(double)chunksize*100);
fprintf(stderr, "Worst compressed chunk : %s(%.2f%%)\n",
bytes_to_size(largest_chunk), (double)largest_chunk/(double)chunksize*100);
avg_chunk /= chunk_num;
fprintf(stderr, "Avg compressed chunk : %s(%.2f%%)\n\n",
bytes_to_size(avg_chunk), (double)avg_chunk/(double)chunksize*100);
}
/*
* This routine is called in multiple threads. Calls the decompression handler
* as encoded in the file header. For adaptive mode the handler adapt_decompress()
* in turns looks at the chunk header and call the actualy decompression
* routine.
*/
static void *
perform_decompress(void *dat)
{
struct cmp_data *tdat = (struct cmp_data *)dat;
ssize_t _chunksize;
ssize_t rabin_index_sz, rabin_data_sz, rabin_index_sz_cmp, rabin_data_sz_cmp;
int type, rv;
unsigned int blknum;
typeof (tdat->crc64) crc64;
uchar_t HDR;
uchar_t *cseg;
redo:
sem_wait(&tdat->start_sem);
if (unlikely(tdat->cancel)) {
tdat->len_cmp = 0;
sem_post(&tdat->cmp_done_sem);
return (0);
}
/*
* If the last read returned a 0 quit.
*/
if (tdat->rbytes == 0) {
tdat->len_cmp = 0;
tdat->crc64 = 0;
goto cont;
}
cseg = tdat->compressed_chunk + sizeof (crc64);
_chunksize = tdat->chunksize;
tdat->crc64 = htonll(*((typeof (crc64) *)(tdat->compressed_chunk)));
HDR = *cseg;
cseg += CHDR_SZ;
if (HDR & CHSIZE_MASK) {
uchar_t *rseg;
tdat->rbytes -= sizeof (ssize_t);
tdat->len_cmp -= sizeof (ssize_t);
rseg = tdat->compressed_chunk + tdat->rbytes;
_chunksize = ntohll(*((ssize_t *)rseg));
}
if (HDR & COMPRESSED) {
if (enable_rabin_scan && (HDR & CHUNK_FLAG_DEDUP)) {
uchar_t *cmpbuf, *ubuf;
/* Extract various sizes from rabin header. */
rabin_parse_hdr(cseg, &blknum, &rabin_index_sz, &rabin_data_sz,
&rabin_index_sz_cmp, &rabin_data_sz_cmp, &_chunksize);
memcpy(tdat->uncompressed_chunk, cseg, RABIN_HDR_SIZE);
/*
* Uncompress the data chunk first and then uncompress the index.
* The uncompress routines can use extra bytes at the end for temporary
* state/dictionary info. Since data chunk directly follows index
* uncompressing index first corrupts the data.
*/
cmpbuf = cseg + RABIN_HDR_SIZE + rabin_index_sz_cmp;
ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE + rabin_index_sz;
rv = tdat->decompress(cmpbuf, rabin_data_sz_cmp, ubuf, &_chunksize,
tdat->level, HDR, tdat->data);
if (rv == -1) {
tdat->len_cmp = 0;
fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id);
goto cont;
}
cmpbuf = cseg + RABIN_HDR_SIZE;
ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE;
rv = lzma_decompress(cmpbuf, rabin_index_sz_cmp, ubuf, &rabin_index_sz,
tdat->rctx->level, 0, tdat->rctx->lzma_data);
} else {
rv = tdat->decompress(cseg, tdat->len_cmp, tdat->uncompressed_chunk,
&_chunksize, tdat->level, HDR, tdat->data);
}
} else {
memcpy(cseg + CHDR_SZ, tdat->uncompressed_chunk, _chunksize);
}
tdat->len_cmp = _chunksize;
if (rv == -1) {
tdat->len_cmp = 0;
fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id);
goto cont;
}
/* Rebuild chunk from dedup blocks. */
if (enable_rabin_scan && (HDR & CHUNK_FLAG_DEDUP)) {
rabin_context_t *rctx;
uchar_t *tmp;
rctx = tdat->rctx;
reset_rabin_context(tdat->rctx);
rctx->cbuf = tdat->compressed_chunk;
rabin_inverse_dedup(rctx, tdat->uncompressed_chunk, &(tdat->len_cmp));
if (!rctx->valid) {
fprintf(stderr, "ERROR: Chunk %d, dedup recovery failed.\n", tdat->id);
rv = -1;
tdat->len_cmp = 0;
goto cont;
}
_chunksize = tdat->len_cmp;
tmp = tdat->uncompressed_chunk;
tdat->uncompressed_chunk = tdat->compressed_chunk;
tdat->compressed_chunk = tmp;
tdat->cmp_seg = tdat->uncompressed_chunk;
}
/*
* Re-compute checksum of original uncompressed chunk.
* If it does not match we set length of chunk to 0 to indicate
* exit to the writer thread.
*/
crc64 = lzma_crc64(tdat->uncompressed_chunk, _chunksize, 0);
if (crc64 != tdat->crc64) {
tdat->len_cmp = 0;
fprintf(stderr, "ERROR: Chunk %d, checksums do not match.\n", tdat->id);
}
cont:
sem_post(&tdat->cmp_done_sem);
goto redo;
}
/*
* File decompression routine.
*
* Compressed file Format
* ----------------------
* File Header:
* Algorithm string: 8 bytes.
* Version number: 2 bytes.
* Global Flags: 2 bytes.
* Chunk size: 8 bytes.
* Compression Level: 4 bytes.
*
* Chunk Header:
* Compressed length: 8 bytes.
* CRC64 Checksum: 8 bytes.
* Chunk flags: 1 byte.
*
* Chunk Flags, 8 bits:
* I I I I I I I I
* | | | | |
* | '-----' | `- 0 - Uncompressed
* | | | 1 - Compressed
* | | |
* | | `---- 1 - Chunk was Deduped
* | |
* | | 1 - Bzip2 (Adaptive Mode)
* | `---------------- 2 - Lzma (Adaptive Mode)
* | 3 - PPMD (Adaptive Mode)
* |
* `---------------------- 1 - Last Chunk flag
*
* A file trailer to indicate end.
* Zero Compressed length: 8 zero bytes.
*/
#define UNCOMP_BAIL err = 1; goto uncomp_done
static void
start_decompress(const char *filename, const char *to_filename)
{
char tmpfile[MAXPATHLEN];
char algorithm[ALGO_SZ];
struct stat sbuf;
struct wdata w;
int compfd = -1, i, p;
int uncompfd = -1, err, np, bail;
int nprocs, thread = 0, level;
short version, flags;
ssize_t chunksize, compressed_chunksize;
struct cmp_data **dary, *tdat;
pthread_t writer_thr;
err = 0;
flags = 0;
thread = 0;
/*
* Open files and do sanity checks.
*/
if (!pipe_mode) {
if ((compfd = open(filename, O_RDONLY, 0)) == -1)
err_exit(1, "Cannot open: %s", filename);
if (fstat(compfd, &sbuf) == -1)
err_exit(1, "Cannot stat: %s", filename);
if (sbuf.st_size == 0)
return;
if ((uncompfd = open(to_filename, O_WRONLY|O_CREAT|O_TRUNC, 0)) == -1) {
close(compfd);
err_exit(1, "Cannot open: %s", to_filename);
}
} else {
compfd = fileno(stdin);
if (compfd == -1) {
perror("fileno ");
UNCOMP_BAIL;
}
uncompfd = fileno(stdout);
if (uncompfd == -1) {
perror("fileno ");
UNCOMP_BAIL;
}
}
/*
* Read file header pieces and verify.
*/
if (Read(compfd, algorithm, ALGO_SZ) < ALGO_SZ) {
perror("Read: ");
UNCOMP_BAIL;
}
if (init_algo(algorithm, 0) != 0) {
fprintf(stderr, "%s is not a pcompressed file.\n", filename);
UNCOMP_BAIL;
}
algo = algorithm;
if (Read(compfd, &version, sizeof (version)) < sizeof (version) ||
Read(compfd, &flags, sizeof (flags)) < sizeof (flags) ||
Read(compfd, &chunksize, sizeof (chunksize)) < sizeof (chunksize) ||
Read(compfd, &level, sizeof (level)) < sizeof (level)) {
perror("Read: ");
UNCOMP_BAIL;
}
version = ntohs(version);
flags = ntohs(flags);
chunksize = ntohll(chunksize);
level = ntohl(level);
if (version != VERSION) {
fprintf(stderr, "Unsupported version: %d\n", version);
err = 1;
goto uncomp_done;
}
compressed_chunksize = chunksize + (chunksize >> 6) + sizeof (uint64_t)
+ sizeof (chunksize);
if (flags & FLAG_DEDUP) {
enable_rabin_scan = 1;
}
nprocs = sysconf(_SC_NPROCESSORS_ONLN);
if (nthreads > 0 && nthreads < nprocs)
nprocs = nthreads;
fprintf(stderr, "Scaling to %d threads\n", nprocs);
slab_cache_add(compressed_chunksize + CHDR_SZ);
slab_cache_add(chunksize);
slab_cache_add(sizeof (struct cmp_data));
dary = (struct cmp_data **)slab_alloc(NULL, sizeof (struct cmp_data *) * nprocs);
for (i = 0; i < nprocs; i++) {
dary[i] = (struct cmp_data *)slab_alloc(NULL, sizeof (struct cmp_data));
if (!dary[i]) {
fprintf(stderr, "Out of memory\n");
UNCOMP_BAIL;
}
tdat = dary[i];
tdat->compressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
if (!tdat->compressed_chunk) {
fprintf(stderr, "Out of memory\n");
UNCOMP_BAIL;
}
if (enable_rabin_scan)
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
else
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
if (!tdat->uncompressed_chunk) {
fprintf(stderr, "Out of memory\n");
UNCOMP_BAIL;
}
tdat->cmp_seg = tdat->uncompressed_chunk;
tdat->chunksize = chunksize;
tdat->compress = _compress_func;
tdat->decompress = _decompress_func;
tdat->cancel = 0;
tdat->level = level;
sem_init(&(tdat->start_sem), 0, 0);
sem_init(&(tdat->cmp_done_sem), 0, 0);
sem_init(&(tdat->write_done_sem), 0, 1);
if (_init_func)
_init_func(&(tdat->data), &(tdat->level), chunksize);
if (enable_rabin_scan)
tdat->rctx = create_rabin_context(chunksize, compressed_chunksize, algo);
else
tdat->rctx = NULL;
if (pthread_create(&(tdat->thr), NULL, perform_decompress,
(void *)tdat) != 0) {
perror("Error in thread creation: ");
UNCOMP_BAIL;
}
}
thread = 1;
w.dary = dary;
w.wfd = uncompfd;
w.nprocs = nprocs;
w.chunksize = chunksize;
if (pthread_create(&writer_thr, NULL, writer_thread, (void *)(&w)) != 0) {
perror("Error in thread creation: ");
UNCOMP_BAIL;
}
/*
* Now read from the compressed file in variable compressed chunk size.
* First the size is read from the chunk header and then as many bytes +
* CRC64 checksum size are read and passed to decompression thread.
* Chunk sequencing is ensured.
*/
chunk_num = 0;
np = 0;
bail = 0;
while (!bail) {
ssize_t rb;
if (main_cancel) break;
for (p = 0; p < nprocs; p++) {
np = p;
tdat = dary[p];
sem_wait(&tdat->write_done_sem);
if (main_cancel) break;
tdat->id = chunk_num;
/*
* First read length of compressed chunk.
*/
rb = Read(compfd, &tdat->len_cmp, sizeof (tdat->len_cmp));
if (rb != sizeof (tdat->len_cmp)) {
if (rb < 0) perror("Read: ");
else
fprintf(stderr, "Incomplete chunk %d header,"
"file corrupt\n", chunk_num);
UNCOMP_BAIL;
}
tdat->len_cmp = htonll(tdat->len_cmp);
/*
* Zero compressed len means end of file.
*/
if (tdat->len_cmp == 0) {
bail = 1;
break;
}
if (tdat->len_cmp > largest_chunk)
largest_chunk = tdat->len_cmp;
if (tdat->len_cmp < smallest_chunk)
smallest_chunk = tdat->len_cmp;
avg_chunk += tdat->len_cmp;
/*
* Now read compressed chunk including the crc64 checksum.
*/
tdat->rbytes = Read(compfd, tdat->compressed_chunk,
tdat->len_cmp + sizeof(tdat->crc64) + CHDR_SZ);
if (main_cancel) break;
if (tdat->rbytes < tdat->len_cmp + sizeof(tdat->crc64) + CHDR_SZ) {
if (tdat->rbytes < 0) {
perror("Read: ");
UNCOMP_BAIL;
} else {
fprintf(stderr, "Incomplete chunk %d, file corrupt.\n",
chunk_num);
UNCOMP_BAIL;
}
}
sem_post(&tdat->start_sem);
chunk_num++;
}
}
if (!main_cancel) {
for (p = 0; p < nprocs; p++) {
if (p == np) continue;
tdat = dary[p];
sem_wait(&tdat->write_done_sem);
}
thread = 0;
}
uncomp_done:
if (thread) {
for (i = 0; i < nprocs; i++) {
tdat = dary[i];
tdat->cancel = 1;
tdat->len_cmp = 0;
sem_post(&tdat->start_sem);
sem_post(&tdat->cmp_done_sem);
pthread_join(tdat->thr, NULL);
}
pthread_join(writer_thr, NULL);
}
/*
* Ownership and mode of target should be same as original.
*/
fchmod(uncompfd, sbuf.st_mode);
if (fchown(uncompfd, sbuf.st_uid, sbuf.st_gid) == -1)
perror("Chown ");
if (dary != NULL) {
for (i = 0; i < nprocs; i++) {
slab_free(NULL, dary[i]->uncompressed_chunk);
slab_free(NULL, dary[i]->compressed_chunk);
if (_deinit_func)
_deinit_func(&(tdat->data));
if (enable_rabin_scan) {
destroy_rabin_context(dary[i]->rctx);
}
slab_free(NULL, dary[i]);
}
slab_free(NULL, dary);
}
if (!pipe_mode) {
if (compfd != -1) close(compfd);
if (uncompfd != -1) close(uncompfd);
}
if (!hide_cmp_stats) show_compression_stats(chunksize);
slab_cleanup(hide_mem_stats);
}
static void *
perform_compress(void *dat) {
struct cmp_data *tdat = (struct cmp_data *)dat;
typeof (tdat->chunksize) _chunksize, len_cmp, rabin_index_sz, index_size_cmp;
int type, rv;
uchar_t *compressed_chunk;
redo:
sem_wait(&tdat->start_sem);
if (unlikely(tdat->cancel)) {
tdat->len_cmp = 0;
sem_post(&tdat->cmp_done_sem);
return (0);
}
compressed_chunk = tdat->compressed_chunk + CHDR_SZ;
/* Perform Dedup if enabled. */
if (enable_rabin_scan) {
rabin_context_t *rctx;
ssize_t rbytes;
/*
* Compute checksum of original uncompressed chunk.
*/
tdat->crc64 = lzma_crc64(tdat->cmp_seg, tdat->rbytes, 0);
rctx = tdat->rctx;
rbytes = tdat->rbytes;
reset_rabin_context(tdat->rctx);
rctx->cbuf = tdat->uncompressed_chunk;
rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0, NULL);
if (!rctx->valid) {
memcpy(tdat->uncompressed_chunk, tdat->cmp_seg, rbytes);
tdat->rbytes = rbytes;
}
} else {
/*
* Compute checksum of original uncompressed chunk.
*/
tdat->crc64 = lzma_crc64(tdat->uncompressed_chunk, tdat->rbytes, 0);
}
/*
* If doing dedup we compress rabin index and deduped data separately.
* The rabin index array values can pollute the compressor's dictionary thereby
* reducing compression effectiveness of the data chunk. So we separate them.
*/
if (enable_rabin_scan && tdat->rctx->valid) {
_chunksize = tdat->rbytes - rabin_index_sz - RABIN_HDR_SIZE;
index_size_cmp = rabin_index_sz;
memcpy(compressed_chunk, tdat->uncompressed_chunk, RABIN_HDR_SIZE);
/* Compress index. */
rv = lzma_compress(tdat->uncompressed_chunk + RABIN_HDR_SIZE,
rabin_index_sz, compressed_chunk + RABIN_HDR_SIZE, &index_size_cmp,
tdat->rctx->level, 0, tdat->rctx->lzma_data);
index_size_cmp += RABIN_HDR_SIZE;
rabin_index_sz += RABIN_HDR_SIZE;
if (rv == 0) {
/* Compress data chunk. */
rv = tdat->compress(tdat->uncompressed_chunk + rabin_index_sz,
_chunksize, compressed_chunk + index_size_cmp, &_chunksize,
tdat->level, 0, tdat->data);
/* Now update rabin header with the compressed sizes. */
rabin_update_hdr(compressed_chunk, index_size_cmp - RABIN_HDR_SIZE,
_chunksize);
}
_chunksize += index_size_cmp;
} else {
_chunksize = tdat->rbytes;
rv = tdat->compress(tdat->uncompressed_chunk, tdat->rbytes,
compressed_chunk, &_chunksize, tdat->level, 0, tdat->data);
}
/*
* Sanity check to ensure compressed data is lesser than original.
* If at all compression expands/does not shrink data then the chunk
* will be left uncompressed. Also if the compression errored the
* chunk will be left uncompressed.
*/
tdat->len_cmp = _chunksize;
if (_chunksize >= tdat->chunksize || rv < 0) {
memcpy(compressed_chunk, tdat->uncompressed_chunk, tdat->rbytes);
type = UNCOMPRESSED;
tdat->len_cmp = tdat->rbytes;
} else {
type = COMPRESSED;
}
if (enable_rabin_scan && tdat->rctx->valid) {
type |= CHUNK_FLAG_DEDUP;
}
/*
* Insert compressed chunk length and CRC64 checksum into
* chunk header.
*/
len_cmp = tdat->len_cmp;
*((typeof (len_cmp) *)(tdat->cmp_seg)) = htonll(tdat->len_cmp);
*((typeof (tdat->crc64) *)(tdat->cmp_seg + sizeof (tdat->len_cmp))) = htonll(tdat->crc64);
tdat->len_cmp += CHDR_SZ;
tdat->len_cmp += sizeof (len_cmp);
tdat->len_cmp += sizeof (tdat->crc64);
if (adapt_mode)
type |= (rv << 4);
/*
* If chunk is less than max chunksize, store this length as well.
*/
if (tdat->rbytes < tdat->chunksize) {
type |= CHSIZE_MASK;
*((typeof (tdat->rbytes) *)(tdat->cmp_seg + tdat->len_cmp)) = htonll(tdat->rbytes);
tdat->len_cmp += sizeof (tdat->rbytes);
len_cmp += sizeof (tdat->rbytes);
*((typeof (len_cmp) *)(tdat->cmp_seg)) = htonll(len_cmp);
}
/*
* Set the chunk header flags.
*/
*(tdat->compressed_chunk) = type;
cont:
sem_post(&tdat->cmp_done_sem);
goto redo;
}
static void *
writer_thread(void *dat) {
int p;
struct wdata *w = (struct wdata *)dat;
struct cmp_data *tdat;
ssize_t wbytes;
repeat:
for (p = 0; p < w->nprocs; p++) {
tdat = w->dary[p];
sem_wait(&tdat->cmp_done_sem);
if (tdat->len_cmp == 0) {
goto do_cancel;
}
if (do_compress) {
if (tdat->len_cmp > largest_chunk)
largest_chunk = tdat->len_cmp;
if (tdat->len_cmp < smallest_chunk)
smallest_chunk = tdat->len_cmp;
avg_chunk += tdat->len_cmp;
}
wbytes = Write(w->wfd, tdat->cmp_seg, tdat->len_cmp);
if (unlikely(wbytes != tdat->len_cmp)) {
int i;
perror("Chunk Write: ");
do_cancel:
main_cancel = 1;
for (i = 0; i < w->nprocs; i++) {
tdat->cancel = 1;
sem_post(&tdat->start_sem);
sem_post(&tdat->write_done_sem);
}
return (0);
}
sem_post(&tdat->write_done_sem);
}
goto repeat;
}
/*
* File compression routine. Can use as many threads as there are
* logical cores unless user specified something different. There is
* not much to gain from nthreads > n logical cores however.
*/
#define COMP_BAIL err = 1; goto comp_done
void
start_compress(const char *filename, uint64_t chunksize, int level)
{
struct wdata w;
char tmpfile1[MAXPATHLEN];
char to_filename[MAXPATHLEN];
ssize_t compressed_chunksize;
ssize_t n_chunksize, rbytes, rabin_count;
short version, flags;
struct stat sbuf;
int compfd = -1, uncompfd = -1, err;
int i, thread = 0, bail;
int nprocs, np, p;
struct cmp_data **dary = NULL, *tdat;
pthread_t writer_thr;
uchar_t *cread_buf, *pos;
rabin_context_t *rctx;
/*
* Compressed buffer size must include zlib scratch space and
* chunk header space.
* See http://www.zlib.net/manual.html#compress2
*
* We do this unconditionally whether user mentioned zlib or not
* to keep it simple. While zlib scratch space is only needed at
* runtime, chunk header is stored in the file.
*
* See start_decompress() routine for details of chunk header.
* We also keep extra 8-byte space for the last chunk's size.
*/
compressed_chunksize = chunksize + (chunksize >> 6) +
sizeof (chunksize) + sizeof (uint64_t) + sizeof (chunksize);
err = 0;
flags = 0;
thread = 0;
slab_cache_add(chunksize);
slab_cache_add(compressed_chunksize + CHDR_SZ);
slab_cache_add(sizeof (struct cmp_data));
if (enable_rabin_scan) {
flags |= FLAG_DEDUP;
}
/* A host of sanity checks. */
if (!pipe_mode) {
if ((uncompfd = open(filename, O_RDWR, 0)) == -1)
err_exit(1, "Cannot open: %s", filename);
if (fstat(uncompfd, &sbuf) == -1) {
close(uncompfd);
err_exit(1, "Cannot stat: %s", filename);
}
if (!S_ISREG(sbuf.st_mode)) {
close(uncompfd);
err_exit(0, "File %s is not a regular file.\n", filename);
}
if (sbuf.st_size == 0) {
close(uncompfd);
return;
}
/*
* Adjust chunk size for small files. We then get an archive with
* a single chunk for the entire file.
*/
if (sbuf.st_size < chunksize) {
chunksize = sbuf.st_size;
nthreads = 1;
} else {
if (nthreads == 0 || nthreads > sbuf.st_size / chunksize) {
nthreads = sbuf.st_size / chunksize;
if (sbuf.st_size % chunksize)
nthreads++;
}
}
/*
* Create a temporary file to hold compressed data which is renamed at
* the end. The target file name is same as original file with the '.pz'
* extension appended.
*/
strcpy(tmpfile1, filename);
strcpy(tmpfile1, dirname(tmpfile1));
strcat(tmpfile1, "/.pcompXXXXXX");
snprintf(to_filename, sizeof (to_filename), "%s" COMP_EXTN, filename);
if ((compfd = mkstemp(tmpfile1)) == -1) {
perror("mkstemp ");
COMP_BAIL;
}
} else {
/*
* Use stdin/stdout for pipe mode.
*/
compfd = fileno(stdout);
if (compfd == -1) {
perror("fileno ");
COMP_BAIL;
}
uncompfd = fileno(stdin);
if (uncompfd == -1) {
perror("fileno ");
COMP_BAIL;
}
}
nprocs = sysconf(_SC_NPROCESSORS_ONLN);
if (nthreads > 0 && nthreads < nprocs)
nprocs = nthreads;
fprintf(stderr, "Scaling to %d thread", nprocs);
if (nprocs > 1) fprintf(stderr, "s");
fprintf(stderr, "\n");
dary = (struct cmp_data **)slab_alloc(NULL, sizeof (struct cmp_data *) * nprocs);
if (enable_rabin_scan)
cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ);
else
cread_buf = (uchar_t *)slab_alloc(NULL, chunksize);
if (!cread_buf) {
fprintf(stderr, "Out of memory\n");
COMP_BAIL;
}
for (i = 0; i < nprocs; i++) {
dary[i] = (struct cmp_data *)slab_alloc(NULL, sizeof (struct cmp_data));
if (!dary[i]) {
fprintf(stderr, "Out of memory\n");
COMP_BAIL;
}
tdat = dary[i];
tdat->cmp_seg = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ);
tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t);
if (!tdat->compressed_chunk) {
fprintf(stderr, "Out of memory\n");
COMP_BAIL;
}
if (enable_rabin_scan)
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ);
else
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
if (!tdat->uncompressed_chunk) {
fprintf(stderr, "Out of memory\n");
COMP_BAIL;
}
tdat->chunksize = chunksize;
tdat->compress = _compress_func;
tdat->decompress = _decompress_func;
tdat->cancel = 0;
tdat->level = level;
sem_init(&(tdat->start_sem), 0, 0);
sem_init(&(tdat->cmp_done_sem), 0, 0);
sem_init(&(tdat->write_done_sem), 0, 1);
if (_init_func)
_init_func(&(tdat->data), &(tdat->level), chunksize);
if (enable_rabin_scan)
tdat->rctx = create_rabin_context(chunksize, compressed_chunksize, algo);
else
tdat->rctx = NULL;
if (pthread_create(&(tdat->thr), NULL, perform_compress,
(void *)tdat) != 0) {
perror("Error in thread creation: ");
COMP_BAIL;
}
}
thread = 1;
w.dary = dary;
w.wfd = compfd;
w.nprocs = nprocs;
if (pthread_create(&writer_thr, NULL, writer_thread, (void *)(&w)) != 0) {
perror("Error in thread creation: ");
COMP_BAIL;
}
/*
* Write out file header. First insert hdr elements into mem buffer
* then write out the full hdr in one shot.
*/
memset(cread_buf, 0, ALGO_SZ);
strncpy(cread_buf, algo, ALGO_SZ);
version = htons(VERSION);
flags = htons(flags);
n_chunksize = htonll(chunksize);
level = htonl(level);
pos = cread_buf + ALGO_SZ;
memcpy(pos, &version, sizeof (version));
pos += sizeof (version);
memcpy(pos, &flags, sizeof (flags));
pos += sizeof (flags);
memcpy(pos, &n_chunksize, sizeof (n_chunksize));
pos += sizeof (n_chunksize);
memcpy(pos, &level, sizeof (level));
pos += sizeof (level);
if (Write(compfd, cread_buf, pos - cread_buf) != pos - cread_buf) {
perror("Write ");
COMP_BAIL;
}
/*
* Now read from the uncompressed file in 'chunksize' sized chunks, independently
* compress each chunk and write it out. Chunk sequencing is ensured.
*/
chunk_num = 0;
np = 0;
bail = 0;
largest_chunk = 0;
smallest_chunk = chunksize;
avg_chunk = 0;
rabin_count = 0;
/*
* Read the first chunk into a spare buffer (a simple double-buffering).
*/
if (enable_rabin_split) {
rctx = create_rabin_context(chunksize, 0, algo);
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx);
} else {
rbytes = Read(uncompfd, cread_buf, chunksize);
}
while (!bail) {
uchar_t *tmp;
if (main_cancel) break;
for (p = 0; p < nprocs; p++) {
np = p;
tdat = dary[p];
if (main_cancel) break;
/* Wait for previous chunk compression to complete. */
sem_wait(&tdat->write_done_sem);
if (main_cancel) break;
/*
* Once previous chunk is done swap already read buffer and
* it's size into the thread data.
* Normally it goes into uncompressed_chunk, because that's what it is.
* With dedup enabled however, we do some jugglery to save additional
* memory usage and avoid a memcpy, so it goes into the compressed_chunk
* area:
* cmp_seg -> dedup -> uncompressed_chunk -> compression -> cmp_seg
*/
tdat->id = chunk_num;
tdat->rbytes = rbytes;
if (enable_rabin_scan) {
tmp = tdat->cmp_seg;
tdat->cmp_seg = cread_buf;
cread_buf = tmp;
tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t);
/*
* If there is data after the last rabin boundary in the chunk, then
* rabin_count will be non-zero. We carry over the data to the beginning
* of the next chunk.
*/
if (rabin_count) {
memcpy(cread_buf,
tdat->cmp_seg + rabin_count, rbytes - rabin_count);
tdat->rbytes = rabin_count;
rabin_count = rbytes - rabin_count;
}
} else {
tmp = tdat->uncompressed_chunk;
tdat->uncompressed_chunk = cread_buf;
cread_buf = tmp;
}
if (rbytes < chunksize) {
if (rbytes < 0) {
bail = 1;
perror("Read: ");
COMP_BAIL;
} else if (tdat->rbytes == 0) { /* EOF */
bail = 1;
break;
}
}
/* Signal the compression thread to start */
sem_post(&tdat->start_sem);
chunk_num++;
/*
* Read the next buffer we want to process while previous
* buffer is in progress.
*/
if (enable_rabin_split) {
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx);
} else {
rbytes = Read(uncompfd, cread_buf, chunksize);
}
}
}
if (!main_cancel) {
/* Wait for all remaining chunks to finish. */
for (p = 0; p < nprocs; p++) {
if (p == np) continue;
tdat = dary[p];
sem_wait(&tdat->write_done_sem);
}
} else {
err = 1;
}
comp_done:
if (thread) {
for (i = 0; i < nprocs; i++) {
tdat = dary[i];
tdat->cancel = 1;
tdat->len_cmp = 0;
sem_post(&tdat->start_sem);
sem_post(&tdat->cmp_done_sem);
pthread_join(tdat->thr, NULL);
}
pthread_join(writer_thr, NULL);
}
if (err) {
if (compfd != -1 && !pipe_mode)
unlink(tmpfile1);
fprintf(stderr, "Error compressing file: %s\n", filename);
} else {
/*
* Write a trailer of zero chunk length.
*/
compressed_chunksize = 0;
if (Write(compfd, &compressed_chunksize,
sizeof (compressed_chunksize)) < 0) {
perror("Write ");
err = 1;
}
/*
* Rename the temporary file to the actual compressed file
* unless we are in a pipe.
*/
if (!pipe_mode) {
/*
* Ownership and mode of target should be same as original.
*/
fchmod(compfd, sbuf.st_mode);
if (fchown(compfd, sbuf.st_uid, sbuf.st_gid) == -1)
perror("chown ");
if (rename(tmpfile1, to_filename) == -1) {
perror("Cannot rename temporary file ");
unlink(tmpfile1);
}
}
}
if (dary != NULL) {
for (i = 0; i < nprocs; i++) {
slab_free(NULL, dary[i]->uncompressed_chunk);
slab_free(NULL, dary[i]->cmp_seg);
if (enable_rabin_scan) {
destroy_rabin_context(dary[i]->rctx);
}
if (_deinit_func)
_deinit_func(&(dary[i]->data));
slab_free(NULL, dary[i]);
}
slab_free(NULL, dary);
}
if (enable_rabin_split) destroy_rabin_context(rctx);
slab_free(NULL, cread_buf);
if (!pipe_mode) {
if (compfd != -1) close(compfd);
if (uncompfd != -1) close(uncompfd);
}
if (!hide_cmp_stats) show_compression_stats(chunksize);
_stats_func(!hide_cmp_stats);
slab_cleanup(hide_mem_stats);
}
/*
* Check the algorithm requested and set the callback routine pointers.
*/
static int
init_algo(const char *algo, int bail)
{
int rv = 1, i;
char algorithm[8];
/* Copy given string into known length buffer to avoid memcmp() overruns. */
strncpy(algorithm, algo, 8);
if (memcmp(algorithm, "zlib", 4) == 0) {
_compress_func = zlib_compress;
_decompress_func = zlib_decompress;
_init_func = zlib_init;
_deinit_func = NULL;
_stats_func = zlib_stats;
rv = 0;
} else if (memcmp(algorithm, "lzma", 4) == 0) {
_compress_func = lzma_compress;
_decompress_func = lzma_decompress;
_init_func = lzma_init;
_deinit_func = lzma_deinit;
_stats_func = lzma_stats;
rv = 0;
} else if (memcmp(algorithm, "bzip2", 5) == 0) {
_compress_func = bzip2_compress;
_decompress_func = bzip2_decompress;
_init_func = bzip2_init;
_deinit_func = NULL;
_stats_func = bzip2_stats;
rv = 0;
} else if (memcmp(algorithm, "ppmd", 4) == 0) {
_compress_func = ppmd_compress;
_decompress_func = ppmd_decompress;
_init_func = ppmd_init;
_deinit_func = ppmd_deinit;
_stats_func = ppmd_stats;
rv = 0;
/* adapt2 and adapt ordering of the checks matter here. */
} else if (memcmp(algorithm, "adapt2", 6) == 0) {
_compress_func = adapt_compress;
_decompress_func = adapt_decompress;
_init_func = adapt2_init;
_deinit_func = adapt_deinit;
_stats_func = adapt_stats;
adapt_mode = 1;
rv = 0;
} else if (memcmp(algorithm, "adapt", 5) == 0) {
_compress_func = adapt_compress;
_decompress_func = adapt_decompress;
_init_func = adapt_init;
_deinit_func = adapt_deinit;
_stats_func = adapt_stats;
adapt_mode = 1;
rv = 0;
}
return (rv);
}
int
main(int argc, char *argv[])
{
char *filename = NULL;
char *to_filename = NULL;
ssize_t chunksize = DEFAULT_CHUNKSIZE;
int opt, level, num_rem;
exec_name = get_execname(argv[0]);
level = 6;
slab_init();
while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDr")) != -1) {
int ovr;
switch (opt) {
case 'd':
do_uncompress = 1;
break;
case 'c':
do_compress = 1;
algo = optarg;
if (init_algo(algo, 1) != 0) {
err_exit(1, "Invalid algorithm %s\n", optarg);
}
break;
case 's':
ovr = parse_numeric(&chunksize, optarg);
if (ovr == 1)
err_exit(0, "Chunk size too large %s", optarg);
else if (ovr == 2)
err_exit(0, "Invalid number %s", optarg);
if (chunksize < MIN_CHUNK) {
err_exit(0, "Minimum chunk size is %ld\n", MIN_CHUNK);
}
break;
case 'l':
level = atoi(optarg);
if (level < 0 || level > 14)
err_exit(0, "Compression level should be in range 0 - 14\n");
break;
case 'p':
pipe_mode = 1;
break;
case 't':
nthreads = atoi(optarg);
if (nthreads < 1 || nthreads > 256)
err_exit(0, "Thread count should be in range 1 - 256\n");
break;
case 'M':
hide_mem_stats = 0;
break;
case 'C':
hide_cmp_stats = 0;
break;
case 'D':
enable_rabin_scan = 1;
break;
case 'r':
enable_rabin_split = 0;
break;
case '?':
default:
usage();
exit(1);
break;
}
}
if ((do_compress && do_uncompress) || (!do_compress && !do_uncompress)) {
usage();
exit(1);
}
/*
* Remaining mandatory arguments are the filenames.
*/
num_rem = argc - optind;
if (pipe_mode && num_rem > 0 ) {
fprintf(stderr, "Filename(s) unexpected for pipe mode\n");
usage();
exit(1);
}
if (enable_rabin_scan && !do_compress) {
fprintf(stderr, "Rabin Deduplication is only used during compression.\n");
usage();
exit(1);
}
if (!enable_rabin_scan)
enable_rabin_split = 0;
if (num_rem == 0 && !pipe_mode) {
usage(); /* At least 1 filename needed. */
exit(1);
} else if (num_rem == 1) {
if (do_compress) {
char apath[MAXPATHLEN];
if ((filename = realpath(argv[optind], NULL)) == NULL)
err_exit(1, "%s", argv[optind]);
/* Check if compressed file exists */
strcpy(apath, filename);
strcat(apath, COMP_EXTN);
if ((to_filename = realpath(apath, NULL)) != NULL) {
free(filename);
err_exit(0, "Compressed file %s exists\n", to_filename);
}
} else {
usage();
exit(1);
}
} else if (num_rem == 2) {
if (do_uncompress) {
if ((filename = realpath(argv[optind], NULL)) == NULL)
err_exit(1, "%s", argv[optind]);
optind++;
if ((to_filename = realpath(argv[optind], NULL)) != NULL) {
free(filename);
free(to_filename);
err_exit(0, "File %s exists\n", argv[optind]);
}
to_filename = argv[optind];
} else {
usage();
exit(1);
}
} else if (num_rem > 2) {
fprintf(stderr, "Too many filenames.\n");
usage();
exit(1);
}
main_cancel = 0;
/*
* Start the main routines.
*/
if (do_compress)
start_compress(filename, chunksize, level);
else if (do_uncompress)
start_decompress(filename, to_filename);
free(filename);
free((void *)exec_name);
return (0);
}