Archiving support using Libarchive: Work in progress changes #3.

Make log_msg() add newline by default.
This commit is contained in:
Moinak Ghosh 2013-10-24 00:16:04 +05:30
parent bc451aba36
commit 46b11def08
4 changed files with 224 additions and 96 deletions

View file

@ -33,7 +33,10 @@
#include <errno.h>
#include <limits.h>
#include <utils.h>
#include <pthread.h>
#include <sys/mman.h>
#include <archive.h>
#include <archive_entry.h>
#include "pc_archive.h"
#undef _FEATURES_H
@ -52,6 +55,10 @@ AE_IFIFO Named pipe (fifo)
*/
#define ARC_ENTRY_OVRHEAD 500
#define ARC_SCRATCH_BUFF_SIZE (64 *1024)
#define DATA_BLOCK_SIZE (8 * 1024)
#define MMAP_SIZE (1024 * 1024)
static struct arc_list_state {
uchar_t *pbuf;
uint64_t bufsiz, bufpos, arc_size;
@ -68,7 +75,6 @@ add_pathname(const char *fpath, const struct stat *sb,
{
short len;
uchar_t *buf;
struct hdr ehdr;
if (tflag == FTW_DP) return (0);
if (tflag == FTW_DNR || tflag == FTW_NS) {
@ -150,6 +156,8 @@ setup_archive(pc_ctx_t *pctx, struct stat *sbuf)
pthread_mutex_unlock(&nftw_mutex);
lseek(fd, 0, SEEK_SET);
free(pbuf);
sbuf->st_uid = geteuid();
sbuf->st_gid = getegid();
if (pipe(pipefd) == -1) {
log_msg(LOG_ERR, 1, "Unable to create archiver pipe.\n");
@ -174,19 +182,108 @@ setup_archive(pc_ctx_t *pctx, struct stat *sbuf)
return (0);
}
/*
* Routines to archive members and write the archive to pipe. Most of the following
* code is adapted from some of the Libarchive bsdtar code.
*/
static int
copy_file_data(pc_ctx_t *pctx, struct archive *arc,
struct archive *in_arc, struct archive_entry *entry)
{
size_t sz, offset, len;
ssize_t bytes_to_write;
uchar_t *mapbuf;
int rv, fd;
const char *fpath;
offset = 0;
rv = 0;
sz = archive_entry_size(entry);
bytes_to_write = sz;
fpath = archive_entry_sourcepath(entry);
fd = open(fpath, O_RDONLY);
if (fd == -1) {
log_msg(LOG_ERR, 1, "Failed to open %s.", fpath);
return (-1);
}
while (bytes_to_write > 0) {
uchar_t *src;
size_t wlen, w;
ssize_t wrtn;
if (bytes_to_write < MMAP_SIZE)
len = bytes_to_write;
else
len = MMAP_SIZE;
mapbuf = mmap(NULL, len, PROT_READ, MAP_SHARED, fd, offset);
offset += len;
src = mapbuf;
wlen = len;
/*
* Write data in blocks.
*/
while (wlen > 0) {
if (wlen < DATA_BLOCK_SIZE)
w = wlen;
else
w = DATA_BLOCK_SIZE;
wrtn = archive_write_data(arc, src, w);
if (wrtn < w) {
/* Write failed; this is bad */
log_msg(LOG_ERR, 0, "Data write error: %s", archive_error_string(arc));
rv = -1;
break;
}
bytes_to_write -= wrtn;
wlen -= wrtn;
}
if (rv == -1) break;
munmap(mapbuf, len);
}
close(fd);
return (rv);
}
static int
write_entry(pc_ctx_t *pctx, struct archive *arc, struct archive *in_arc,
struct archive_entry *entry)
{
int rv;
rv = archive_write_header(arc, entry);
if (rv != ARCHIVE_OK) {
if (rv == ARCHIVE_FATAL || rv == ARCHIVE_FAILED) {
log_msg(LOG_ERR, 0, "%s: %s",
archive_entry_sourcepath(entry), archive_error_string(arc));
return (-1);
} else {
log_msg(LOG_WARN, 0, "%s: %s",
archive_entry_sourcepath(entry), archive_error_string(arc));
}
}
if (archive_entry_size(entry) > 0) {
return (copy_file_data(pctx, arc, in_arc, entry));
}
return (0);
}
/*
* Thread function. Archive members and write to pipe. The dispatcher thread
* reads from the other end and compresses.
*/
static void *
archiver_thread(void *dat) {
archive_thread_func(void *dat) {
pc_ctx_t *pctx = (pc_ctx_t *)dat;
char fpath[PATH_MAX], *name;
ssize_t rbytes;
short namelen;
int warn;
struct stat sbuf;
struct archive_entry *entry;
struct archive_entry *entry, *spare_entry, *ent;
struct archive *arc, *ard;
struct archive_entry_linkresolver *resolver;
@ -208,14 +305,19 @@ archiver_thread(void *dat) {
* Read next path entry from list file.
*/
while ((rbytes = Read(pctx->archive_members_fd, &namelen, sizeof(namelen))) != 0) {
int fd;
if (rbytes < 2) break;
if (rbytes < 2) {
log_msg(LOG_ERR, 1, "Error reading archive members file.");
break;
}
rbytes = Read(pctx->archive_members_fd, fpath, namelen);
if (rbytes < namelen) break;
if (rbytes < namelen) {
log_msg(LOG_ERR, 1, "Error reading archive members file.");
break;
}
fpath[namelen] = '\0';
archive_entry_copy_sourcepath(entry, fpath);
if (archive_read_disk_entry_from_file(ard, entry, 0, NULL) != ARCHIVE_OK) {
log_msg(LOG_WARN, 0, "%s", archive_error_string(ard);
if (archive_read_disk_entry_from_file(ard, entry, -1, NULL) != ARCHIVE_OK) {
log_msg(LOG_WARN, 1, "archive_read_disk_entry_from_file:\n %s", archive_error_string(ard));
archive_entry_clear(entry);
continue;
}
@ -230,7 +332,7 @@ archiver_thread(void *dat) {
warn = 0;
}
if (name[1] == '.' && name[2] == '.' && (name[3] == '/' || name[3] == '\\')) {
name += 4; /* /.. is removed here and / is removed next. */
name += 3; /* /.. is removed here and / is removed next. */
} else {
name += 1;
}
@ -238,16 +340,33 @@ archiver_thread(void *dat) {
if (name != archive_entry_pathname(entry))
archive_entry_copy_pathname(entry, name);
if (archive_entry_filetype(entry) != AE_IFREG)
if (archive_entry_filetype(entry) != AE_IFREG) {
archive_entry_set_size(entry, 0);
archive_entry_linkify(bsdtar->resolver, &entry, &spare_entry);
archive_entry_write_header(arc, entry);
}
archive_entry_linkify(resolver, &entry, &spare_entry);
ent = entry;
while (ent != NULL) {
if (write_entry(pctx, arc, ard, ent) != 0) {
goto done;
}
ent = spare_entry;
spare_entry = NULL;
}
archive_entry_clear(entry);
}
done:
archive_entry_free(entry);
archive_entry_linkresolver_free(resolver);
archive_read_free(ard);
archive_write_free(arc);
close(pctx->archive_members_fd);
close(pctx->archive_data_fd);
unlink(pctx->archive_members_file);
return (NULL);
}
int
start_archiver(pc_ctx_t *pctx) {
return (0);
return (pthread_create(&(pctx->archive_thread), NULL, archive_thread_func, (void *)pctx));
}

View file

@ -171,18 +171,18 @@ usage(pc_ctx_t *pctx)
static void
show_compression_stats(pc_ctx_t *pctx)
{
log_msg(LOG_INFO, 0, "\nCompression Statistics\n");
log_msg(LOG_INFO, 0, "======================\n");
log_msg(LOG_INFO, 0, "Total chunks : %u\n", pctx->chunk_num);
log_msg(LOG_INFO, 0, "\nCompression Statistics");
log_msg(LOG_INFO, 0, "======================");
log_msg(LOG_INFO, 0, "Total chunks : %u", pctx->chunk_num);
if (pctx->chunk_num == 0) {
log_msg(LOG_INFO, 0, "No statistics to display.\n");
log_msg(LOG_INFO, 0, "No statistics to display.");
} else {
log_msg(LOG_INFO, 0, "Best compressed chunk : %s(%.2f%%)\n",
log_msg(LOG_INFO, 0, "Best compressed chunk : %s(%.2f%%)",
bytes_to_size(pctx->smallest_chunk), (double)pctx->smallest_chunk/(double)pctx->chunksize*100);
log_msg(LOG_INFO, 0, "Worst compressed chunk : %s(%.2f%%)\n",
log_msg(LOG_INFO, 0, "Worst compressed chunk : %s(%.2f%%)",
bytes_to_size(pctx->largest_chunk), (double)pctx->largest_chunk/(double)pctx->chunksize*100);
pctx->avg_chunk /= pctx->chunk_num;
log_msg(LOG_INFO, 0, "Avg compressed chunk : %s(%.2f%%)\n\n",
log_msg(LOG_INFO, 0, "Avg compressed chunk : %s(%.2f%%)\n",
bytes_to_size(pctx->avg_chunk), (double)pctx->avg_chunk/(double)pctx->chunksize*100);
}
}
@ -227,7 +227,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t
/*
* Execution won't come here but just in case ...
*/
log_msg(LOG_ERR, 0, "Invalid preprocessing mode\n");
log_msg(LOG_ERR, 0, "Invalid preprocessing mode");
return (-1);
}
@ -315,14 +315,14 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64
result = lzp_decompress((const uchar_t *)src, (uchar_t *)dst, srclen,
hashsize, LZP_DEFAULT_LZPMINLEN, 0);
if (result < 0) {
log_msg(LOG_ERR, 0, "LZP decompression failed.\n");
log_msg(LOG_ERR, 0, "LZP decompression failed.");
return (-1);
}
*dstlen = result;
}
if (!(type & (PREPROC_COMPRESSED | PREPROC_TYPE_DELTA2 | PREPROC_TYPE_LZP)) && type > 0) {
log_msg(LOG_ERR, 0, "Invalid preprocessing flags: %d\n", type);
log_msg(LOG_ERR, 0, "Invalid preprocessing flags: %d", type);
return (-1);
}
return (0);
@ -402,7 +402,7 @@ redo:
/*
* HMAC verification failure is fatal.
*/
log_msg(LOG_ERR, 0, "Chunk %d, HMAC verification failed\n", tdat->id);
log_msg(LOG_ERR, 0, "Chunk %d, HMAC verification failed", tdat->id);
pctx->main_cancel = 1;
tdat->len_cmp = 0;
pctx->t_errored = 1;
@ -410,7 +410,7 @@ redo:
return (NULL);
}
DEBUG_STAT_EN(en = get_wtime_millis());
DEBUG_STAT_EN(fprintf(stderr, "HMAC Verification speed %.3f MB/s\n",
DEBUG_STAT_EN(fprintf(stderr, "HMAC Verification speed %.3f MB/s",
get_mb_s(tdat->rbytes + sizeof (tdat->len_cmp_be), strt, en)));
/*
@ -452,7 +452,7 @@ redo:
/*
* Header CRC32 verification failure is fatal.
*/
log_msg(LOG_ERR, 0, "Chunk %d, Header CRC verification failed\n", tdat->id);
log_msg(LOG_ERR, 0, "Chunk %d, Header CRC verification failed", tdat->id);
pctx->main_cancel = 1;
tdat->len_cmp = 0;
pctx->t_errored = 1;
@ -500,7 +500,7 @@ redo:
}
if (rv == -1) {
tdat->len_cmp = 0;
log_msg(LOG_ERR, 0, "ERROR: Chunk %d, decompression failed.\n", tdat->id);
log_msg(LOG_ERR, 0, "ERROR: Chunk %d, decompression failed.", tdat->id);
pctx->t_errored = 1;
goto cont;
}
@ -550,7 +550,7 @@ redo:
if (rv == -1) {
tdat->len_cmp = 0;
log_msg(LOG_ERR, 0, "ERROR: Chunk %d, decompression failed.\n", tdat->id);
log_msg(LOG_ERR, 0, "ERROR: Chunk %d, decompression failed.", tdat->id);
pctx->t_errored = 1;
goto cont;
}
@ -564,7 +564,7 @@ redo:
rctx->cbuf = tdat->compressed_chunk;
dedupe_decompress(rctx, tdat->uncompressed_chunk, &(tdat->len_cmp));
if (!rctx->valid) {
log_msg(LOG_ERR, 0, "ERROR: Chunk %d, dedup recovery failed.\n", tdat->id);
log_msg(LOG_ERR, 0, "ERROR: Chunk %d, dedup recovery failed.", tdat->id);
rv = -1;
tdat->len_cmp = 0;
pctx->t_errored = 1;
@ -596,7 +596,7 @@ redo:
compute_checksum(checksum, pctx->cksum, tdat->uncompressed_chunk, _chunksize, tdat->cksum_mt, 1);
if (memcmp(checksum, tdat->checksum, pctx->cksum_bytes) != 0) {
tdat->len_cmp = 0;
log_msg(LOG_ERR, 0, "ERROR: Chunk %d, checksums do not match.\n", tdat->id);
log_msg(LOG_ERR, 0, "ERROR: Chunk %d, checksums do not match.", tdat->id);
pctx->t_errored = 1;
}
}
@ -717,9 +717,9 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
}
if (init_algo(pctx, algorithm, 0) != 0) {
if (pctx->pipe_mode || filename == NULL)
log_msg(LOG_ERR, 0, "Input stream is not pcompressed.\n");
log_msg(LOG_ERR, 0, "Input stream is not pcompressed.");
else
log_msg(LOG_ERR, 0, "%s is not a pcompressed file.\n", filename);
log_msg(LOG_ERR, 0, "%s is not a pcompressed file.", filename);
UNCOMP_BAIL;
}
pctx->algo = algorithm;
@ -741,23 +741,23 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
* Check for ridiculous values (malicious tampering or otherwise).
*/
if (version > VERSION) {
log_msg(LOG_ERR, 0, "Cannot handle newer archive version %d, capability %d\n",
log_msg(LOG_ERR, 0, "Cannot handle newer archive version %d, capability %d",
version, VERSION);
err = 1;
goto uncomp_done;
}
if (chunksize > EIGHTY_PCT(get_total_ram())) {
log_msg(LOG_ERR, 0, "Chunk size must not exceed 80%% of total RAM.\n");
log_msg(LOG_ERR, 0, "Chunk size must not exceed 80%% of total RAM.");
err = 1;
goto uncomp_done;
}
if (level > MAX_LEVEL || level < 0) {
log_msg(LOG_ERR, 0, "Invalid compression level in header: %d\n", level);
log_msg(LOG_ERR, 0, "Invalid compression level in header: %d", level);
err = 1;
goto uncomp_done;
}
if (version < VERSION-3) {
log_msg(LOG_ERR, 0, "Unsupported version: %d\n", version);
log_msg(LOG_ERR, 0, "Unsupported version: %d", version);
err = 1;
goto uncomp_done;
}
@ -780,14 +780,14 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
if (flags & FLAG_DEDUP_FIXED) {
if (version > 7) {
if (pctx->pipe_mode) {
log_msg(LOG_ERR, 0, "Global Deduplication is not supported with pipe mode.\n");
log_msg(LOG_ERR, 0, "Global Deduplication is not supported with pipe mode.");
err = 1;
goto uncomp_done;
}
pctx->enable_rabin_global = 1;
dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL;
} else {
log_msg(LOG_ERR, 0, "Invalid file deduplication flags.\n");
log_msg(LOG_ERR, 0, "Invalid file deduplication flags.");
err = 1;
goto uncomp_done;
}
@ -812,7 +812,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
if (pctx->cksum == CKSUM_BLAKE512) pctx->cksum = CKSUM_SKEIN512;
}
if (get_checksum_props(NULL, &(pctx->cksum), &(pctx->cksum_bytes), &(pctx->mac_bytes), 1) == -1) {
log_msg(LOG_ERR, 0, "Invalid checksum algorithm code: %d. File corrupt ?\n", pctx->cksum);
log_msg(LOG_ERR, 0, "Invalid checksum algorithm code: %d. File corrupt ?", pctx->cksum);
UNCOMP_BAIL;
}
@ -854,7 +854,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
} else if (pctx->encrypt_type == CRYPTO_ALG_SALSA20) {
noncelen = XSALSA20_CRYPTO_NONCEBYTES;
} else {
log_msg(LOG_ERR, 0, "Invalid Encryption algorithm code: %d. File corrupt ?\n",
log_msg(LOG_ERR, 0, "Invalid Encryption algorithm code: %d. File corrupt ?",
pctx->encrypt_type);
UNCOMP_BAIL;
}
@ -918,7 +918,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
free(salt2);
memset(salt1, 0, saltlen);
free(salt1);
log_msg(LOG_ERR, 0, "Failed to get password.\n");
log_msg(LOG_ERR, 0, "Failed to get password.");
UNCOMP_BAIL;
}
} else if (!pctx->user_pw) {
@ -956,7 +956,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
memset(salt1, 0, saltlen);
free(salt1);
close(uncompfd); unlink(to_filename);
log_msg(LOG_ERR, 0, "Failed to get password.\n");
log_msg(LOG_ERR, 0, "Failed to get password.");
UNCOMP_BAIL;
}
close(fd);
@ -971,7 +971,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
free(salt1);
memset(pctx->user_pw, 0, pctx->user_pw_len);
close(uncompfd); unlink(to_filename);
log_msg(LOG_ERR, 0, "Failed to initialize crypto\n");
log_msg(LOG_ERR, 0, "Failed to initialize crypto");
UNCOMP_BAIL;
}
memset(pctx->user_pw, 0, pctx->user_pw_len);
@ -986,7 +986,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
free(salt1);
memset(pw, 0, MAX_PW_LEN);
close(uncompfd); unlink(to_filename);
log_msg(LOG_ERR, 0, "Failed to initialize crypto\n");
log_msg(LOG_ERR, 0, "Failed to initialize crypto");
UNCOMP_BAIL;
}
memset(pw, 0, MAX_PW_LEN);
@ -1000,7 +1000,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
*/
if (hmac_init(&hdr_mac, pctx->cksum, &(pctx->crypto_ctx)) == -1) {
close(uncompfd); unlink(to_filename);
log_msg(LOG_ERR, 0, "Cannot initialize header hmac.\n");
log_msg(LOG_ERR, 0, "Cannot initialize header hmac.");
UNCOMP_BAIL;
}
hmac_update(&hdr_mac, (uchar_t *)pctx->algo, ALGO_SZ);
@ -1027,7 +1027,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
memset(n1, 0, noncelen);
if (memcmp(hdr_hash2, hdr_hash1, pctx->mac_bytes) != 0) {
close(uncompfd); unlink(to_filename);
log_msg(LOG_ERR, 0, "Header verification failed! File tampered or wrong password.\n");
log_msg(LOG_ERR, 0, "Header verification failed! File tampered or wrong password.");
UNCOMP_BAIL;
}
} else if (version >= 5) {
@ -1057,7 +1057,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
crc2 = lzma_crc32((uchar_t *)&d2, sizeof (level), crc2);
if (crc1 != crc2) {
close(uncompfd); unlink(to_filename);
log_msg(LOG_ERR, 0, "Header verification failed! File tampered or wrong password.\n");
log_msg(LOG_ERR, 0, "Header verification failed! File tampered or wrong password.");
UNCOMP_BAIL;
}
}
@ -1073,7 +1073,6 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
pctx->nthreads = 1;
log_msg(LOG_INFO, 0, "Scaling to %d thread", pctx->nthreads * props.nthreads);
if (pctx->nthreads * props.nthreads > 1) log_msg(LOG_INFO, 0, "s");
log_msg(LOG_INFO, 0, "\n");
nprocs = pctx->nthreads;
slab_cache_add(compressed_chunksize);
slab_cache_add(chunksize);
@ -1083,7 +1082,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
for (i = 0; i < nprocs; i++) {
dary[i] = (struct cmp_data *)slab_alloc(NULL, sizeof (struct cmp_data));
if (!dary[i]) {
log_msg(LOG_ERR, 0, "1: Out of memory\n");
log_msg(LOG_ERR, 0, "1: Out of memory");
UNCOMP_BAIL;
}
tdat = dary[i];
@ -1137,7 +1136,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
if (pctx->encrypt_type) {
if (hmac_init(&tdat->chunk_hmac, pctx->cksum, &(pctx->crypto_ctx)) == -1) {
log_msg(LOG_ERR, 0, "Cannot initialize chunk hmac.\n");
log_msg(LOG_ERR, 0, "Cannot initialize chunk hmac.");
UNCOMP_BAIL;
}
}
@ -1202,7 +1201,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
if (rb < 0) log_msg(LOG_ERR, 1, "Read: ");
else
log_msg(LOG_ERR, 0, "Incomplete chunk %d header,"
"file corrupt\n", pctx->chunk_num);
"file corrupt", pctx->chunk_num);
UNCOMP_BAIL;
}
tdat->len_cmp_be = tdat->len_cmp; // Needed for HMAC
@ -1212,7 +1211,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
* Check for ridiculous length.
*/
if (tdat->len_cmp > chunksize + 256) {
log_msg(LOG_ERR, 0, "Compressed length too big for chunk: %d\n",
log_msg(LOG_ERR, 0, "Compressed length too big for chunk: %d",
pctx->chunk_num);
UNCOMP_BAIL;
}
@ -1242,7 +1241,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
chunksize);
if (!tdat->compressed_chunk || !tdat->uncompressed_chunk) {
log_msg(LOG_ERR, 0, "2: Out of memory\n");
log_msg(LOG_ERR, 0, "2: Out of memory");
UNCOMP_BAIL;
}
tdat->cmp_seg = tdat->uncompressed_chunk;
@ -1265,7 +1264,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
log_msg(LOG_ERR, 1, "Read: ");
UNCOMP_BAIL;
} else {
log_msg(LOG_ERR, 0, "Incomplete chunk %d, file corrupt.\n",
log_msg(LOG_ERR, 0, "Incomplete chunk %d, file corrupt.",
pctx->chunk_num);
UNCOMP_BAIL;
}
@ -1694,7 +1693,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
pw_len = get_pw_string(pw,
"Please enter encryption password", 1);
if (pw_len == -1) {
log_msg(LOG_ERR, 0, "Failed to get password.\n");
log_msg(LOG_ERR, 0, "Failed to get password.");
return (1);
}
} else if (!pctx->user_pw) {
@ -1724,7 +1723,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
}
}
if (pw_len == -1) {
log_msg(LOG_ERR, 1, "Failed to get password.\n");
log_msg(LOG_ERR, 1, "Failed to get password.");
return (1);
}
close(fd);
@ -1733,7 +1732,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
if (init_crypto(&(pctx->crypto_ctx), pctx->user_pw, pctx->user_pw_len, pctx->encrypt_type,
NULL, 0, pctx->keylen, 0, ENCRYPT_FLAG) == -1) {
memset(pctx->user_pw, 0, pctx->user_pw_len);
log_msg(LOG_ERR, 0, "Failed to initialize crypto\n");
log_msg(LOG_ERR, 0, "Failed to initialize crypto.");
return (1);
}
memset(pctx->user_pw, 0, pctx->user_pw_len);
@ -1743,7 +1742,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
if (init_crypto(&(pctx->crypto_ctx), pw, pw_len, pctx->encrypt_type, NULL,
0, pctx->keylen, 0, ENCRYPT_FLAG) == -1) {
memset(pw, 0, MAX_PW_LEN);
log_msg(LOG_ERR, 0, "Failed to initialize crypto\n");
log_msg(LOG_ERR, 0, "Failed to initialize crypto.");
return (1);
}
memset(pw, 0, MAX_PW_LEN);
@ -1776,7 +1775,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
if (!S_ISREG(sbuf.st_mode)) {
close(uncompfd);
log_msg(LOG_ERR, 0, "File %s is not a regular file.\n", filename);
log_msg(LOG_ERR, 0, "File %s is not a regular file.", filename);
return (1);
}
@ -1786,7 +1785,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
}
} else {
if (setup_archive(pctx, &sbuf) == -1) {
log_msg(LOG_ERR, 0, "Setup archive failed for %s\n", pctx->filename);
log_msg(LOG_ERR, 0, "Setup archive failed for %s", pctx->filename);
return (1);
}
@ -1795,7 +1794,6 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
* the rest of the compression stuff.
*/
uncompfd = pctx->uncompfd;
exit(0);
}
/*
@ -1958,21 +1956,20 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
log_msg(LOG_INFO, 0, "Scaling to %d thread", pctx->nthreads * props.nthreads);
if (pctx->nthreads * props.nthreads > 1) log_msg(LOG_INFO, 0, "s");
nprocs = pctx->nthreads;
log_msg(LOG_INFO, 0, "\n");
dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *));
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan))
cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize);
else
cread_buf = (uchar_t *)slab_alloc(NULL, chunksize);
if (!cread_buf) {
log_msg(LOG_ERR, 0, "3: Out of memory\n");
log_msg(LOG_ERR, 0, "3: Out of memory");
COMP_BAIL;
}
for (i = 0; i < nprocs; i++) {
dary[i] = (struct cmp_data *)slab_alloc(NULL, sizeof (struct cmp_data));
if (!dary[i]) {
log_msg(LOG_ERR, 0, "4: Out of memory\n");
log_msg(LOG_ERR, 0, "4: Out of memory");
COMP_BAIL;
}
tdat = dary[i];
@ -2006,7 +2003,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
if (pctx->encrypt_type) {
if (hmac_init(&tdat->chunk_hmac, pctx->cksum, &(pctx->crypto_ctx)) == -1) {
log_msg(LOG_ERR, 0, "Cannot initialize chunk hmac.\n");
log_msg(LOG_ERR, 0, "Cannot initialize chunk hmac.");
COMP_BAIL;
}
}
@ -2119,7 +2116,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
unsigned int hlen;
if (hmac_init(&hdr_mac, pctx->cksum, &(pctx->crypto_ctx)) == -1) {
log_msg(LOG_ERR, 0, "Cannot initialize header hmac.\n");
log_msg(LOG_ERR, 0, "Cannot initialize header hmac.");
COMP_BAIL;
}
hmac_update(&hdr_mac, cread_buf, pos - cread_buf);
@ -2213,7 +2210,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
tdat->compressed_chunk = tdat->cmp_seg + COMPRESSED_CHUNKSZ +
pctx->cksum_bytes + pctx->mac_bytes;
if (!tdat->cmp_seg || !tdat->uncompressed_chunk) {
log_msg(LOG_ERR, 0, "5: Out of memory\n");
log_msg(LOG_ERR, 0, "5: Out of memory");
COMP_BAIL;
}
}
@ -2295,6 +2292,14 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
}
comp_done:
/*
* First close the input fd of uncompressed data. If archiving this will cause
* the archive thread to exit and cleanup.
*/
if (!pctx->pipe_mode) {
if (uncompfd != -1) close(uncompfd);
}
if (pctx->t_errored) err = pctx->t_errored;
if (thread) {
for (i = 0; i < nprocs; i++) {
@ -2317,9 +2322,9 @@ comp_done:
rm_fname(tmpfile1);
}
if (filename)
log_msg(LOG_ERR, 0, "Error compressing file: %s\n", filename);
log_msg(LOG_ERR, 0, "Error compressing file: %s", filename);
else
log_msg(LOG_ERR, 0, "Error compressing\n");
log_msg(LOG_ERR, 0, "Error compressing");
} else {
/*
* Write a trailer of zero chunk length.
@ -2376,9 +2381,9 @@ comp_done:
slab_free(NULL, cread_buf);
if (!pctx->pipe_mode) {
if (compfd != -1) close(compfd);
if (uncompfd != -1) close(uncompfd);
}
if (pctx->archive_mode) pthread_join(pctx->archive_thread, NULL);
if (!pctx->hide_cmp_stats) show_compression_stats(pctx);
pctx->_stats_func(!pctx->hide_cmp_stats);
@ -2586,7 +2591,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
pctx->do_compress = 1;
pctx->algo = optarg;
if (init_algo(pctx, pctx->algo, 1) != 0) {
log_msg(LOG_ERR, 0, "Invalid algorithm %s\n", optarg);
log_msg(LOG_ERR, 0, "Invalid algorithm %s", optarg);
return (1);
}
break;
@ -2594,21 +2599,21 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
case 's':
ovr = parse_numeric(&chunksize, optarg);
if (ovr == 1) {
log_msg(LOG_ERR, 0, "Chunk size too large %s\n", optarg);
log_msg(LOG_ERR, 0, "Chunk size too large %s", optarg);
return (1);
} else if (ovr == 2) {
log_msg(LOG_ERR, 0, "Invalid number %s\n", optarg);
log_msg(LOG_ERR, 0, "Invalid number %s", optarg);
return (1);
}
pctx->chunksize = chunksize;
if (pctx->chunksize < MIN_CHUNK) {
log_msg(LOG_ERR, 0, "Minimum chunk size is %ld\n", MIN_CHUNK);
log_msg(LOG_ERR, 0, "Minimum chunk size is %ld", MIN_CHUNK);
return (1);
}
if (pctx->chunksize > EIGHTY_PCT(get_total_ram())) {
log_msg(LOG_ERR, 0, "Chunk size must not exceed 80%% of total RAM.\n");
log_msg(LOG_ERR, 0, "Chunk size must not exceed 80%% of total RAM.");
return (1);
}
break;
@ -2616,7 +2621,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
case 'l':
pctx->level = atoi(optarg);
if (pctx->level < 0 || pctx->level > MAX_LEVEL) {
log_msg(LOG_ERR, 0, "Compression level should be in range 0 - 14\n");
log_msg(LOG_ERR, 0, "Compression level should be in range 0 - 14");
return (1);
}
break;
@ -2624,7 +2629,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
case 'B':
pctx->rab_blk_size = atoi(optarg);
if (pctx->rab_blk_size < 0 || pctx->rab_blk_size > 5) {
log_msg(LOG_ERR, 0, "Average Dedupe block size must be in range 0 (2k), 1 (4k) .. 5 (64k)\n");
log_msg(LOG_ERR, 0, "Average Dedupe block size must be in range 0 (2k), 1 (4k) .. 5 (64k)");
return (1);
}
break;
@ -2636,7 +2641,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
case 't':
pctx->nthreads = atoi(optarg);
if (pctx->nthreads < 1 || pctx->nthreads > 256) {
log_msg(LOG_ERR, 0, "Thread count should be in range 1 - 256\n");
log_msg(LOG_ERR, 0, "Thread count should be in range 1 - 256");
return (1);
}
break;
@ -2668,7 +2673,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
case 'e':
pctx->encrypt_type = get_crypto_alg(optarg);
if (pctx->encrypt_type == 0) {
log_msg(LOG_ERR, 0, "Invalid encryption algorithm. Should be AES or SALSA20.\n", optarg);
log_msg(LOG_ERR, 0, "Invalid encryption algorithm. Should be AES or SALSA20.", optarg);
return (1);
}
break;
@ -2693,7 +2698,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
case 'k':
pctx->keylen = atoi(optarg);
if ((pctx->keylen != 16 && pctx->keylen != 32) || pctx->keylen > MAX_KEYLEN) {
log_msg(LOG_ERR, 0, "Encryption KEY length should be 16 or 32.\n", optarg);
log_msg(LOG_ERR, 0, "Encryption KEY length should be 16 or 32.", optarg);
return (1);
}
break;
@ -2701,7 +2706,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
case 'S':
if (get_checksum_props(optarg, &(pctx->cksum), &(pctx->cksum_bytes),
&(pctx->mac_bytes), 0) == -1) {
log_msg(LOG_ERR, 0, "Invalid checksum type %s\n", optarg);
log_msg(LOG_ERR, 0, "Invalid checksum type %s", optarg);
return (1);
}
break;
@ -2740,28 +2745,30 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
*/
num_rem = argc - my_optind;
if (pctx->pipe_mode && num_rem > 0 ) {
log_msg(LOG_ERR, 0, "Filename(s) unexpected for pipe mode\n");
log_msg(LOG_ERR, 0, "Filename(s) unexpected for pipe mode");
return (1);
}
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan) && !pctx->do_compress) {
log_msg(LOG_ERR, 0, "Deduplication is only used during compression.\n");
log_msg(LOG_ERR, 0, "Deduplication is only used during compression.");
return (1);
}
if (!pctx->enable_rabin_scan)
pctx->enable_rabin_split = 0;
if (pctx->enable_fixed_scan && (pctx->enable_rabin_scan || pctx->enable_delta_encode || pctx->enable_rabin_split)) {
log_msg(LOG_ERR, 0, "Rabin Deduplication and Fixed block Deduplication are mutually exclusive\n");
if (pctx->enable_fixed_scan && (pctx->enable_rabin_scan ||
pctx->enable_delta_encode || pctx->enable_rabin_split)) {
log_msg(LOG_ERR, 0, "Rabin Deduplication and Fixed block Deduplication"
"are mutually exclusive");
return (1);
}
if (!pctx->do_compress && pctx->encrypt_type) {
log_msg(LOG_ERR, 0, "Encryption only makes sense when compressing!\n");
log_msg(LOG_ERR, 0, "Encryption only makes sense when compressing!");
return (1);
} else if (pctx->pipe_mode && pctx->encrypt_type && !pctx->pwd_file) {
log_msg(LOG_ERR, 0, "Pipe mode requires password to be provided in a file.\n");
log_msg(LOG_ERR, 0, "Pipe mode requires password to be provided in a file.");
return (1);
}
@ -2775,12 +2782,12 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
}
if (pctx->enable_rabin_global && pctx->enable_delta_encode) {
log_msg(LOG_ERR, 0, "Global Deduplication does not support Delta Compression.\n");
log_msg(LOG_ERR, 0, "Global Deduplication does not support Delta Compression.");
return (1);
}
if (num_rem == 0 && !pctx->pipe_mode) {
log_msg(LOG_ERR, 0, "Expected at least one filename.\n");
log_msg(LOG_ERR, 0, "Expected at least one filename.");
return (1);
} else if (num_rem == 1 || num_rem == 2) {
@ -2805,7 +2812,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
/* Check if compressed file exists */
if (pctx->to_filename != NULL) {
log_msg(LOG_ERR, 0, "Compressed file %s exists\n", pctx->to_filename);
log_msg(LOG_ERR, 0, "Compressed file %s exists", pctx->to_filename);
free((void *)(pctx->to_filename));
return (1);
}
@ -2818,7 +2825,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
/* Check if compressed file exists */
if (pctx->to_filename != NULL) {
log_msg(LOG_ERR, 0, "Compressed file %s exists\n", pctx->to_filename);
log_msg(LOG_ERR, 0, "Compressed file %s exists", pctx->to_filename);
free((void *)(pctx->to_filename));
return (1);
}
@ -2838,7 +2845,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
my_optind++;
if ((pctx->to_filename = realpath(argv[my_optind], NULL)) != NULL) {
free((void *)(pctx->to_filename));
log_msg(LOG_ERR, 0, "File %s exists\n", argv[my_optind]);
log_msg(LOG_ERR, 0, "File %s exists", argv[my_optind]);
return (1);
}
pctx->to_filename = argv[my_optind];
@ -2846,7 +2853,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
return (1);
}
} else if (num_rem > 2) {
log_msg(LOG_ERR, 0, "Too many filenames.\n");
log_msg(LOG_ERR, 0, "Too many filenames.");
return (1);
}
pctx->main_cancel = 0;
@ -2855,7 +2862,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
get_checksum_props(DEFAULT_CKSUM, &(pctx->cksum), &(pctx->cksum_bytes), &(pctx->mac_bytes), 0);
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan) && pctx->cksum == CKSUM_CRC64) {
log_msg(LOG_ERR, 0, "CRC64 checksum is not suitable for Deduplication.\n");
log_msg(LOG_ERR, 0, "CRC64 checksum is not suitable for Deduplication.");
return (1);
}

View file

@ -201,6 +201,7 @@ typedef struct pc_ctx {
char archive_members_file[MAXPATHLEN];
int archive_members_fd, archive_data_fd;
void *archive_ctx;
pthread_t archive_thread;
int uncompfd, compfd;
unsigned int chunk_num;
uint64_t largest_chunk, smallest_chunk, avg_chunk;

View file

@ -458,6 +458,7 @@ log_msg(log_level_t log_level, int show_errno, const char *format, ...)
written = vsnprintf(msg, 1024, format, args);
va_end(args);
written += snprintf(msg + written, 1024 - written, "\n");
if (written < 1024 && show_errno) {
snprintf(msg + written, 1024 - written, "\nError: %s\n", strerror(err));
}