From 46b11def08a3a6ea1c28df3f0cf70fa81cdc7006 Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Thu, 24 Oct 2013 00:16:04 +0530 Subject: [PATCH] Archiving support using Libarchive: Work in progress changes #3. Make log_msg() add newline by default. --- archive/pc_archive.c | 149 ++++++++++++++++++++++++++++++++++---- pcompress.c | 169 ++++++++++++++++++++++--------------------- pcompress.h | 1 + utils/utils.c | 1 + 4 files changed, 224 insertions(+), 96 deletions(-) diff --git a/archive/pc_archive.c b/archive/pc_archive.c index e754b34..87920b2 100644 --- a/archive/pc_archive.c +++ b/archive/pc_archive.c @@ -33,7 +33,10 @@ #include #include #include +#include +#include #include +#include #include "pc_archive.h" #undef _FEATURES_H @@ -52,6 +55,10 @@ AE_IFIFO Named pipe (fifo) */ #define ARC_ENTRY_OVRHEAD 500 +#define ARC_SCRATCH_BUFF_SIZE (64 *1024) +#define DATA_BLOCK_SIZE (8 * 1024) +#define MMAP_SIZE (1024 * 1024) + static struct arc_list_state { uchar_t *pbuf; uint64_t bufsiz, bufpos, arc_size; @@ -68,7 +75,6 @@ add_pathname(const char *fpath, const struct stat *sb, { short len; uchar_t *buf; - struct hdr ehdr; if (tflag == FTW_DP) return (0); if (tflag == FTW_DNR || tflag == FTW_NS) { @@ -150,6 +156,8 @@ setup_archive(pc_ctx_t *pctx, struct stat *sbuf) pthread_mutex_unlock(&nftw_mutex); lseek(fd, 0, SEEK_SET); free(pbuf); + sbuf->st_uid = geteuid(); + sbuf->st_gid = getegid(); if (pipe(pipefd) == -1) { log_msg(LOG_ERR, 1, "Unable to create archiver pipe.\n"); @@ -174,19 +182,108 @@ setup_archive(pc_ctx_t *pctx, struct stat *sbuf) return (0); } +/* + * Routines to archive members and write the archive to pipe. Most of the following + * code is adapted from some of the Libarchive bsdtar code. + */ +static int +copy_file_data(pc_ctx_t *pctx, struct archive *arc, + struct archive *in_arc, struct archive_entry *entry) +{ + size_t sz, offset, len; + ssize_t bytes_to_write; + uchar_t *mapbuf; + int rv, fd; + const char *fpath; + + offset = 0; + rv = 0; + sz = archive_entry_size(entry); + bytes_to_write = sz; + fpath = archive_entry_sourcepath(entry); + fd = open(fpath, O_RDONLY); + if (fd == -1) { + log_msg(LOG_ERR, 1, "Failed to open %s.", fpath); + return (-1); + } + + while (bytes_to_write > 0) { + uchar_t *src; + size_t wlen, w; + ssize_t wrtn; + + if (bytes_to_write < MMAP_SIZE) + len = bytes_to_write; + else + len = MMAP_SIZE; + mapbuf = mmap(NULL, len, PROT_READ, MAP_SHARED, fd, offset); + offset += len; + src = mapbuf; + wlen = len; + + /* + * Write data in blocks. + */ + while (wlen > 0) { + if (wlen < DATA_BLOCK_SIZE) + w = wlen; + else + w = DATA_BLOCK_SIZE; + wrtn = archive_write_data(arc, src, w); + if (wrtn < w) { + /* Write failed; this is bad */ + log_msg(LOG_ERR, 0, "Data write error: %s", archive_error_string(arc)); + rv = -1; + break; + } + bytes_to_write -= wrtn; + wlen -= wrtn; + } + if (rv == -1) break; + munmap(mapbuf, len); + } + close(fd); + + return (rv); +} + +static int +write_entry(pc_ctx_t *pctx, struct archive *arc, struct archive *in_arc, + struct archive_entry *entry) +{ + int rv; + + rv = archive_write_header(arc, entry); + if (rv != ARCHIVE_OK) { + if (rv == ARCHIVE_FATAL || rv == ARCHIVE_FAILED) { + log_msg(LOG_ERR, 0, "%s: %s", + archive_entry_sourcepath(entry), archive_error_string(arc)); + return (-1); + } else { + log_msg(LOG_WARN, 0, "%s: %s", + archive_entry_sourcepath(entry), archive_error_string(arc)); + } + } + + if (archive_entry_size(entry) > 0) { + return (copy_file_data(pctx, arc, in_arc, entry)); + } + + return (0); +} + /* * Thread function. Archive members and write to pipe. The dispatcher thread * reads from the other end and compresses. */ static void * -archiver_thread(void *dat) { +archive_thread_func(void *dat) { pc_ctx_t *pctx = (pc_ctx_t *)dat; char fpath[PATH_MAX], *name; ssize_t rbytes; short namelen; int warn; - struct stat sbuf; - struct archive_entry *entry; + struct archive_entry *entry, *spare_entry, *ent; struct archive *arc, *ard; struct archive_entry_linkresolver *resolver; @@ -208,14 +305,19 @@ archiver_thread(void *dat) { * Read next path entry from list file. */ while ((rbytes = Read(pctx->archive_members_fd, &namelen, sizeof(namelen))) != 0) { - int fd; - - if (rbytes < 2) break; + if (rbytes < 2) { + log_msg(LOG_ERR, 1, "Error reading archive members file."); + break; + } rbytes = Read(pctx->archive_members_fd, fpath, namelen); - if (rbytes < namelen) break; + if (rbytes < namelen) { + log_msg(LOG_ERR, 1, "Error reading archive members file."); + break; + } + fpath[namelen] = '\0'; archive_entry_copy_sourcepath(entry, fpath); - if (archive_read_disk_entry_from_file(ard, entry, 0, NULL) != ARCHIVE_OK) { - log_msg(LOG_WARN, 0, "%s", archive_error_string(ard); + if (archive_read_disk_entry_from_file(ard, entry, -1, NULL) != ARCHIVE_OK) { + log_msg(LOG_WARN, 1, "archive_read_disk_entry_from_file:\n %s", archive_error_string(ard)); archive_entry_clear(entry); continue; } @@ -230,7 +332,7 @@ archiver_thread(void *dat) { warn = 0; } if (name[1] == '.' && name[2] == '.' && (name[3] == '/' || name[3] == '\\')) { - name += 4; /* /.. is removed here and / is removed next. */ + name += 3; /* /.. is removed here and / is removed next. */ } else { name += 1; } @@ -238,16 +340,33 @@ archiver_thread(void *dat) { if (name != archive_entry_pathname(entry)) archive_entry_copy_pathname(entry, name); - if (archive_entry_filetype(entry) != AE_IFREG) + if (archive_entry_filetype(entry) != AE_IFREG) { archive_entry_set_size(entry, 0); - archive_entry_linkify(bsdtar->resolver, &entry, &spare_entry); - archive_entry_write_header(arc, entry); + } + archive_entry_linkify(resolver, &entry, &spare_entry); + ent = entry; + while (ent != NULL) { + if (write_entry(pctx, arc, ard, ent) != 0) { + goto done; + } + ent = spare_entry; + spare_entry = NULL; + } archive_entry_clear(entry); } + +done: + archive_entry_free(entry); + archive_entry_linkresolver_free(resolver); + archive_read_free(ard); + archive_write_free(arc); + close(pctx->archive_members_fd); + close(pctx->archive_data_fd); + unlink(pctx->archive_members_file); return (NULL); } int start_archiver(pc_ctx_t *pctx) { - return (0); + return (pthread_create(&(pctx->archive_thread), NULL, archive_thread_func, (void *)pctx)); } diff --git a/pcompress.c b/pcompress.c index 3c64fe4..8ff5318 100644 --- a/pcompress.c +++ b/pcompress.c @@ -171,18 +171,18 @@ usage(pc_ctx_t *pctx) static void show_compression_stats(pc_ctx_t *pctx) { - log_msg(LOG_INFO, 0, "\nCompression Statistics\n"); - log_msg(LOG_INFO, 0, "======================\n"); - log_msg(LOG_INFO, 0, "Total chunks : %u\n", pctx->chunk_num); + log_msg(LOG_INFO, 0, "\nCompression Statistics"); + log_msg(LOG_INFO, 0, "======================"); + log_msg(LOG_INFO, 0, "Total chunks : %u", pctx->chunk_num); if (pctx->chunk_num == 0) { - log_msg(LOG_INFO, 0, "No statistics to display.\n"); + log_msg(LOG_INFO, 0, "No statistics to display."); } else { - log_msg(LOG_INFO, 0, "Best compressed chunk : %s(%.2f%%)\n", + log_msg(LOG_INFO, 0, "Best compressed chunk : %s(%.2f%%)", bytes_to_size(pctx->smallest_chunk), (double)pctx->smallest_chunk/(double)pctx->chunksize*100); - log_msg(LOG_INFO, 0, "Worst compressed chunk : %s(%.2f%%)\n", + log_msg(LOG_INFO, 0, "Worst compressed chunk : %s(%.2f%%)", bytes_to_size(pctx->largest_chunk), (double)pctx->largest_chunk/(double)pctx->chunksize*100); pctx->avg_chunk /= pctx->chunk_num; - log_msg(LOG_INFO, 0, "Avg compressed chunk : %s(%.2f%%)\n\n", + log_msg(LOG_INFO, 0, "Avg compressed chunk : %s(%.2f%%)\n", bytes_to_size(pctx->avg_chunk), (double)pctx->avg_chunk/(double)pctx->chunksize*100); } } @@ -227,7 +227,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t /* * Execution won't come here but just in case ... */ - log_msg(LOG_ERR, 0, "Invalid preprocessing mode\n"); + log_msg(LOG_ERR, 0, "Invalid preprocessing mode"); return (-1); } @@ -315,14 +315,14 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64 result = lzp_decompress((const uchar_t *)src, (uchar_t *)dst, srclen, hashsize, LZP_DEFAULT_LZPMINLEN, 0); if (result < 0) { - log_msg(LOG_ERR, 0, "LZP decompression failed.\n"); + log_msg(LOG_ERR, 0, "LZP decompression failed."); return (-1); } *dstlen = result; } if (!(type & (PREPROC_COMPRESSED | PREPROC_TYPE_DELTA2 | PREPROC_TYPE_LZP)) && type > 0) { - log_msg(LOG_ERR, 0, "Invalid preprocessing flags: %d\n", type); + log_msg(LOG_ERR, 0, "Invalid preprocessing flags: %d", type); return (-1); } return (0); @@ -402,7 +402,7 @@ redo: /* * HMAC verification failure is fatal. */ - log_msg(LOG_ERR, 0, "Chunk %d, HMAC verification failed\n", tdat->id); + log_msg(LOG_ERR, 0, "Chunk %d, HMAC verification failed", tdat->id); pctx->main_cancel = 1; tdat->len_cmp = 0; pctx->t_errored = 1; @@ -410,7 +410,7 @@ redo: return (NULL); } DEBUG_STAT_EN(en = get_wtime_millis()); - DEBUG_STAT_EN(fprintf(stderr, "HMAC Verification speed %.3f MB/s\n", + DEBUG_STAT_EN(fprintf(stderr, "HMAC Verification speed %.3f MB/s", get_mb_s(tdat->rbytes + sizeof (tdat->len_cmp_be), strt, en))); /* @@ -452,7 +452,7 @@ redo: /* * Header CRC32 verification failure is fatal. */ - log_msg(LOG_ERR, 0, "Chunk %d, Header CRC verification failed\n", tdat->id); + log_msg(LOG_ERR, 0, "Chunk %d, Header CRC verification failed", tdat->id); pctx->main_cancel = 1; tdat->len_cmp = 0; pctx->t_errored = 1; @@ -500,7 +500,7 @@ redo: } if (rv == -1) { tdat->len_cmp = 0; - log_msg(LOG_ERR, 0, "ERROR: Chunk %d, decompression failed.\n", tdat->id); + log_msg(LOG_ERR, 0, "ERROR: Chunk %d, decompression failed.", tdat->id); pctx->t_errored = 1; goto cont; } @@ -550,7 +550,7 @@ redo: if (rv == -1) { tdat->len_cmp = 0; - log_msg(LOG_ERR, 0, "ERROR: Chunk %d, decompression failed.\n", tdat->id); + log_msg(LOG_ERR, 0, "ERROR: Chunk %d, decompression failed.", tdat->id); pctx->t_errored = 1; goto cont; } @@ -564,7 +564,7 @@ redo: rctx->cbuf = tdat->compressed_chunk; dedupe_decompress(rctx, tdat->uncompressed_chunk, &(tdat->len_cmp)); if (!rctx->valid) { - log_msg(LOG_ERR, 0, "ERROR: Chunk %d, dedup recovery failed.\n", tdat->id); + log_msg(LOG_ERR, 0, "ERROR: Chunk %d, dedup recovery failed.", tdat->id); rv = -1; tdat->len_cmp = 0; pctx->t_errored = 1; @@ -596,7 +596,7 @@ redo: compute_checksum(checksum, pctx->cksum, tdat->uncompressed_chunk, _chunksize, tdat->cksum_mt, 1); if (memcmp(checksum, tdat->checksum, pctx->cksum_bytes) != 0) { tdat->len_cmp = 0; - log_msg(LOG_ERR, 0, "ERROR: Chunk %d, checksums do not match.\n", tdat->id); + log_msg(LOG_ERR, 0, "ERROR: Chunk %d, checksums do not match.", tdat->id); pctx->t_errored = 1; } } @@ -717,9 +717,9 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) } if (init_algo(pctx, algorithm, 0) != 0) { if (pctx->pipe_mode || filename == NULL) - log_msg(LOG_ERR, 0, "Input stream is not pcompressed.\n"); + log_msg(LOG_ERR, 0, "Input stream is not pcompressed."); else - log_msg(LOG_ERR, 0, "%s is not a pcompressed file.\n", filename); + log_msg(LOG_ERR, 0, "%s is not a pcompressed file.", filename); UNCOMP_BAIL; } pctx->algo = algorithm; @@ -741,23 +741,23 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) * Check for ridiculous values (malicious tampering or otherwise). */ if (version > VERSION) { - log_msg(LOG_ERR, 0, "Cannot handle newer archive version %d, capability %d\n", + log_msg(LOG_ERR, 0, "Cannot handle newer archive version %d, capability %d", version, VERSION); err = 1; goto uncomp_done; } if (chunksize > EIGHTY_PCT(get_total_ram())) { - log_msg(LOG_ERR, 0, "Chunk size must not exceed 80%% of total RAM.\n"); + log_msg(LOG_ERR, 0, "Chunk size must not exceed 80%% of total RAM."); err = 1; goto uncomp_done; } if (level > MAX_LEVEL || level < 0) { - log_msg(LOG_ERR, 0, "Invalid compression level in header: %d\n", level); + log_msg(LOG_ERR, 0, "Invalid compression level in header: %d", level); err = 1; goto uncomp_done; } if (version < VERSION-3) { - log_msg(LOG_ERR, 0, "Unsupported version: %d\n", version); + log_msg(LOG_ERR, 0, "Unsupported version: %d", version); err = 1; goto uncomp_done; } @@ -780,14 +780,14 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) if (flags & FLAG_DEDUP_FIXED) { if (version > 7) { if (pctx->pipe_mode) { - log_msg(LOG_ERR, 0, "Global Deduplication is not supported with pipe mode.\n"); + log_msg(LOG_ERR, 0, "Global Deduplication is not supported with pipe mode."); err = 1; goto uncomp_done; } pctx->enable_rabin_global = 1; dedupe_flag = RABIN_DEDUPE_FILE_GLOBAL; } else { - log_msg(LOG_ERR, 0, "Invalid file deduplication flags.\n"); + log_msg(LOG_ERR, 0, "Invalid file deduplication flags."); err = 1; goto uncomp_done; } @@ -812,7 +812,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) if (pctx->cksum == CKSUM_BLAKE512) pctx->cksum = CKSUM_SKEIN512; } if (get_checksum_props(NULL, &(pctx->cksum), &(pctx->cksum_bytes), &(pctx->mac_bytes), 1) == -1) { - log_msg(LOG_ERR, 0, "Invalid checksum algorithm code: %d. File corrupt ?\n", pctx->cksum); + log_msg(LOG_ERR, 0, "Invalid checksum algorithm code: %d. File corrupt ?", pctx->cksum); UNCOMP_BAIL; } @@ -854,7 +854,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) } else if (pctx->encrypt_type == CRYPTO_ALG_SALSA20) { noncelen = XSALSA20_CRYPTO_NONCEBYTES; } else { - log_msg(LOG_ERR, 0, "Invalid Encryption algorithm code: %d. File corrupt ?\n", + log_msg(LOG_ERR, 0, "Invalid Encryption algorithm code: %d. File corrupt ?", pctx->encrypt_type); UNCOMP_BAIL; } @@ -918,7 +918,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) free(salt2); memset(salt1, 0, saltlen); free(salt1); - log_msg(LOG_ERR, 0, "Failed to get password.\n"); + log_msg(LOG_ERR, 0, "Failed to get password."); UNCOMP_BAIL; } } else if (!pctx->user_pw) { @@ -956,7 +956,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) memset(salt1, 0, saltlen); free(salt1); close(uncompfd); unlink(to_filename); - log_msg(LOG_ERR, 0, "Failed to get password.\n"); + log_msg(LOG_ERR, 0, "Failed to get password."); UNCOMP_BAIL; } close(fd); @@ -971,7 +971,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) free(salt1); memset(pctx->user_pw, 0, pctx->user_pw_len); close(uncompfd); unlink(to_filename); - log_msg(LOG_ERR, 0, "Failed to initialize crypto\n"); + log_msg(LOG_ERR, 0, "Failed to initialize crypto"); UNCOMP_BAIL; } memset(pctx->user_pw, 0, pctx->user_pw_len); @@ -986,7 +986,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) free(salt1); memset(pw, 0, MAX_PW_LEN); close(uncompfd); unlink(to_filename); - log_msg(LOG_ERR, 0, "Failed to initialize crypto\n"); + log_msg(LOG_ERR, 0, "Failed to initialize crypto"); UNCOMP_BAIL; } memset(pw, 0, MAX_PW_LEN); @@ -1000,7 +1000,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) */ if (hmac_init(&hdr_mac, pctx->cksum, &(pctx->crypto_ctx)) == -1) { close(uncompfd); unlink(to_filename); - log_msg(LOG_ERR, 0, "Cannot initialize header hmac.\n"); + log_msg(LOG_ERR, 0, "Cannot initialize header hmac."); UNCOMP_BAIL; } hmac_update(&hdr_mac, (uchar_t *)pctx->algo, ALGO_SZ); @@ -1027,7 +1027,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) memset(n1, 0, noncelen); if (memcmp(hdr_hash2, hdr_hash1, pctx->mac_bytes) != 0) { close(uncompfd); unlink(to_filename); - log_msg(LOG_ERR, 0, "Header verification failed! File tampered or wrong password.\n"); + log_msg(LOG_ERR, 0, "Header verification failed! File tampered or wrong password."); UNCOMP_BAIL; } } else if (version >= 5) { @@ -1057,7 +1057,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) crc2 = lzma_crc32((uchar_t *)&d2, sizeof (level), crc2); if (crc1 != crc2) { close(uncompfd); unlink(to_filename); - log_msg(LOG_ERR, 0, "Header verification failed! File tampered or wrong password.\n"); + log_msg(LOG_ERR, 0, "Header verification failed! File tampered or wrong password."); UNCOMP_BAIL; } } @@ -1073,7 +1073,6 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) pctx->nthreads = 1; log_msg(LOG_INFO, 0, "Scaling to %d thread", pctx->nthreads * props.nthreads); if (pctx->nthreads * props.nthreads > 1) log_msg(LOG_INFO, 0, "s"); - log_msg(LOG_INFO, 0, "\n"); nprocs = pctx->nthreads; slab_cache_add(compressed_chunksize); slab_cache_add(chunksize); @@ -1083,7 +1082,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) for (i = 0; i < nprocs; i++) { dary[i] = (struct cmp_data *)slab_alloc(NULL, sizeof (struct cmp_data)); if (!dary[i]) { - log_msg(LOG_ERR, 0, "1: Out of memory\n"); + log_msg(LOG_ERR, 0, "1: Out of memory"); UNCOMP_BAIL; } tdat = dary[i]; @@ -1137,7 +1136,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) if (pctx->encrypt_type) { if (hmac_init(&tdat->chunk_hmac, pctx->cksum, &(pctx->crypto_ctx)) == -1) { - log_msg(LOG_ERR, 0, "Cannot initialize chunk hmac.\n"); + log_msg(LOG_ERR, 0, "Cannot initialize chunk hmac."); UNCOMP_BAIL; } } @@ -1202,7 +1201,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) if (rb < 0) log_msg(LOG_ERR, 1, "Read: "); else log_msg(LOG_ERR, 0, "Incomplete chunk %d header," - "file corrupt\n", pctx->chunk_num); + "file corrupt", pctx->chunk_num); UNCOMP_BAIL; } tdat->len_cmp_be = tdat->len_cmp; // Needed for HMAC @@ -1212,7 +1211,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) * Check for ridiculous length. */ if (tdat->len_cmp > chunksize + 256) { - log_msg(LOG_ERR, 0, "Compressed length too big for chunk: %d\n", + log_msg(LOG_ERR, 0, "Compressed length too big for chunk: %d", pctx->chunk_num); UNCOMP_BAIL; } @@ -1242,7 +1241,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize); if (!tdat->compressed_chunk || !tdat->uncompressed_chunk) { - log_msg(LOG_ERR, 0, "2: Out of memory\n"); + log_msg(LOG_ERR, 0, "2: Out of memory"); UNCOMP_BAIL; } tdat->cmp_seg = tdat->uncompressed_chunk; @@ -1265,7 +1264,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename) log_msg(LOG_ERR, 1, "Read: "); UNCOMP_BAIL; } else { - log_msg(LOG_ERR, 0, "Incomplete chunk %d, file corrupt.\n", + log_msg(LOG_ERR, 0, "Incomplete chunk %d, file corrupt.", pctx->chunk_num); UNCOMP_BAIL; } @@ -1694,7 +1693,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev pw_len = get_pw_string(pw, "Please enter encryption password", 1); if (pw_len == -1) { - log_msg(LOG_ERR, 0, "Failed to get password.\n"); + log_msg(LOG_ERR, 0, "Failed to get password."); return (1); } } else if (!pctx->user_pw) { @@ -1724,7 +1723,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev } } if (pw_len == -1) { - log_msg(LOG_ERR, 1, "Failed to get password.\n"); + log_msg(LOG_ERR, 1, "Failed to get password."); return (1); } close(fd); @@ -1733,7 +1732,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev if (init_crypto(&(pctx->crypto_ctx), pctx->user_pw, pctx->user_pw_len, pctx->encrypt_type, NULL, 0, pctx->keylen, 0, ENCRYPT_FLAG) == -1) { memset(pctx->user_pw, 0, pctx->user_pw_len); - log_msg(LOG_ERR, 0, "Failed to initialize crypto\n"); + log_msg(LOG_ERR, 0, "Failed to initialize crypto."); return (1); } memset(pctx->user_pw, 0, pctx->user_pw_len); @@ -1743,7 +1742,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev if (init_crypto(&(pctx->crypto_ctx), pw, pw_len, pctx->encrypt_type, NULL, 0, pctx->keylen, 0, ENCRYPT_FLAG) == -1) { memset(pw, 0, MAX_PW_LEN); - log_msg(LOG_ERR, 0, "Failed to initialize crypto\n"); + log_msg(LOG_ERR, 0, "Failed to initialize crypto."); return (1); } memset(pw, 0, MAX_PW_LEN); @@ -1776,7 +1775,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev if (!S_ISREG(sbuf.st_mode)) { close(uncompfd); - log_msg(LOG_ERR, 0, "File %s is not a regular file.\n", filename); + log_msg(LOG_ERR, 0, "File %s is not a regular file.", filename); return (1); } @@ -1786,7 +1785,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev } } else { if (setup_archive(pctx, &sbuf) == -1) { - log_msg(LOG_ERR, 0, "Setup archive failed for %s\n", pctx->filename); + log_msg(LOG_ERR, 0, "Setup archive failed for %s", pctx->filename); return (1); } @@ -1795,7 +1794,6 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev * the rest of the compression stuff. */ uncompfd = pctx->uncompfd; - exit(0); } /* @@ -1958,21 +1956,20 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev log_msg(LOG_INFO, 0, "Scaling to %d thread", pctx->nthreads * props.nthreads); if (pctx->nthreads * props.nthreads > 1) log_msg(LOG_INFO, 0, "s"); nprocs = pctx->nthreads; - log_msg(LOG_INFO, 0, "\n"); dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *)); if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize); else cread_buf = (uchar_t *)slab_alloc(NULL, chunksize); if (!cread_buf) { - log_msg(LOG_ERR, 0, "3: Out of memory\n"); + log_msg(LOG_ERR, 0, "3: Out of memory"); COMP_BAIL; } for (i = 0; i < nprocs; i++) { dary[i] = (struct cmp_data *)slab_alloc(NULL, sizeof (struct cmp_data)); if (!dary[i]) { - log_msg(LOG_ERR, 0, "4: Out of memory\n"); + log_msg(LOG_ERR, 0, "4: Out of memory"); COMP_BAIL; } tdat = dary[i]; @@ -2006,7 +2003,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev if (pctx->encrypt_type) { if (hmac_init(&tdat->chunk_hmac, pctx->cksum, &(pctx->crypto_ctx)) == -1) { - log_msg(LOG_ERR, 0, "Cannot initialize chunk hmac.\n"); + log_msg(LOG_ERR, 0, "Cannot initialize chunk hmac."); COMP_BAIL; } } @@ -2119,7 +2116,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev unsigned int hlen; if (hmac_init(&hdr_mac, pctx->cksum, &(pctx->crypto_ctx)) == -1) { - log_msg(LOG_ERR, 0, "Cannot initialize header hmac.\n"); + log_msg(LOG_ERR, 0, "Cannot initialize header hmac."); COMP_BAIL; } hmac_update(&hdr_mac, cread_buf, pos - cread_buf); @@ -2213,7 +2210,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev tdat->compressed_chunk = tdat->cmp_seg + COMPRESSED_CHUNKSZ + pctx->cksum_bytes + pctx->mac_bytes; if (!tdat->cmp_seg || !tdat->uncompressed_chunk) { - log_msg(LOG_ERR, 0, "5: Out of memory\n"); + log_msg(LOG_ERR, 0, "5: Out of memory"); COMP_BAIL; } } @@ -2295,6 +2292,14 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev } comp_done: + /* + * First close the input fd of uncompressed data. If archiving this will cause + * the archive thread to exit and cleanup. + */ + if (!pctx->pipe_mode) { + if (uncompfd != -1) close(uncompfd); + } + if (pctx->t_errored) err = pctx->t_errored; if (thread) { for (i = 0; i < nprocs; i++) { @@ -2317,9 +2322,9 @@ comp_done: rm_fname(tmpfile1); } if (filename) - log_msg(LOG_ERR, 0, "Error compressing file: %s\n", filename); + log_msg(LOG_ERR, 0, "Error compressing file: %s", filename); else - log_msg(LOG_ERR, 0, "Error compressing\n"); + log_msg(LOG_ERR, 0, "Error compressing"); } else { /* * Write a trailer of zero chunk length. @@ -2376,9 +2381,9 @@ comp_done: slab_free(NULL, cread_buf); if (!pctx->pipe_mode) { if (compfd != -1) close(compfd); - if (uncompfd != -1) close(uncompfd); } + if (pctx->archive_mode) pthread_join(pctx->archive_thread, NULL); if (!pctx->hide_cmp_stats) show_compression_stats(pctx); pctx->_stats_func(!pctx->hide_cmp_stats); @@ -2586,7 +2591,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) pctx->do_compress = 1; pctx->algo = optarg; if (init_algo(pctx, pctx->algo, 1) != 0) { - log_msg(LOG_ERR, 0, "Invalid algorithm %s\n", optarg); + log_msg(LOG_ERR, 0, "Invalid algorithm %s", optarg); return (1); } break; @@ -2594,21 +2599,21 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) case 's': ovr = parse_numeric(&chunksize, optarg); if (ovr == 1) { - log_msg(LOG_ERR, 0, "Chunk size too large %s\n", optarg); + log_msg(LOG_ERR, 0, "Chunk size too large %s", optarg); return (1); } else if (ovr == 2) { - log_msg(LOG_ERR, 0, "Invalid number %s\n", optarg); + log_msg(LOG_ERR, 0, "Invalid number %s", optarg); return (1); } pctx->chunksize = chunksize; if (pctx->chunksize < MIN_CHUNK) { - log_msg(LOG_ERR, 0, "Minimum chunk size is %ld\n", MIN_CHUNK); + log_msg(LOG_ERR, 0, "Minimum chunk size is %ld", MIN_CHUNK); return (1); } if (pctx->chunksize > EIGHTY_PCT(get_total_ram())) { - log_msg(LOG_ERR, 0, "Chunk size must not exceed 80%% of total RAM.\n"); + log_msg(LOG_ERR, 0, "Chunk size must not exceed 80%% of total RAM."); return (1); } break; @@ -2616,7 +2621,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) case 'l': pctx->level = atoi(optarg); if (pctx->level < 0 || pctx->level > MAX_LEVEL) { - log_msg(LOG_ERR, 0, "Compression level should be in range 0 - 14\n"); + log_msg(LOG_ERR, 0, "Compression level should be in range 0 - 14"); return (1); } break; @@ -2624,7 +2629,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) case 'B': pctx->rab_blk_size = atoi(optarg); if (pctx->rab_blk_size < 0 || pctx->rab_blk_size > 5) { - log_msg(LOG_ERR, 0, "Average Dedupe block size must be in range 0 (2k), 1 (4k) .. 5 (64k)\n"); + log_msg(LOG_ERR, 0, "Average Dedupe block size must be in range 0 (2k), 1 (4k) .. 5 (64k)"); return (1); } break; @@ -2636,7 +2641,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) case 't': pctx->nthreads = atoi(optarg); if (pctx->nthreads < 1 || pctx->nthreads > 256) { - log_msg(LOG_ERR, 0, "Thread count should be in range 1 - 256\n"); + log_msg(LOG_ERR, 0, "Thread count should be in range 1 - 256"); return (1); } break; @@ -2668,7 +2673,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) case 'e': pctx->encrypt_type = get_crypto_alg(optarg); if (pctx->encrypt_type == 0) { - log_msg(LOG_ERR, 0, "Invalid encryption algorithm. Should be AES or SALSA20.\n", optarg); + log_msg(LOG_ERR, 0, "Invalid encryption algorithm. Should be AES or SALSA20.", optarg); return (1); } break; @@ -2693,7 +2698,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) case 'k': pctx->keylen = atoi(optarg); if ((pctx->keylen != 16 && pctx->keylen != 32) || pctx->keylen > MAX_KEYLEN) { - log_msg(LOG_ERR, 0, "Encryption KEY length should be 16 or 32.\n", optarg); + log_msg(LOG_ERR, 0, "Encryption KEY length should be 16 or 32.", optarg); return (1); } break; @@ -2701,7 +2706,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) case 'S': if (get_checksum_props(optarg, &(pctx->cksum), &(pctx->cksum_bytes), &(pctx->mac_bytes), 0) == -1) { - log_msg(LOG_ERR, 0, "Invalid checksum type %s\n", optarg); + log_msg(LOG_ERR, 0, "Invalid checksum type %s", optarg); return (1); } break; @@ -2740,28 +2745,30 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) */ num_rem = argc - my_optind; if (pctx->pipe_mode && num_rem > 0 ) { - log_msg(LOG_ERR, 0, "Filename(s) unexpected for pipe mode\n"); + log_msg(LOG_ERR, 0, "Filename(s) unexpected for pipe mode"); return (1); } if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan) && !pctx->do_compress) { - log_msg(LOG_ERR, 0, "Deduplication is only used during compression.\n"); + log_msg(LOG_ERR, 0, "Deduplication is only used during compression."); return (1); } if (!pctx->enable_rabin_scan) pctx->enable_rabin_split = 0; - if (pctx->enable_fixed_scan && (pctx->enable_rabin_scan || pctx->enable_delta_encode || pctx->enable_rabin_split)) { - log_msg(LOG_ERR, 0, "Rabin Deduplication and Fixed block Deduplication are mutually exclusive\n"); + if (pctx->enable_fixed_scan && (pctx->enable_rabin_scan || + pctx->enable_delta_encode || pctx->enable_rabin_split)) { + log_msg(LOG_ERR, 0, "Rabin Deduplication and Fixed block Deduplication" + "are mutually exclusive"); return (1); } if (!pctx->do_compress && pctx->encrypt_type) { - log_msg(LOG_ERR, 0, "Encryption only makes sense when compressing!\n"); + log_msg(LOG_ERR, 0, "Encryption only makes sense when compressing!"); return (1); } else if (pctx->pipe_mode && pctx->encrypt_type && !pctx->pwd_file) { - log_msg(LOG_ERR, 0, "Pipe mode requires password to be provided in a file.\n"); + log_msg(LOG_ERR, 0, "Pipe mode requires password to be provided in a file."); return (1); } @@ -2775,12 +2782,12 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) } if (pctx->enable_rabin_global && pctx->enable_delta_encode) { - log_msg(LOG_ERR, 0, "Global Deduplication does not support Delta Compression.\n"); + log_msg(LOG_ERR, 0, "Global Deduplication does not support Delta Compression."); return (1); } if (num_rem == 0 && !pctx->pipe_mode) { - log_msg(LOG_ERR, 0, "Expected at least one filename.\n"); + log_msg(LOG_ERR, 0, "Expected at least one filename."); return (1); } else if (num_rem == 1 || num_rem == 2) { @@ -2805,7 +2812,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) /* Check if compressed file exists */ if (pctx->to_filename != NULL) { - log_msg(LOG_ERR, 0, "Compressed file %s exists\n", pctx->to_filename); + log_msg(LOG_ERR, 0, "Compressed file %s exists", pctx->to_filename); free((void *)(pctx->to_filename)); return (1); } @@ -2818,7 +2825,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) /* Check if compressed file exists */ if (pctx->to_filename != NULL) { - log_msg(LOG_ERR, 0, "Compressed file %s exists\n", pctx->to_filename); + log_msg(LOG_ERR, 0, "Compressed file %s exists", pctx->to_filename); free((void *)(pctx->to_filename)); return (1); } @@ -2838,7 +2845,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) my_optind++; if ((pctx->to_filename = realpath(argv[my_optind], NULL)) != NULL) { free((void *)(pctx->to_filename)); - log_msg(LOG_ERR, 0, "File %s exists\n", argv[my_optind]); + log_msg(LOG_ERR, 0, "File %s exists", argv[my_optind]); return (1); } pctx->to_filename = argv[my_optind]; @@ -2846,7 +2853,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) return (1); } } else if (num_rem > 2) { - log_msg(LOG_ERR, 0, "Too many filenames.\n"); + log_msg(LOG_ERR, 0, "Too many filenames."); return (1); } pctx->main_cancel = 0; @@ -2855,7 +2862,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) get_checksum_props(DEFAULT_CKSUM, &(pctx->cksum), &(pctx->cksum_bytes), &(pctx->mac_bytes), 0); if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan) && pctx->cksum == CKSUM_CRC64) { - log_msg(LOG_ERR, 0, "CRC64 checksum is not suitable for Deduplication.\n"); + log_msg(LOG_ERR, 0, "CRC64 checksum is not suitable for Deduplication."); return (1); } diff --git a/pcompress.h b/pcompress.h index 1d6235f..2f082b9 100644 --- a/pcompress.h +++ b/pcompress.h @@ -201,6 +201,7 @@ typedef struct pc_ctx { char archive_members_file[MAXPATHLEN]; int archive_members_fd, archive_data_fd; void *archive_ctx; + pthread_t archive_thread; int uncompfd, compfd; unsigned int chunk_num; uint64_t largest_chunk, smallest_chunk, avg_chunk; diff --git a/utils/utils.c b/utils/utils.c index c6be5d4..3dce495 100644 --- a/utils/utils.c +++ b/utils/utils.c @@ -458,6 +458,7 @@ log_msg(log_level_t log_level, int show_errno, const char *format, ...) written = vsnprintf(msg, 1024, format, args); va_end(args); + written += snprintf(msg + written, 1024 - written, "\n"); if (written < 1024 && show_errno) { snprintf(msg + written, 1024 - written, "\nError: %s\n", strerror(err)); }