diff --git a/Makefile.in b/Makefile.in index 8a71942..d48e687 100644 --- a/Makefile.in +++ b/Makefile.in @@ -32,10 +32,10 @@ LIBVER=1 MAINSRCS = utils/utils.c allocator.c lzma_compress.c ppmd_compress.c \ adaptive_compress.c lzfx_compress.c lz4_compress.c none_compress.c \ utils/xxhash_base.c utils/heap.c utils/cpuid.c filters/analyzer/analyzer.c \ - pcompress.c + meta_stream.c pcompress.c MAINHDRS = allocator.h pcompress.h utils/utils.h utils/xxhash.h utils/heap.h \ utils/cpuid.h utils/xxhash.h archive/pc_archive.h filters/dispack/dis.hpp \ - filters/analyzer/analyzer.h + meta_stream.h filters/analyzer/analyzer.h MAINOBJS = $(MAINSRCS:.c=.o) PROGSRCS = main.c diff --git a/README.md b/README.md index cf1a162..f8f93f7 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,10 @@ Standard Usage The fastest checksum is the BLAKE2 family. + -T + Disable Metadata Streams. Pathname metadata is normally packed into separate + chunks distinct from file data. With this option this behavior is disabled. + Pathname of the resulting archive. A '.pz' extension is automatically added if not already present. This can also be specified as '-' in order to send diff --git a/archive/libarchive/libarchive/archive.h b/archive/libarchive/libarchive/archive.h index 7c5adef..9c99d63 100644 --- a/archive/libarchive/libarchive/archive.h +++ b/archive/libarchive/libarchive/archive.h @@ -518,6 +518,12 @@ __LA_DECL int archive_read_set_options(struct archive *_a, */ __LA_DECL int archive_request_is_metadata(struct archive *a); +/* + * Indicate whether separate metadata handling is being done by the + * callbacks. This triggers special behavior during archive read. + */ +__LA_DECL int archive_set_metadata_streaming(struct archive *a, int flag); + /*- * Convenience function to recreate the current entry (whose header * has just been read) on disk. diff --git a/archive/libarchive/libarchive/archive_private.h b/archive/libarchive/libarchive/archive_private.h index 720de77..614f47f 100644 --- a/archive/libarchive/libarchive/archive_private.h +++ b/archive/libarchive/libarchive/archive_private.h @@ -127,6 +127,15 @@ struct archive { * separately store/handle metadata and data. */ int cb_is_metadata; + + /* + * Set the metadata handling flag. This indicates to libarchive + * that callback routines are processing metadata as a separate + * stream. This means that cb_is_metadata flag is set during + * read/write. In addition, archive reads are handled differently + * using a shadow filter structure with separate copy buffer. + */ + int is_metadata_streaming; }; /* Check magic value and state; return(ARCHIVE_FATAL) if it isn't valid. */ diff --git a/archive/libarchive/libarchive/archive_read.c b/archive/libarchive/libarchive/archive_read.c index 048c316..9fdbd1a 100644 --- a/archive/libarchive/libarchive/archive_read.c +++ b/archive/libarchive/libarchive/archive_read.c @@ -453,7 +453,7 @@ int archive_read_open1(struct archive *_a) { struct archive_read *a = (struct archive_read *)_a; - struct archive_read_filter *filter, *tmp; + struct archive_read_filter *filter, *tmp = NULL; /* silence compiler */ int slot, e; unsigned int i; @@ -461,6 +461,8 @@ archive_read_open1(struct archive *_a) "archive_read_open"); archive_clear_error(&a->archive); + if (_a->is_metadata_streaming) + _a->cb_is_metadata = 1; if (a->client.reader == NULL) { archive_set_error(&a->archive, EINVAL, "No reader function provided to archive_read_open"); @@ -485,6 +487,13 @@ archive_read_open1(struct archive *_a) filter = calloc(1, sizeof(*filter)); if (filter == NULL) return (ARCHIVE_FATAL); + if (_a->is_metadata_streaming) { + tmp = calloc(1, sizeof(*filter)); + if (tmp == NULL) { + free(filter); + return (ARCHIVE_FATAL); + } + } filter->bidder = NULL; filter->upstream = NULL; filter->archive = a; @@ -497,6 +506,10 @@ archive_read_open1(struct archive *_a) filter->sswitch = client_switch_proxy; filter->name = "none"; filter->code = ARCHIVE_FILTER_NONE; + if (_a->is_metadata_streaming) { + memcpy(tmp, filter, sizeof (*filter)); + filter->shadow = tmp; + } a->client.dataset[0].begin_position = 0; if (!a->filter || !a->bypass_filter_bidding) @@ -533,6 +546,8 @@ archive_read_open1(struct archive *_a) /* Ensure libarchive starts from the first node in a multivolume set */ client_switch_proxy(a->filter, 0); + if (_a->is_metadata_streaming) + _a->cb_is_metadata = 0; return (e); } @@ -546,7 +561,7 @@ choose_filters(struct archive_read *a) { int number_bidders, i, bid, best_bid; struct archive_read_filter_bidder *bidder, *best_bidder; - struct archive_read_filter *filter; + struct archive_read_filter *filter, *tmp; ssize_t avail; int r; @@ -585,9 +600,20 @@ choose_filters(struct archive_read *a) = (struct archive_read_filter *)calloc(1, sizeof(*filter)); if (filter == NULL) return (ARCHIVE_FATAL); + if (a->archive.is_metadata_streaming) { + tmp = calloc(1, sizeof(*filter)); + if (tmp == NULL) { + free(filter); + return (ARCHIVE_FATAL); + } + } filter->bidder = best_bidder; filter->archive = a; filter->upstream = a->filter; + if (a->archive.is_metadata_streaming) { + memcpy(tmp, filter, sizeof (*filter)); + filter->shadow = tmp; + } a->filter = filter; r = (best_bidder->init)(a->filter); if (r != ARCHIVE_OK) { @@ -933,6 +959,8 @@ __archive_read_free_filters(struct archive_read *a) { while (a->filter != NULL) { struct archive_read_filter *t = a->filter->upstream; + if (a->archive.is_metadata_streaming) + free(a->filter->shadow); free(a->filter); a->filter = t; } @@ -1221,6 +1249,15 @@ __archive_read_filter_ahead(struct archive_read_filter *filter, return (NULL); } + /* + * Switch to shadow filter for metadata reads when using separate + * metadata stream. + */ + if (filter->archive->archive.is_metadata_streaming && + filter->archive->archive.cb_is_metadata) { + filter = filter->shadow; + } + /* * Keep pulling more data until we can satisfy the request. */ @@ -1397,6 +1434,15 @@ __archive_read_filter_consume(struct archive_read_filter * filter, if (request == 0) return 0; + /* + * Switch to shadow filter for metadata reads when using separate + * metadata stream. + */ + if (filter->archive->archive.is_metadata_streaming && + filter->archive->archive.cb_is_metadata) { + filter = filter->shadow; + } + skipped = advance_file_pointer(filter, request); if (skipped == request) return (skipped); @@ -1522,6 +1568,13 @@ __archive_read_filter_seek(struct archive_read_filter *filter, int64_t offset, if (filter->seek == NULL) return (ARCHIVE_FAILED); + /* + * No seeking when metadata streams are enabled. + */ + if (filter->archive->archive.is_metadata_streaming) { + return (ARCHIVE_FAILED); + } + client = &(filter->archive->client); switch (whence) { case SEEK_CUR: diff --git a/archive/libarchive/libarchive/archive_read_append_filter.c b/archive/libarchive/libarchive/archive_read_append_filter.c index 017d7c6..32fea06 100644 --- a/archive/libarchive/libarchive/archive_read_append_filter.c +++ b/archive/libarchive/libarchive/archive_read_append_filter.c @@ -40,7 +40,7 @@ archive_read_append_filter(struct archive *_a, int code) int r1, r2, number_bidders, i; char str[20]; struct archive_read_filter_bidder *bidder; - struct archive_read_filter *filter; + struct archive_read_filter *filter, *tmp; struct archive_read *a = (struct archive_read *)_a; r1 = r2 = (ARCHIVE_OK); @@ -126,6 +126,18 @@ archive_read_append_filter(struct archive *_a, int code) filter->bidder = bidder; filter->archive = a; filter->upstream = a->filter; + if (_a->is_metadata_streaming) + { + tmp = calloc(1, sizeof(*filter)); + if (tmp == NULL) + { + free(filter); + archive_set_error(&a->archive, ENOMEM, "Out of memory"); + return (ARCHIVE_FATAL); + } + memcpy(tmp, filter, sizeof (*filter)); + filter->shadow = tmp; + } a->filter = filter; r2 = (bidder->init)(a->filter); if (r2 != ARCHIVE_OK) { diff --git a/archive/libarchive/libarchive/archive_read_private.h b/archive/libarchive/libarchive/archive_read_private.h index 8a6c859..be0a890 100644 --- a/archive/libarchive/libarchive/archive_read_private.h +++ b/archive/libarchive/libarchive/archive_read_private.h @@ -114,6 +114,12 @@ struct archive_read_filter { char end_of_file; char closed; char fatal; + + /* + * Shadow structure for keeping track of metadata requests if + * metadata streaming is enabled. + */ + struct archive_read_filter *shadow; }; /* diff --git a/archive/libarchive/libarchive/archive_virtual.c b/archive/libarchive/libarchive/archive_virtual.c index 5ab26b6..7edbf52 100644 --- a/archive/libarchive/libarchive/archive_virtual.c +++ b/archive/libarchive/libarchive/archive_virtual.c @@ -113,9 +113,11 @@ archive_write_header(struct archive *a, struct archive_entry *entry) int rv; ++a->file_count; - a->cb_is_metadata = 1; + if (a->is_metadata_streaming) + a->cb_is_metadata = 1; rv = (a->vtable->archive_write_header)(a, entry); - a->cb_is_metadata = 0; + if (a->is_metadata_streaming) + a->cb_is_metadata = 0; return (rv); } @@ -148,9 +150,11 @@ archive_read_next_header(struct archive *a, struct archive_entry **entry) { int rv; - a->cb_is_metadata = 1; + if (a->is_metadata_streaming) + a->cb_is_metadata = 1; rv = (a->vtable->archive_read_next_header)(a, entry); - a->cb_is_metadata = 0; + if (a->is_metadata_streaming) + a->cb_is_metadata = 0; return (rv); } @@ -159,9 +163,11 @@ archive_read_next_header2(struct archive *a, struct archive_entry *entry) { int rv; - a->cb_is_metadata = 1; + if (a->is_metadata_streaming) + a->cb_is_metadata = 1; rv = (a->vtable->archive_read_next_header2)(a, entry); - a->cb_is_metadata = 0; + if (a->is_metadata_streaming) + a->cb_is_metadata = 0; return (rv); } @@ -177,3 +183,10 @@ archive_request_is_metadata(struct archive *a) { return (a->cb_is_metadata); } + +int +archive_set_metadata_streaming(struct archive *a, int flag) +{ + a->is_metadata_streaming = flag; + return (0); +} diff --git a/archive/pc_archive.c b/archive/pc_archive.c index cd527f4..21cb58a 100644 --- a/archive/pc_archive.c +++ b/archive/pc_archive.c @@ -48,7 +48,8 @@ #include #include #include -#include "pc_archive.h" +#include "archive/pc_archive.h" +#include "meta_stream.h" #undef _FEATURES_H #define _XOPEN_SOURCE 700 @@ -149,6 +150,24 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len return (-1); } + if (archive_request_is_metadata(arc) && pctx->meta_stream) { + int rv; + + /* + * Send the buf pointer over to the metadata thread. + */ + rv = meta_ctx_send(pctx->meta_ctx, &buf, &len); + if (rv == 0) { + archive_set_error(arc, ARCHIVE_EOF, "Metadata Thread communication error."); + return (-1); + + } else if (rv == -1) { + archive_set_error(arc, ARCHIVE_EOF, "Error reported by Metadata Thread."); + return (-1); + } + return (len); + } + if (!pctx->arc_writing) { Sem_Wait(&(pctx->write_sem)); } @@ -178,7 +197,7 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len } else { if (pctx->arc_buf_pos < pctx->min_chunk) { int diff = pctx->min_chunk - (int)(pctx->arc_buf_pos); - if (len > diff) + if (len >= diff) pctx->btype = pctx->ctype; else pctx->ctype = pctx->btype; @@ -282,6 +301,26 @@ extract_read_callback(struct archive *arc, void *ctx, const void **buf) return (-1); } + if (archive_request_is_metadata(arc) && pctx->meta_stream) { + int rv; + size_t len; + + /* + * Send the buf pointer over to the metadata thread. + */ + len = 0; + rv = meta_ctx_send(pctx->meta_ctx, buf, &len); + if (rv == 0) { + archive_set_error(arc, ARCHIVE_EOF, "Metadata Thread communication error."); + return (-1); + + } else if (rv == -1) { + archive_set_error(arc, ARCHIVE_EOF, "Error reported by Metadata Thread."); + return (-1); + } + return (len); + } + if (!pctx->arc_writing) { Sem_Wait(&(pctx->read_sem)); } else { @@ -295,6 +334,7 @@ extract_read_callback(struct archive *arc, void *ctx, const void **buf) archive_set_error(arc, ARCHIVE_EOF, "End of file when extracting archive."); return (-1); } + pctx->arc_writing = 1; *buf = pctx->arc_buf; @@ -817,6 +857,9 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) unlink(tmpfile); return (-1); } + + if (pctx->meta_stream) + archive_set_metadata_streaming(arc, 1); archive_write_set_format_pax_restricted(arc); archive_write_set_bytes_per_block(arc, 0); archive_write_open(arc, pctx, arc_open_callback, @@ -861,6 +904,8 @@ setup_extractor(pc_ctx_t *pctx) close(pipefd[0]); close(pipefd[1]); return (-1); } + if (pctx->meta_stream) + archive_set_metadata_streaming(arc, 1); archive_read_support_format_all(arc); pctx->archive_ctx = arc; pctx->arc_writing = 0; @@ -1169,7 +1214,7 @@ archiver_thread_func(void *dat) { } else { archive_entry_set_size(entry, archive_entry_size(entry)); } - log_msg(LOG_VERBOSE, 0, "%5d/%5d %8" PRIu64 " %s", ctr, pctx->archive_members_count, + log_msg(LOG_VERBOSE, 0, "%5d/%d %8" PRIu64 " %s", ctr, pctx->archive_members_count, archive_entry_size(entry), name); archive_entry_linkify(resolver, &entry, &spare_entry); diff --git a/archive/wavpack_helper.c b/archive/wavpack_helper.c index 81f32fa..4681d89 100644 --- a/archive/wavpack_helper.c +++ b/archive/wavpack_helper.c @@ -608,7 +608,7 @@ wavpack_filter_decode(uchar_t *in_buf, size_t len, uchar_t **out_buf, ssize_t ou } WavpackFreeWrapper(wpc); } else { - log_msg(LOG_ERR, 0, "Wavpack: RIFF wrapper size if zero. File corrupt?"); + log_msg(LOG_ERR, 0, "Wavpack: RIFF wrapper size is zero. File corrupt?"); WavpackCloseFile(wpc); return (0); } diff --git a/bzip2_compress.c b/bzip2_compress.c index 86b4537..3152f00 100644 --- a/bzip2_compress.c +++ b/bzip2_compress.c @@ -105,6 +105,19 @@ bzip2_compress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, char *dst1 = (char *)dst; char *src1 = (char *)src; + /* + * If the data is known to be compressed then certain types less compressed data + * can be attempted to be compressed again for a possible gain. For others it is + * a waste of time. + */ + if (PC_TYPE(btype) == TYPE_COMPRESSED && level < 7) { + int subtype = PC_SUBTYPE(btype); + + if (subtype != TYPE_COMPRESSED_LZW && subtype != TYPE_COMPRESSED_GZ && + subtype != TYPE_COMPRESSED_LZ && subtype != TYPE_COMPRESSED_LZO) { + return (-1); + } + } bzs.bzalloc = slab_alloc_i; bzs.bzfree = slab_free; bzs.opaque = NULL; @@ -174,19 +187,6 @@ bzip2_decompress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, char *dst1 = (char *)dst; char *src1 = (char *)src; - /* - * If the data is known to be compressed then certain types less compressed data - * can be attempted to be compressed again for a possible gain. For others it is - * a waste of time. - */ - if (PC_TYPE(btype) == TYPE_COMPRESSED && level < 7) { - int subtype = PC_SUBTYPE(btype); - - if (subtype != TYPE_COMPRESSED_LZW && subtype != TYPE_COMPRESSED_GZ && - subtype != TYPE_COMPRESSED_LZ && subtype != TYPE_COMPRESSED_LZO) { - return (-1); - } - } bzs.bzalloc = slab_alloc_i; bzs.bzfree = slab_free; bzs.opaque = NULL; diff --git a/filters/dict/DictFilter.cpp b/filters/dict/DictFilter.cpp index 759bd73..299b661 100644 --- a/filters/dict/DictFilter.cpp +++ b/filters/dict/DictFilter.cpp @@ -156,7 +156,7 @@ DictFilter::Forward_Dict(u8 *src, u32 size, u8 *dst, u32 *dstsize) u32 i,j,treePos = 0; u32 lastSymbol = 0; u32 dstSize = 0; - u32 idx; + int idx; for(i = 0; i < size-5;) { @@ -268,12 +268,20 @@ int dict_encode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) { DictFilter *df = static_cast(dict_ctx); - u32 fl = fromlen; - u32 dl = *dstlen; + u32 fl; + u32 dl; int atype; uchar_t *dst; DEBUG_STAT_EN(double strt, en); + /* + * Dict can't handle > 4GB buffers :-O + */ + if (fromlen > UINT32_MAX) + return (-1); + + fl = (u32)fromlen; + dl = (u32)(*dstlen); DEBUG_STAT_EN(strt = get_wtime_millis()); atype = analyze_buffer(from, fromlen); if (PC_TYPE(atype) == TYPE_TEXT) { @@ -298,11 +306,17 @@ int dict_decode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) { DictFilter *df = static_cast(dict_ctx); - u32 fl = fromlen; + u32 fl; u32 dl; u8 *src; DEBUG_STAT_EN(double strt, en); + if (fromlen > UINT32_MAX) { + log_msg(LOG_ERR, 0, "Dict decode buffer too big!"); + return (-1); + } + + fl = (u32)fromlen; DEBUG_STAT_EN(strt = get_wtime_millis()); dl = U32_P(from); if (dl > *dstlen) { diff --git a/filters/dispack/dis.cpp b/filters/dispack/dis.cpp index 669a0a0..781fe25 100644 --- a/filters/dispack/dis.cpp +++ b/filters/dispack/dis.cpp @@ -150,8 +150,8 @@ using namespace std; #define DISFILTER_BLOCK (32768) #define DISFILTERED 1 -#define ORIGSIZE 2 -#define CLEAR_DISFILTER 0xfe +#define ORIGSIZE 2 +#define E8E9 4 #define NORMAL_HDR (1 + 2) #define EXTENDED_HDR (1 + 2 + 2) // Dispack min reduction should be 8%, otherwise we abort @@ -927,6 +927,135 @@ is_x86_code(uchar_t *buf, int len) return (freq[0x8b] > avgFreq && freq[0x00] > avgFreq * 2 && freq[0xE8] > 6); } +/* + * E8E9 Filter from CSC 3.2 (Fu Siyuan). This is applied to blocks that can't + * be Disfiltered. + */ +class EFilter +{ +public: + static void Forward_E89(sU8 *src, sU32 size) + { + sU32 i,j; + sS32 c; + + E89init(); + for(i=0, j=0; i < size; i++) { + c = E89forward(src[i]); + if (c >= 0) src[j++]=c; + } + while((c = E89flush()) >= 0) src[j++] = c; + } + + static void Inverse_E89( sU8* src, sU32 size) + { + sU32 i,j; + sS32 c; + + E89init(); + for(i=0, j=0; i < size; i++) { + c = E89inverse(src[i]); + if (c >= 0) src[j++]=c; + } + while((c = E89flush()) >= 0) src[j++] = c; + } + +protected: + static sU32 x0,x1; + static sU32 i,k; + static sU8 cs; // cache size, F8 - 5 bytes + + ~EFilter() {} + EFilter() {} + + static void E89init(void) + { + cs = 0xFF; + x0 = x1 = 0; + i = 0; + k = 5; + } + + static sS32 E89cache_byte(sS32 c) + { + sS32 d = cs&0x80 ? -1 : (sU8)(x1); + x1 >>= 8; + x1 |= (x0<<24); + x0 >>= 8; + x0 |= (c<<24); + cs <<= 1; i++; + return d; + } + + static sU32 E89xswap(sU32 x) + { + x<<=7; + return (x>>24)|((sU8)(x>>16)<<8)|((sU8)(x>>8)<<16)|((sU8)(x)<<(24-7)); + } + + static sU32 E89yswap(sU32 x) + { + x = ((sU8)(x>>24)<<7)|((sU8)(x>>16)<<8)|((sU8)(x>>8)<<16)|(x<<24); + return x>>7; + } + + static sS32 E89forward(sS32 c) + { + sU32 x; + if(i >= k) { + if((x1&0xFE000000) == 0xE8000000) { + k = i+4; + x= x0 - 0xFF000000; + if( x<0x02000000 ) { + x = (x+i) & 0x01FFFFFF; + x = E89xswap(x); + x0 = x + 0xFF000000; + } + } + } + return E89cache_byte(c); + } + + static sS32 E89inverse(sS32 c) + { + sU32 x; + if(i >= k) { + if((x1&0xFE000000) == 0xE8000000) { + k = i+4; + x = x0 - 0xFF000000; + if(x < 0x02000000) { + x = E89yswap(x); + x = (x-i) & 0x01FFFFFF; + x0 = x + 0xFF000000; + } + } + } + return E89cache_byte(c); + } + + static sS32 E89flush(void) + { + sS32 d; + if(cs != 0xFF) { + while(cs & 0x80) E89cache_byte(0),++cs; + d = E89cache_byte(0); ++cs; + return d; + } else { + E89init(); + return -1; + } + } +}; + +/* + * Linker weirdo! + */ +sU32 EFilter::x0; +sU32 EFilter::x1; +sU32 EFilter::i; +sU32 EFilter::k; +sU8 EFilter::cs; + #ifdef __cplusplus extern "C" { #endif @@ -941,11 +1070,14 @@ int dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) { uchar_t *pos, *hdr, type, *pos_to, *to_last; - uint64_t len; + sU32 len; #ifdef DEBUG_STATS double strt, en; #endif + if (fromlen > UINT32_MAX) + return (-1); + if (fromlen < DISFILTER_BLOCK) return (-1); @@ -953,7 +1085,7 @@ dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) strt = get_wtime_millis(); #endif pos = from; - len = fromlen; + len = (sU32)fromlen; pos_to = to; to_last = to + *dstlen; while (len > 0) { @@ -962,6 +1094,7 @@ dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) sU16 origsize; sU32 out; sU8 *rv; + int dis_tried; if (len > DISFILTER_BLOCK) sz = DISFILTER_BLOCK; @@ -980,9 +1113,11 @@ dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) } out = sz; + dis_tried = 0; if (is_x86_code(pos, sz)) { ctx.ResetCtx(0, sz); rv = DisFilter(ctx, pos, sz, 0, pos_to, out); + dis_tried = 1; } else { rv = NULL; } @@ -990,11 +1125,19 @@ dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) if (pos_to + origsize >= to_last) { return (-1); } - type &= CLEAR_DISFILTER; + memcpy(pos_to, pos, origsize); + + /* + * If Dispack failed, we apply a simple E8E9 filter + * on the block. + */ + if (dis_tried) { + EFilter::Forward_E89(pos_to, origsize); + type |= E8E9; + } *hdr = type; hdr++; U16_P(hdr) = LE16(origsize); - memcpy(pos_to, pos, origsize); pos_to += origsize; } else { sU16 csize; @@ -1069,6 +1212,14 @@ dispack_decode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) return (-1); } memcpy(pos_to, pos, cmpsz); + + /* + * If E8E9 was applied on this block, apply the inverse transform. + * This only happens if this block was detected as x86 instruction + * stream and Dispack was tried but it failed. + */ + if (type & E8E9) + EFilter::Inverse_E89(pos_to, cmpsz); pos += cmpsz; pos_to += cmpsz; len -= cmpsz; diff --git a/lz4_compress.c b/lz4_compress.c index b0e5a36..dba0a66 100644 --- a/lz4_compress.c +++ b/lz4_compress.c @@ -20,7 +20,7 @@ * If not, see . * * moinakg@belenix.org, http://moinakg.wordpress.com/ - * + * */ #include @@ -89,7 +89,7 @@ int lz4_deinit(void **data) { struct lz4_params *lzdat = (struct lz4_params *)(*data); - + if (lzdat) { slab_free(NULL, lzdat); } diff --git a/meta_stream.c b/meta_stream.c index ba4212e..86f281e 100644 --- a/meta_stream.c +++ b/meta_stream.c @@ -34,31 +34,47 @@ #include #include #include +#include "pcompress.h" +#include "delta2/delta2.h" #include "utils/utils.h" +#include "lzma/lzma_crc.h" #include "allocator.h" #include "meta_stream.h" -extern int lz4_compress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, +#define METADATA_CHUNK_SIZE (2 * 1024 * 1024) + +extern int bzip2_compress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, int level, uchar_t chdr, int btype, void *data); +extern int bzip2_decompress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, + int level, uchar_t chdr, int btype, void *data); +extern void bzip2_props(algo_props_t *data, int level, uint64_t chunksize); enum { SRC_CHANNEL = 0, SINK_CHANNEL }; -/* - * Metadata chunk header format: - * 64-bit integer = 1: Compressed length: This indicates that this is a metadata chunk - * 64-bit integer: Real Compressed length - * 64-bit integer: Uncompressed original length - * 1 Byte: Chunk flag - * Upto 64-bytes: Checksum, HMAC if encrypting - * 32-bit integer: Header CRC32 if not encrypting - */ +struct _meta_ctx { + int meta_pipes[2]; + pc_ctx_t *pctx; + pthread_t meta_thread; + uchar_t *frombuf, *tobuf; + uint64_t frompos, topos, tosize; + uchar_t checksum[CKSUM_MAX_BYTES]; + void *bzip2_dat; + int comp_level, id; + int comp_fd; + int running; + int delta2_nstrides; + int do_compress; + mac_ctx_t chunk_hmac; + algo_props_t props; +}; + static int compress_and_write(meta_ctx_t *mctx) { - pc_ctx_t pctx = mctx->pctx; + pc_ctx_t *pctx = mctx->pctx; uchar_t type; uchar_t *comp_chunk, *tobuf; int rv; @@ -74,21 +90,42 @@ compress_and_write(meta_ctx_t *mctx) mctx->frompos, 0, 1); } - type = COMPRESSED; - U64_P(mctx->tobuf) = LE64(1); // Indicate metadata chunk - tobuf = mctx->tobuf + 8; - comp_chunk = mctx->tobuf + METADATA_HDR_SZ; - dstlen = METADATA_CHUNK_SIZE; + type = 0; + /* + * This is normally the compressed chunk size for data chunks. Here we + * set it to 1 to indicate that this is a metadata chunk. This value is + * always big-endian format. The next value is the real compressed + * chunk size. + */ + tobuf = mctx->tobuf; + U64_P(tobuf) = htonll(METADATA_INDICATOR); + comp_chunk = tobuf + METADATA_HDR_SZ; + dstlen = mctx->frompos; + + /* + * Apply Delta2 filter. + */ + rv = delta2_encode(mctx->frombuf, mctx->frompos, comp_chunk, &dstlen, + mctx->props.delta2_span, mctx->delta2_nstrides); + if (rv != -1) { + memcpy(mctx->frombuf, comp_chunk, dstlen); + mctx->frompos = dstlen; + type |= PREPROC_TYPE_DELTA2; + } else { + dstlen = mctx->frompos; + } /* * Ok, now compress. */ - rv = lz4_compress(mctx->frombuf, mctx->frompos, comp_chunk, &dstlen, mctx->level, - 0, TYPE_BINARY, mctx->lz4_dat); + rv = bzip2_compress(mctx->frombuf, mctx->frompos, comp_chunk, &dstlen, mctx->comp_level, + 0, TYPE_BINARY, mctx->bzip2_dat); if (rv < 0) { - type = UNCOMPRESSED; - memcpy(comp_chunk, mctx->frombuf, mctx->frompos); + dstlen = mctx->frompos; + memcpy(comp_chunk, mctx->frombuf, dstlen); + } else { + type |= PREPROC_COMPRESSED; } if (pctx->encrypt_type) { @@ -96,46 +133,47 @@ compress_and_write(meta_ctx_t *mctx) if (rv == -1) { pctx->main_cancel = 1; pctx->t_errored = 1; - log_msg(LOG_ERR, 0, "Metadata Encrypion failed") + log_msg(LOG_ERR, 0, "Metadata Encrypion failed"); return (0); } } /* - * Add header size to the compressed length minus the initial 64-bit value. - * That one is a flag value which is read separately during decompression. + * Store the compressed length of the data segment. While reading we have to account + * for the header. */ - dstlen += METADATA_HDR_SZ - COMPRESSED_CHUNKSIZE; - U64_P(mctx->tobuf + 8) = LE64(dstlen); - U64_P(mctx->tobuf + 16) = LE64(mctx->frompos); + U64_P(tobuf + 8) = LE64(dstlen); + U64_P(tobuf + 16) = LE64(mctx->frompos); + *(tobuf + 24) = type; + if (!pctx->encrypt_type) - serialize_checksum(mctx->checksum, mctx->tobuf + 24, pctx->cksum_bytes); + serialize_checksum(mctx->checksum, tobuf + 25, pctx->cksum_bytes); if (pctx->encrypt_type) { - uchar_t mac_ptr; uchar_t chash[pctx->mac_bytes]; unsigned int hlen; + uchar_t *mac_ptr; - mac_ptr = mctx->tobuf + 24 + pctx->cksum_bytes; // cksum_bytes will be 0 here but ... - memset(mac_ptr, 0, pctx->mac_bytes); + mac_ptr = tobuf + 25; + memset(mac_ptr, 0, CKSUM_MAX + CRC32_SIZE); hmac_reinit(&mctx->chunk_hmac); - hmac_update(&tdat->chunk_hmac, tobuf, dstlen); - hmac_final(&tdat->chunk_hmac, chash, &hlen); + hmac_update(&mctx->chunk_hmac, tobuf, dstlen + METADATA_HDR_SZ); + hmac_final(&mctx->chunk_hmac, chash, &hlen); serialize_checksum(chash, mac_ptr, hlen); } else { - uchar_t mac_ptr; uint32_t crc; + uchar_t *mac_ptr; - mac_ptr = mctx->tobuf + 24 + pctx->cksum_bytes; - memset(mac_ptr, 0, pctx->mac_bytes); - crc = lzma_crc32(tdat->cmp_seg, rbytes, 0); - U32_P(mac_ptr) = LE32(crc); + mac_ptr = tobuf + 25 + CKSUM_MAX; + memset(mac_ptr, 0, CRC32_SIZE); + crc = lzma_crc32(tobuf, METADATA_HDR_SZ, 0); + U32_P(tobuf + 25 + CKSUM_MAX) = LE32(crc); } /* * All done. Now grab lock and write. */ - dstlen += COMPRESSED_CHUNKSIZE; // The 'full' chunk now + dstlen += METADATA_HDR_SZ; // The 'full' chunk now pthread_mutex_lock(&pctx->write_mutex); wbytes = Write(mctx->comp_fd, mctx->tobuf, dstlen); pthread_mutex_unlock(&pctx->write_mutex); @@ -153,56 +191,74 @@ compress_and_write(meta_ctx_t *mctx) void meta_ctx_close_sink_channel(meta_ctx_t *mctx) { - if (mctx->pipes[SINK_CHANNEL]) { - close(mctx->pipes[SINK_CHANNEL]); - mctx->pipes[SINK_CHANNEL] = 0; + if (mctx->meta_pipes[SINK_CHANNEL]) { + close(mctx->meta_pipes[SINK_CHANNEL]); + mctx->meta_pipes[SINK_CHANNEL] = 0; } } void meta_ctx_close_src_channel(meta_ctx_t *mctx) { - if (mctx->pipes[SRC_CHANNEL]) { - close(mctx->pipes[SRC_CHANNEL]); - mctx->pipes[SRC_CHANNEL] = 0; + if (mctx->meta_pipes[SRC_CHANNEL]) { + close(mctx->meta_pipes[SRC_CHANNEL]); + mctx->meta_pipes[SRC_CHANNEL] = 0; } } +/* + * Accumulate metadata into a memory buffer. Once the buffer gets filled or + * data stream ends, the buffer is compressed and written out. + */ static void * metadata_compress(void *dat) { meta_ctx_t *mctx = (meta_ctx_t *)dat; - meta_msg_t msg; + meta_msg_t *msgp; int ack; - while (Read(mctx->meta_pipes[SINK_CHANNEL], &msg, sizeof (msg)) == sizeof (msg)) { + mctx->running = 1; + while (Read(mctx->meta_pipes[SINK_CHANNEL], &msgp, sizeof (msgp)) == sizeof (msgp)) { ack = 0; - if (mctx->frompos + msg.len > METADATA_CHUNK_SIZE) { + if (mctx->frompos + msgp->len > METADATA_CHUNK_SIZE) { + /* + * Accumulating the metadata block will overflow buffer. Compress + * and write the current buffer and then copy the new data into it. + */ if (!compress_and_write(mctx)) { Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); return (NULL); } mctx->frompos = 0; - memcpy(mctx->frombuf, msg.buf, msg.len); - mctx->frompos = msg.len; + memcpy(mctx->frombuf, msgp->buf, msgp->len); + mctx->frompos = msgp->len; - } else if (mctx->frompos + msg.len == METADATA_CHUNK_SIZE) { - memcpy(mctx->frombuf + mctx->frompos, msg.buf, msg.len); + } else if (mctx->frompos + msgp->len == METADATA_CHUNK_SIZE) { + /* + * Accumulating the metadata block fills the buffer. Fill it then + * compress and write the buffer. + */ + memcpy(mctx->frombuf + mctx->frompos, msgp->buf, msgp->len); if (!compress_and_write(mctx)) { Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); return (NULL); } mctx->frompos = 0; } else { - memcpy(mctx->frombuf + mctx->frompos, msg.buf, msg.len); - mctx->frompos += msg.len; + /* + * Accumulate the metadata block into the buffer for future + * compression. + */ + memcpy(mctx->frombuf + mctx->frompos, msgp->buf, msgp->len); + mctx->frompos += msgp->len; } ack = 1; - Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)) + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); } + mctx->running = 0; /* - * Flush pending chunk. + * Flush any accumulated data in the buffer. */ if (mctx->frompos) { if (!compress_and_write(mctx)) { @@ -212,16 +268,225 @@ metadata_compress(void *dat) } mctx->frompos = 0; } + return (NULL); +} + +static int +decompress_data(meta_ctx_t *mctx) +{ + uint64_t origlen, len_cmp, dstlen; + uchar_t *cbuf, *cseg, *ubuf, type; + pc_ctx_t *pctx = mctx->pctx; + uchar_t checksum[CKSUM_MAX_BYTES]; + int rv; + + cbuf = mctx->frombuf; + ubuf = mctx->tobuf; + len_cmp = LE64(U64_P(cbuf + 8)); + origlen = LE64(U64_P(cbuf + 16)); + type = *(cbuf + 24); + cseg = cbuf + METADATA_HDR_SZ; + dstlen = origlen; + + /* + * If this was encrypted: + * Verify HMAC first before anything else and then decrypt compressed data. + */ + if (pctx->encrypt_type) { + unsigned int len; + + len = pctx->mac_bytes; + deserialize_checksum(checksum, cbuf + 25, pctx->mac_bytes); + memset(cbuf + 25, 0, pctx->mac_bytes + CRC32_SIZE); + hmac_reinit(&mctx->chunk_hmac); + hmac_update(&mctx->chunk_hmac, cbuf, mctx->frompos); + hmac_final(&mctx->chunk_hmac, mctx->checksum, &len); + if (memcmp(checksum, mctx->checksum, len) != 0) { + log_msg(LOG_ERR, 0, "Metadata chunk %d, HMAC verification failed", + mctx->id); + return (0); + } + rv = crypto_buf(&(pctx->crypto_ctx), cseg, cseg, len_cmp, mctx->id); + if (rv == -1) { + /* + * Decryption failure is fatal. + */ + log_msg(LOG_ERR, 0, "Metadata chunk %d, Decryption failed", + mctx->id); + return (0); + } + } else { + uint32_t crc1, crc2; + + /* + * Verify Header CRC32 in non-crypto mode. + */ + crc1 = U32_P(cbuf + 25 + CKSUM_MAX); + memset(cbuf + 25 + CKSUM_MAX, 0, CRC32_SIZE); + crc2 = lzma_crc32(cbuf, METADATA_HDR_SZ, 0); + + if (crc1 != crc2) { + /* + * Header CRC32 verification failure is fatal. + */ + log_msg(LOG_ERR, 0, "Metadata chunk %d, Header CRC verification failed", + mctx->id); + return (0); + } + } + + if (type & PREPROC_COMPRESSED) { + rv = bzip2_decompress(cseg, len_cmp, ubuf, &dstlen, mctx->comp_level, + 0, TYPE_BINARY, mctx->bzip2_dat); + if (rv == -1) { + log_msg(LOG_ERR, 0, "Metadata chunk %d, decompression failed.", mctx->id); + return (0); + } + } else { + memcpy(ubuf, cseg, len_cmp); + } + + if (type & PREPROC_TYPE_DELTA2) { + dstlen = origlen; + rv = delta2_decode(ubuf, len_cmp, cseg, &dstlen); + if (rv == -1) { + log_msg(LOG_ERR, 0, "Metadata chunk %d, Delta2 decoding failed.", mctx->id); + return (0); + } + memcpy(ubuf, cseg, dstlen); + } + mctx->topos = 0; + mctx->tosize = dstlen; + return (1); } static void * metadata_decompress(void *dat) { + meta_ctx_t *mctx = (meta_ctx_t *)dat; + pc_ctx_t *pctx; + meta_msg_t *msgp; + int ack; + + pctx = mctx->pctx; + mctx->running = 1; + mctx->topos = mctx->tosize = 0; + mctx->id = -1; + while (Read(mctx->meta_pipes[SINK_CHANNEL], &msgp, sizeof (msgp)) == sizeof (msgp)) { + int64_t rb; + uint64_t len_cmp; + + /* + * Scan to the next metadata chunk and decompress it, if our in-memory data + * is fully consumed or not filled. + */ + if (mctx->topos == mctx->tosize) { + uchar_t *frombuf = mctx->frombuf; + + mctx->id++; + while ((rb = Read(mctx->comp_fd, &len_cmp, sizeof (len_cmp)) + == sizeof(len_cmp))) { + len_cmp = ntohll(len_cmp); + if (len_cmp != METADATA_INDICATOR) { + uint64_t skiplen; + + if (len_cmp == 0) { + /* + * We have reached the end of the file. + */ + msgp->len = 0; + ack = 1; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + skiplen = len_cmp + pctx->cksum_bytes + pctx->mac_bytes + + CHUNK_FLAG_SZ; + int64_t cpos = lseek(mctx->comp_fd, skiplen, SEEK_CUR); + if (cpos == -1) { + log_msg(LOG_ERR, 1, "Cannot find/seek next metadata block."); + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + } else { + break; + } + } + if (rb == -1) { + log_msg(LOG_ERR, 1, "Failed read from metadata fd: "); + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + + } else if (rb == 0) { + /* + * We have reached the end of the file. + */ + msgp->len = 0; + ack = 1; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + U64_P(frombuf) = htonll(len_cmp); + frombuf += 8; + + /* + * We are at the start of a metadata chunk. Read the size. + */ + if ((rb = Read(mctx->comp_fd, &len_cmp, sizeof (len_cmp))) + != sizeof(len_cmp)) { + log_msg(LOG_ERR, 1, "Failed to read size from metadata fd: %lld", rb); + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + U64_P(frombuf) = len_cmp; + frombuf += 8; + len_cmp = LE64(len_cmp); + + /* + * Now read the rest of the chunk. This is rest of the header plus the + * data segment. + */ + len_cmp = len_cmp + (METADATA_HDR_SZ - (frombuf - mctx->frombuf)); + rb = Read(mctx->comp_fd, frombuf, len_cmp); + if (rb != len_cmp) { + log_msg(LOG_ERR, 1, "Failed to read chunk from metadata fd: "); + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + mctx->topos = 0; + mctx->frompos = len_cmp + METADATA_HDR_SZ; + + /* + * Now decompress. + */ + if (!decompress_data(mctx)) { + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + } + + msgp->buf = mctx->tobuf; + msgp->len = mctx->tosize; + mctx->topos = mctx->tosize; + ack = 1; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + } + + return (NULL); } +/* + * Create the metadata thread and associated buffers. This writes out compressed + * metadata chunks into the archive. This is libarchive metadata. + */ meta_ctx_t * -meta_ctx_create(pc_ctx_t *pctx, int file_version, int comp_fd) +meta_ctx_create(void *pc, int file_version, int comp_fd) { + pc_ctx_t *pctx = (pc_ctx_t *)pc; meta_ctx_t *mctx; mctx = (meta_ctx_t *)malloc(sizeof (meta_ctx_t)); @@ -230,8 +495,10 @@ meta_ctx_create(pc_ctx_t *pctx, int file_version, int comp_fd) return (NULL); } + mctx->running = 0; if (pctx->encrypt_type) { - if (hmac_init(&mctx->chunk_hmac, pctx->cksum, &(pctx->crypto_ctx)) == -1) { + if (hmac_init(&mctx->chunk_hmac, pctx->cksum, + &(pctx->crypto_ctx)) == -1) { (void) free(mctx); log_msg(LOG_ERR, 0, "Cannot initialize metadata hmac."); return (NULL); @@ -255,6 +522,10 @@ meta_ctx_create(pc_ctx_t *pctx, int file_version, int comp_fd) return (NULL); } + /* + * The archiver passes metadata via this socketpair. Memory buffer pointers + * are passed through the socket for speed rather than the contents. + */ mctx->pctx = pctx; if (socketpair(AF_UNIX, SOCK_STREAM, 0, mctx->meta_pipes) == -1) { (void) free(mctx->frombuf); @@ -264,60 +535,69 @@ meta_ctx_create(pc_ctx_t *pctx, int file_version, int comp_fd) return (NULL); } + if (pctx->level > 9) + mctx->delta2_nstrides = NSTRIDES_EXTRA; + else + mctx->delta2_nstrides = NSTRIDES_STANDARD; if (pctx->do_compress) { - int rv, level; + int rv; mctx->comp_level = pctx->level; - rv = lz4_init(&mctx->lz4_dat, &mctx->comp_level, 1, METADATA_CHUNK_SIZE, + rv = bzip2_init(&mctx->bzip2_dat, &mctx->comp_level, 1, METADATA_CHUNK_SIZE, file_version, COMPRESS); + bzip2_props(&mctx->props, pctx->level, METADATA_CHUNK_SIZE); if (rv != 0 || pthread_create(&(mctx->meta_thread), NULL, metadata_compress, (void *)mctx) != 0) { - (void) close(pctx->meta_pipes[0]); - (void) close(pctx->meta_pipes[1]); + (void) close(mctx->meta_pipes[0]); + (void) close(mctx->meta_pipes[1]); (void) free(mctx->frombuf); (void) free(mctx->tobuf); (void) free(mctx); if (rv == 0) - log_msg(LOG_ERR, 0, "LZ4 init failed."); + log_msg(LOG_ERR, 0, "Lzma init failed."); else log_msg(LOG_ERR, 1, "Unable to create metadata thread."); return (NULL); } } else { - int rv, level; + int rv; mctx->comp_level = pctx->level; - rv = lz4_init(&mctx->lz4_dat, &mctx->comp_level, 1, METADATA_CHUNK_SIZE, + rv = bzip2_init(&mctx->bzip2_dat, &mctx->comp_level, 1, METADATA_CHUNK_SIZE, file_version, DECOMPRESS); if (rv != 0 || pthread_create(&(mctx->meta_thread), NULL, metadata_decompress, (void *)mctx) != 0) { - (void) close(pctx->meta_pipes[0]); - (void) close(pctx->meta_pipes[1]); + (void) close(mctx->meta_pipes[0]); + (void) close(mctx->meta_pipes[1]); (void) free(mctx->frombuf); (void) free(mctx->tobuf); (void) free(mctx); if (rv == 0) - log_msg(LOG_ERR, 0, "LZ4 init failed."); + log_msg(LOG_ERR, 0, "Lzma init failed."); else log_msg(LOG_ERR, 1, "Unable to create metadata thread."); return (NULL); } } + mctx->do_compress = pctx->do_compress; return (mctx); } - int -meta_ctx_write(meta_ctx_t *mctx, meta_msg_t *msg) +meta_ctx_send(meta_ctx_t *mctx, const void **buf, size_t *len) { int ack; - meta_msg_t msg; + meta_msg_t msg, *msgp; /* * Write the message buffer to the pipe. */ - if (Write(pctx->meta_pipes[SRC_CHANNEL], &msg, sizeof (msg)) < sizeof (meta_msg_t)) { + msg.buf = *buf; + msg.len = *len; + msgp = &msg; + if (Write(mctx->meta_pipes[SRC_CHANNEL], &msgp, sizeof (msgp)) < + sizeof (msgp)) { log_msg(LOG_ERR, 1, "Meta socket write error."); return (0); } @@ -325,7 +605,8 @@ meta_ctx_write(meta_ctx_t *mctx, meta_msg_t *msg) /* * Wait for ACK. */ - if (Read(pctx->meta_pipes[SRC_CHANNEL], &ack, sizeof (ack)) < sizeof (ack)) { + if (Read(mctx->meta_pipes[SRC_CHANNEL], &ack, sizeof (ack)) < + sizeof (ack)) { log_msg(LOG_ERR, 1, "Meta socket read error."); return (0); } @@ -334,16 +615,19 @@ meta_ctx_write(meta_ctx_t *mctx, meta_msg_t *msg) return (-1); } + *len = msg.len; + *buf = msg.buf; return (1); } -int -meta_ctx_read(meta_ctx_t *mctx, meta_msg_t *msg) -{ -} - int meta_ctx_done(meta_ctx_t *mctx) { + meta_ctx_close_src_channel(mctx); + meta_ctx_close_sink_channel(mctx); + if (!mctx->do_compress) + close(mctx->comp_fd); + pthread_join(mctx->meta_thread, NULL); + return (0); } diff --git a/meta_stream.h b/meta_stream.h index 65e34eb..d34cc17 100644 --- a/meta_stream.h +++ b/meta_stream.h @@ -23,39 +23,40 @@ * */ -#ifndef _PCOMPRESS_H -#define _PCOMPRESS_H - -#include -#include "utils/utils.h" +#ifndef _META_STREAM_H +#define _META_STREAM_H #ifdef __cplusplus extern "C" { #endif -#define METADATA_CHUNK_SIZE (2 * 1024 * 1024) -#define METADATA_HDR_SZ (CHUNK_HDR_SZ + COMPRESSED_CHUNKSIZE + pctx->mac_bytes) +/* + * The chunk size value which indicates a metadata chunk. + */ +#define METADATA_INDICATOR 1 -struct _meta_ctx { - int meta_pipes[2]; - pc_ctx_t *pctx; - pthread_t meta_thread; - uchar_t *frombuf, *tobuf; - uint64_t frompos; - uchar_t checksum[CKSUM_MAX_BYTES]; - void *lz4_dat; - int comp_level; - mac_ctx_t chunk_hmac; -} meta_ctx_t; +/* + * Metadata chunk header format: + * 64-bit integer = 1: Compressed length: This indicates that this is a metadata chunk + * 64-bit integer: Compressed length (data portion only) + * 64-bit integer: Uncompressed original length + * 1 Byte: Chunk flag + * Upto 64-bytes: Checksum. This is HMAC if encrypting + * 32-bit integer: Header CRC32 if not encrypting, otherwise empty. + */ +#define CKSUM_MAX 64 +#define CRC32_SIZE 4 +#define METADATA_HDR_SZ (8 * 3 + 1 + CKSUM_MAX + CRC32_SIZE) -struct _meta_msg { - uchar_t *buf; +typedef struct _meta_ctx meta_ctx_t; + +typedef struct _meta_msg { + const uchar_t *buf; size_t len; } meta_msg_t; -meta_ctx_t *meta_ctx_create(pc_ctx_t *pctx, int file_version int comp_fd); -int meta_ctx_write(meta_ctx_t *mctx, meta_msg_t *msg); -int meta_ctx_read(meta_ctx_t *mctx, meta_msg_t *msg); +meta_ctx_t *meta_ctx_create(void *pc, int file_version, int comp_fd); +int meta_ctx_send(meta_ctx_t *mctx, const void **buf, size_t *len); int meta_ctx_done(meta_ctx_t *mctx); void meta_ctx_close_sink_channel(meta_ctx_t *mctx); void meta_ctx_close_src_channel(meta_ctx_t *mctx); diff --git a/pcompress.c b/pcompress.c index ea05d12..9ab84f9 100644 --- a/pcompress.c +++ b/pcompress.c @@ -109,6 +109,7 @@ usage(pc_ctx_t *pctx) " -v Enables verbose mode.\n\n" " -t \n" " Sets the number of compression threads. Default: core count.\n" +" -T Disable separate metadata stream.\n" " -S \n" " The chunk verification checksum. Default: BLAKE256. Others are: CRC64, SHA256,\n" " SHA512, KECCAK256, KECCAK512, BLAKE256, BLAKE512.\n" @@ -206,7 +207,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t algo_props_t *props, int interesting) { uchar_t *dest = (uchar_t *)dst, type = 0; - int64_t result; + int result; uint64_t _dstlen, fromlen; uchar_t *from, *to; int stype, dict; @@ -264,6 +265,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t #ifndef _MPLV2_LICENSE_ if (pctx->lzp_preprocess && stype != TYPE_BMP && stype != TYPE_TIFF) { int hashsize; + int64_t result; hashsize = lzp_hash_size(level); result = lzp_compress((const uchar_t *)from, to, fromlen, @@ -310,7 +312,8 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t U64_P(dest + 1) = htonll(srclen); _dstlen = srclen; DEBUG_STAT_EN(strt = get_wtime_millis()); - result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, (dict?TYPE_TEXT:btype), data); + result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, + (dict?TYPE_TEXT:btype), data); DEBUG_STAT_EN(en = get_wtime_millis()); if (result > -1 && _dstlen < srclen) { @@ -341,7 +344,7 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64 algo_props_t *props) { uchar_t *sorc = (uchar_t *)src, type; - int64_t result; + int result; uint64_t _dstlen = *dstlen, _dstlen1 = *dstlen; DEBUG_STAT_EN(double strt, en); @@ -380,6 +383,8 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64 if (type & PREPROC_TYPE_LZP) { #ifndef _MPLV2_LICENSE_ int hashsize; + int64_t result; + hashsize = lzp_hash_size(level); result = lzp_decompress((const uchar_t *)src, (uchar_t *)dst, srclen, hashsize, LZP_DEFAULT_LZPMINLEN, 0); @@ -390,10 +395,11 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64 _dstlen = result; } else { log_msg(LOG_ERR, 0, "LZP decompression failed."); - return (result); + return ((int)result); } #else - log_msg(LOG_ERR, 0, "LZP feature not available in this build (MPLv2). Aborting."); + log_msg(LOG_ERR, 0, "LZP feature not available in this build" + " (MPLv2). Aborting."); return (-1); #endif } @@ -422,7 +428,8 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64 } } - if (!(type & (PREPROC_COMPRESSED|PREPROC_TYPE_DELTA2|PREPROC_TYPE_LZP|PREPROC_TYPE_DISPACK)) + if (!(type & (PREPROC_COMPRESSED|PREPROC_TYPE_DELTA2|PREPROC_TYPE_LZP| + PREPROC_TYPE_DISPACK|PREPROC_TYPE_DICT)) && type > 0) { log_msg(LOG_ERR, 0, "Invalid preprocessing flags: %d", type); return (-1); @@ -526,6 +533,7 @@ redo: /* * Decryption failure is fatal. */ + log_msg(LOG_ERR, 0, "Chunk %d, Decryption failed", tdat->id); pctx->main_cancel = 1; tdat->len_cmp = 0; Sem_Post(&tdat->cmp_done_sem); @@ -589,8 +597,9 @@ redo: ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE + dedupe_index_sz; if (HDR & COMPRESSED) { if (HDR & CHUNK_FLAG_PREPROC) { - rv = preproc_decompress(pctx, tdat->decompress, cmpbuf, dedupe_data_sz_cmp, - ubuf, &_chunksize, tdat->level, HDR, pctx->btype, tdat->data, tdat->props); + rv = preproc_decompress(pctx, tdat->decompress, cmpbuf, + dedupe_data_sz_cmp, ubuf, &_chunksize, tdat->level, + HDR, pctx->btype, tdat->data, tdat->props); } else { DEBUG_STAT_EN(double strt, en); @@ -753,10 +762,10 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) char algorithm[ALGO_SZ]; struct stat sbuf; struct wdata w; - int compfd = -1, p, dedupe_flag; + int compfd = -1, compfd2 = -1, p, dedupe_flag; int uncompfd = -1, err, np, bail; - int nprocs = 1, thread = 0, level; - unsigned int i; + int thread = 0, level; + uint32_t nprocs = 1, i; unsigned short version, flags; int64_t chunksize, compressed_chunksize; struct cmp_data **dary, *tdat; @@ -774,6 +783,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) */ if (!pctx->pipe_mode) { if (filename == NULL) { + pctx->pipe_mode = 1; compfd = fileno(stdin); if (compfd == -1) { log_msg(LOG_ERR, 1, "fileno "); @@ -849,7 +859,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) err = 1; goto uncomp_done; } - if (version < VERSION-3) { + if (version < VERSION-4) { log_msg(LOG_ERR, 0, "Unsupported version: %d", version); err = 1; goto uncomp_done; @@ -859,6 +869,17 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) * First check for archive mode. In that case the to_filename must be a directory. */ if (flags & FLAG_ARCHIVE) { + if (flags & FLAG_META_STREAM && version > 9) + pctx->meta_stream = 1; + + /* + * Archives with metadata streams cannot be decoded in pipe mode. + */ + if (pctx->pipe_mode && pctx->meta_stream) { + log_msg(LOG_ERR, 0, + "Cannot extract archive with metadata stream in pipe mode."); + } + /* * If to_filename is not set, we just use the current directory. */ @@ -873,13 +894,16 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) err = 1; goto uncomp_done; } - if (mkdir(to_filename, S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH) == -1) { - log_msg(LOG_ERR, 1, "Unable to create target directory %s.", to_filename); + if (mkdir(to_filename, + S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH) == -1) { + log_msg(LOG_ERR, 1, "Unable to create target directory %s.", + to_filename); err = 1; goto uncomp_done; } if (stat(to_filename, &sbuf) == -1) { - log_msg(LOG_ERR, 1, "Unable to correctly create target directory %s.", to_filename); + log_msg(LOG_ERR, 1, "Unable to correctly create target directory %s.", + to_filename); err = 1; goto uncomp_done; } @@ -889,11 +913,24 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) err = 1; goto uncomp_done; } + + /* + * Open another fd to the compressed archive. This is used by the metadata + * thread. + */ + if (pctx->meta_stream) { + if ((compfd2 = open(filename, O_RDONLY, 0)) == -1) { + log_msg(LOG_ERR, 1, "Cannot open: %s", filename); + err = 1; + goto uncomp_done; + } + } } else { const char *origf; if (pctx->list_mode) { - log_msg(LOG_ERR, 0, "Nothing to list. The compressed file is not an archive."); + log_msg(LOG_ERR, 0, "Nothing to list. The compressed file " + "is not an archive."); err = 1; goto uncomp_done; } @@ -908,7 +945,8 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) pctx->to_filename = pctx->archive_members_file; pos = strrchr(filename, '.'); if (pos != NULL) { - if ((pos[0] == 'p' || pos[0] == 'P') && (pos[1] == 'z' || pos[1] == 'Z')) { + if ((pos[0] == 'p' || pos[0] == 'P') && + (pos[1] == 'z' || pos[1] == 'Z')) { memcpy(to_filename, filename, pos - filename); } else { pos = NULL; @@ -987,8 +1025,10 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) if (pctx->cksum == CKSUM_BLAKE256) pctx->cksum = CKSUM_SKEIN256; if (pctx->cksum == CKSUM_BLAKE512) pctx->cksum = CKSUM_SKEIN512; } - if (get_checksum_props(NULL, &(pctx->cksum), &(pctx->cksum_bytes), &(pctx->mac_bytes), 1) == -1) { - log_msg(LOG_ERR, 0, "Invalid checksum algorithm code: %d. File corrupt ?", pctx->cksum); + if (get_checksum_props(NULL, &(pctx->cksum), &(pctx->cksum_bytes), + &(pctx->mac_bytes), 1) == -1) { + log_msg(LOG_ERR, 0, "Invalid checksum algorithm code: %d. " + "File corrupt ?", pctx->cksum); UNCOMP_BAIL; } @@ -1111,7 +1151,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) if (pw_len != -1) { if (pw_len > MAX_PW_LEN) pw_len = MAX_PW_LEN-1; lseek(fd, 0, SEEK_SET); - len = Read(fd, pw, pw_len); + len = (int)Read(fd, pw, pw_len); if (len != -1 && len == pw_len) { pw[pw_len] = '\0'; if (isspace(pw[pw_len - 1])) @@ -1139,7 +1179,8 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) if (pctx->user_pw) { if (init_crypto(&(pctx->crypto_ctx), pctx->user_pw, pctx->user_pw_len, - pctx->encrypt_type, salt2, saltlen, pctx->keylen, nonce, DECRYPT_FLAG) == -1) { + pctx->encrypt_type, salt2, saltlen, pctx->keylen, nonce, + DECRYPT_FLAG) == -1) { memset(salt2, 0, saltlen); free(salt2); memset(salt1, 0, saltlen); @@ -1246,6 +1287,32 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) } add_fname(pctx->archive_temp_file); } + + /* + * If we are having a metadata stream, get the current position of the main + * fd. The secondary fd must be set to the same position so that metadata + * thread can start scanning for chunks after the header and any info chunks. + * + * NOTE: This is done here to allow setup_extractor() call later to work. + */ + if (pctx->meta_stream) { + off_t cpos = lseek(compfd, 0, SEEK_CUR); + cpos = lseek(compfd2, cpos, SEEK_SET); + if (cpos == -1) { + log_msg(LOG_ERR, 1, "Can't seek in metadata fd: "); + UNCOMP_BAIL; + } + + /* + * Finally create the metadata context. + */ + pctx->meta_ctx = meta_ctx_create(pctx, VERSION, compfd2); + if (pctx->meta_ctx == NULL) { + close(compfd2); + UNCOMP_BAIL; + } + } + uncompfd = -1; if (setup_extractor(pctx) == -1) { log_msg(LOG_ERR, 0, "Setup of extraction context failed."); @@ -1272,7 +1339,16 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) } } - nprocs = sysconf(_SC_NPROCESSORS_ONLN); + /* + * WARNING: NO Further file header/info chunk processing beyond this point. + * Doing so will BREAK Separate Metadata stream processing. + */ + + nprocs = (uint32_t)sysconf(_SC_NPROCESSORS_ONLN); + if (pctx->archive_mode) { + nprocs = nprocs > 1 ? nprocs-1:nprocs; + } + if (pctx->nthreads > 0 && pctx->nthreads < nprocs) nprocs = pctx->nthreads; else @@ -1333,9 +1409,9 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) * The last parameter is freeram. It is not needed during decompression. */ if (pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global) { - tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, pctx->rab_blk_size, - pctx->algo, &props, pctx->enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0, - NULL, pctx->pipe_mode, nprocs, 0); + tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, + pctx->rab_blk_size, pctx->algo, &props, pctx->enable_delta_encode, + dedupe_flag, version, DECOMPRESS, 0, NULL, pctx->pipe_mode, nprocs, 0); if (tdat->rctx == NULL) { UNCOMP_BAIL; } @@ -1348,7 +1424,8 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) UNCOMP_BAIL; } } else { - if ((tdat->rctx->out_fd = open(to_filename, O_RDONLY, 0)) == -1) { + if ((tdat->rctx->out_fd = open(to_filename, O_RDONLY, 0)) + == -1) { log_msg(LOG_ERR, 1, "Unable to get new read handle" " to output file"); UNCOMP_BAIL; @@ -1397,6 +1474,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) log_msg(LOG_ERR, 1, "Error in thread creation: "); UNCOMP_BAIL; } + thread = 2; /* * Now read from the compressed file in variable compressed chunk size. @@ -1419,6 +1497,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) tdat->id = pctx->chunk_num; if (tdat->rctx) tdat->rctx->id = tdat->id; +redo: /* * First read length of compressed chunk. */ @@ -1448,6 +1527,30 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) if (tdat->len_cmp == 0) { bail = 1; break; + + } else if (tdat->len_cmp == METADATA_INDICATOR) { + if (!pctx->meta_stream) { + log_msg(LOG_ERR, 0, "Invalid chunk %d length: %" PRIu64 "\n", + pctx->chunk_num, tdat->len_cmp); + UNCOMP_BAIL; + } + /* + * If compressed length indicates a metadata chunk. Read it's length + * and skip the chunk. + */ + rb = Read(compfd, &tdat->len_cmp_be, sizeof (tdat->len_cmp_be)); + if (rb != sizeof (tdat->len_cmp_be)) { + if (rb < 0) log_msg(LOG_ERR, 1, "Read: "); + else + log_msg(LOG_ERR, 0, "Incomplete chunk %d header," + "file corrupt", pctx->chunk_num); + UNCOMP_BAIL; + } + + /* + * We will be reading and skipping this chunk next. + */ + tdat->len_cmp_be = LE64(tdat->len_cmp_be); } /* @@ -1457,7 +1560,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) * decompression allows to avoid allocating per-thread chunks which will * never be used. This can happen if chunk count < thread count. */ - if (!tdat->compressed_chunk) { + if (!tdat->compressed_chunk && tdat->len_cmp != METADATA_INDICATOR) { tdat->compressed_chunk = (uchar_t *)slab_alloc(NULL, compressed_chunksize); if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) @@ -1473,19 +1576,30 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) tdat->cmp_seg = tdat->uncompressed_chunk; } - if (tdat->len_cmp > pctx->largest_chunk) - pctx->largest_chunk = tdat->len_cmp; - if (tdat->len_cmp < pctx->smallest_chunk) - pctx->smallest_chunk = tdat->len_cmp; - pctx->avg_chunk += tdat->len_cmp; + if (tdat->len_cmp != METADATA_INDICATOR) { + if (tdat->len_cmp > pctx->largest_chunk) + pctx->largest_chunk = tdat->len_cmp; + if (tdat->len_cmp < pctx->smallest_chunk) + pctx->smallest_chunk = tdat->len_cmp; + pctx->avg_chunk += tdat->len_cmp; - /* - * Now read compressed chunk including the checksum. - */ - tdat->rbytes = Read(compfd, tdat->compressed_chunk, - tdat->len_cmp + pctx->cksum_bytes + pctx->mac_bytes + CHUNK_FLAG_SZ); + /* + * Now read compressed chunk including the checksum. + */ + rb = tdat->len_cmp + pctx->cksum_bytes + pctx->mac_bytes + + CHUNK_FLAG_SZ; + tdat->rbytes = Read(compfd, tdat->compressed_chunk, rb); + } else { + int64_t cpos = lseek(compfd, 0, SEEK_CUR); + + /* Two values already read */ + rb = tdat->len_cmp_be + METADATA_HDR_SZ - 16; + tdat->rbytes = lseek(compfd, rb, SEEK_CUR); + if (tdat->rbytes > 0) + tdat->rbytes = tdat->rbytes - cpos; + } if (pctx->main_cancel) break; - if (tdat->rbytes < tdat->len_cmp + pctx->cksum_bytes + pctx->mac_bytes + CHUNK_FLAG_SZ) { + if (tdat->rbytes < rb) { if (tdat->rbytes < 0) { log_msg(LOG_ERR, 1, "Read: "); UNCOMP_BAIL; @@ -1495,6 +1609,13 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) UNCOMP_BAIL; } } + + /* + * If we just skipped a metadata chunk, redo the read to go to the next one. + */ + if (tdat->len_cmp == METADATA_INDICATOR) { + goto redo; + } Sem_Post(&tdat->start_sem); ++(pctx->chunk_num); } @@ -1519,7 +1640,8 @@ uncomp_done: Sem_Post(&tdat->cmp_done_sem); pthread_join(tdat->thr, NULL); } - pthread_join(writer_thr, NULL); + if (thread == 2) + pthread_join(writer_thr, NULL); } /* @@ -1556,6 +1678,8 @@ uncomp_done: if (uncompfd != -1) close(uncompfd); } if (pctx->archive_mode) { + if (pctx->meta_stream) + meta_ctx_done(pctx->meta_ctx); pthread_join(pctx->archive_thread, NULL); if (pctx->enable_rabin_global) { close(pctx->archive_temp_fd); @@ -1782,7 +1906,8 @@ plain_index: len_cmp = tdat->len_cmp; *((typeof (len_cmp) *)(tdat->cmp_seg)) = htonll(tdat->len_cmp); if (!pctx->encrypt_type) - serialize_checksum(tdat->checksum, tdat->cmp_seg + sizeof (tdat->len_cmp), pctx->cksum_bytes); + serialize_checksum(tdat->checksum, tdat->cmp_seg + sizeof (tdat->len_cmp), + pctx->cksum_bytes); tdat->len_cmp += CHUNK_FLAG_SZ; tdat->len_cmp += sizeof (len_cmp); tdat->len_cmp += (pctx->cksum_bytes + pctx->mac_bytes); @@ -1879,14 +2004,16 @@ repeat: if (pctx->archive_mode && tdat->decompressing) { wbytes = archiver_write(pctx, tdat->cmp_seg, tdat->len_cmp); } else { + pthread_mutex_lock(&pctx->write_mutex); wbytes = Write(w->wfd, tdat->cmp_seg, tdat->len_cmp); + pthread_mutex_unlock(&pctx->write_mutex); } if (pctx->archive_temp_fd != -1 && wbytes == tdat->len_cmp) { wbytes = Write(pctx->archive_temp_fd, tdat->cmp_seg, tdat->len_cmp); } if (unlikely(wbytes != tdat->len_cmp)) { - log_msg(LOG_ERR, 1, "Chunk Write (expected: %" PRIu64 ", written: %" PRIu64 ") : ", - tdat->len_cmp, wbytes); + log_msg(LOG_ERR, 1, "Chunk Write (expected: %" PRIu64 + ", written: %" PRId64 ") : ", tdat->len_cmp, wbytes); do_cancel: pctx->main_cancel = 1; tdat->cancel = 1; @@ -1922,7 +2049,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev unsigned short version, flags; struct stat sbuf; int compfd = -1, uncompfd = -1, err; - int thread, wthread, bail, single_chunk; + int thread, bail, single_chunk; uint32_t i, nprocs, np, p, dedupe_flag; struct cmp_data **dary = NULL, *tdat; pthread_t writer_thr; @@ -1940,7 +2067,6 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev sbuf.st_size = 0; err = 0; thread = 0; - wthread = 0; dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler compressed_chunksize = 0; @@ -1966,11 +2092,11 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev memset(zero, 0, MAX_PW_LEN); fd = open(pctx->pwd_file, O_RDWR); if (fd != -1) { - pw_len = lseek(fd, 0, SEEK_END); + pw_len = (int)lseek(fd, 0, SEEK_END); if (pw_len != -1) { if (pw_len > MAX_PW_LEN) pw_len = MAX_PW_LEN-1; lseek(fd, 0, SEEK_SET); - len = Read(fd, pw, pw_len); + len = (int)Read(fd, pw, pw_len); if (len != -1 && len == pw_len) { pw[pw_len] = '\0'; if (isspace(pw[pw_len - 1])) @@ -1989,8 +2115,8 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev close(fd); } if (pctx->user_pw) { - if (init_crypto(&(pctx->crypto_ctx), pctx->user_pw, pctx->user_pw_len, pctx->encrypt_type, - NULL, 0, pctx->keylen, 0, ENCRYPT_FLAG) == -1) { + if (init_crypto(&(pctx->crypto_ctx), pctx->user_pw, pctx->user_pw_len, + pctx->encrypt_type, NULL, 0, pctx->keylen, 0, ENCRYPT_FLAG) == -1) { memset(pctx->user_pw, 0, pctx->user_pw_len); log_msg(LOG_ERR, 0, "Failed to initialize crypto."); return (1); @@ -2016,7 +2142,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev * Get number of lCPUs. When archiving with advanced filters, we use one less * lCPU to reduce threads due to increased memory requirements. */ - nprocs = sysconf(_SC_NPROCESSORS_ONLN); + nprocs = (uint32_t)sysconf(_SC_NPROCESSORS_ONLN); if (pctx->archive_mode && (pctx->enable_packjpg || pctx->enable_wavpack)) { nprocs = nprocs > 1 ? nprocs-1:nprocs; } @@ -2093,7 +2219,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev } } else { if (pctx->nthreads == 0 || pctx->nthreads > sbuf.st_size / chunksize) { - pctx->nthreads = sbuf.st_size / chunksize; + pctx->nthreads = (int)(sbuf.st_size / chunksize); if (sbuf.st_size % chunksize) pctx->nthreads++; } @@ -2183,15 +2309,16 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev my_sysinfo msys_info; get_sys_limits(&msys_info); - global_dedupe_bufadjust(pctx->rab_blk_size, &chunksize, 0, pctx->algo, pctx->cksum, - CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, pctx->nthreads, pctx->pipe_mode); + global_dedupe_bufadjust(pctx->rab_blk_size, &chunksize, 0, pctx->algo, + pctx->cksum, CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, + pctx->nthreads, pctx->pipe_mode); } /* * Compressed buffer size must include zlib/dedup scratch space and * chunk header space. * See http://www.zlib.net/manual.html#compress2 - * + * * We do this unconditionally whether user mentioned zlib or not * to keep it simple. While zlib scratch space is only needed at * runtime, chunk header is stored in the file. @@ -2321,6 +2448,18 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev COMP_BAIL; } } + + /* + * Now create the metadata handler context. This is relevant in archive mode where + * the underlying libarchive metadata is compressed into a separate stream of + * metadata chunks. + */ + if (pctx->meta_stream) { + pctx->meta_ctx = meta_ctx_create(pctx, VERSION, compfd); + if (pctx->meta_ctx == NULL) { + COMP_BAIL; + } + } thread = 1; /* @@ -2373,7 +2512,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev log_msg(LOG_ERR, 1, "Error in thread creation: "); COMP_BAIL; } - wthread = 1; + thread = 2; /* * Start the archiver thread if needed. @@ -2420,7 +2559,8 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev pos += 8; } else if (pctx->encrypt_type == CRYPTO_ALG_SALSA20) { - serialize_checksum(crypto_nonce(&(pctx->crypto_ctx)), pos, XSALSA20_CRYPTO_NONCEBYTES); + serialize_checksum(crypto_nonce(&(pctx->crypto_ctx)), pos, + XSALSA20_CRYPTO_NONCEBYTES); pos += XSALSA20_CRYPTO_NONCEBYTES; } *((int *)pos) = htonl(pctx->keylen); @@ -2613,8 +2753,10 @@ comp_done: */ if (!pctx->pipe_mode) { if (uncompfd != -1) close(uncompfd); - if (pctx->archive_mode) - archiver_close(pctx); + } + if (pctx->meta_stream) { + meta_ctx_done(pctx->meta_ctx); + archiver_close(pctx); } if (pctx->t_errored) err = pctx->t_errored; @@ -2629,7 +2771,7 @@ comp_done: if (pctx->encrypt_type) hmac_cleanup(&tdat->chunk_hmac); } - if (wthread) + if (thread == 2) pthread_join(writer_thr, NULL); } @@ -2867,6 +3009,7 @@ create_pc_context(void) ctx->pagesize = sysconf(_SC_PAGE_SIZE); ctx->btype = TYPE_UNKNOWN; ctx->delta2_nstrides = NSTRIDES_STANDARD; + pthread_mutex_init(&ctx->write_mutex, NULL); return (ctx); } @@ -2923,7 +3066,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) ff.enable_wavpack = 0; pthread_mutex_lock(&opt_parse); - while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:LPS:B:Fk:avmKjxi")) != -1) { + while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:LPS:B:Fk:avmKjxiT")) != -1) { int ovr; int64_t chunksize; @@ -3099,6 +3242,9 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) pctx->dispack_preprocess = 1; break; + case 'T': + pctx->meta_stream = -1; + case '?': default: return (2); @@ -3214,7 +3360,8 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) */ if ((pctx->dispack_preprocess || ff.enable_packjpg || ff.enable_wavpack) && !pctx->archive_mode) { - log_msg(LOG_ERR, 0, "Dispack Executable Preprocessor and PackJPG are only valid when archiving."); + log_msg(LOG_ERR, 0, "Dispack Executable Preprocessor and PackJPG are " + "only valid when archiving."); return (1); } @@ -3337,7 +3484,8 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) pctx->main_cancel = 0; if (pctx->cksum == 0) - get_checksum_props(DEFAULT_CKSUM, &(pctx->cksum), &(pctx->cksum_bytes), &(pctx->mac_bytes), 0); + get_checksum_props(DEFAULT_CKSUM, &(pctx->cksum), &(pctx->cksum_bytes), + &(pctx->mac_bytes), 0); if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan) && pctx->cksum == CKSUM_CRC64) { log_msg(LOG_ERR, 0, "CRC64 checksum is not suitable for Deduplication."); @@ -3382,6 +3530,12 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) pctx->enable_packjpg = ff.enable_packjpg; pctx->enable_wavpack = ff.enable_wavpack; if (pctx->level > 8) pctx->dispack_preprocess = 1; + if (pctx->meta_stream != -1) + pctx->meta_stream = 1; + else + pctx->meta_stream = 0; + if (pctx->pipe_mode) + pctx->meta_stream = 0; } /* @@ -3410,7 +3564,8 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) if (pctx->level < 9) { pctx->chunksize = DEFAULT_CHUNKSIZE; } else { - pctx->chunksize = DEFAULT_CHUNKSIZE + (pctx->level - 8) * DEFAULT_CHUNKSIZE/4; + pctx->chunksize = DEFAULT_CHUNKSIZE + (pctx->level - 8) * + DEFAULT_CHUNKSIZE/4; } } } else if (pctx->do_uncompress) { diff --git a/pcompress.h b/pcompress.h index 5546463..c4683e1 100644 --- a/pcompress.h +++ b/pcompress.h @@ -36,14 +36,16 @@ extern "C" { #include #include +#include "meta_stream.h" #define CHUNK_FLAG_SZ 1 #define ALGO_SZ 8 #define MIN_CHUNK 2048 -#define VERSION 9 +#define VERSION 10 #define FLAG_DEDUP 1 #define FLAG_DEDUP_FIXED 2 #define FLAG_SINGLE_CHUNK 4 +#define FLAG_META_STREAM 256 #define FLAG_ARCHIVE 2048 #define UTILITY_VERSION "3.1" #define MASK_CRYPTO_ALG 0x30 @@ -220,10 +222,11 @@ typedef struct pc_ctx { int encrypt_type; int archive_mode; int enable_archive_sort; - int pagesize; + long pagesize; int force_archive_perms; int no_overwrite_newer; int advanced_opts; + int meta_stream; /* * Archiving related context data. @@ -241,6 +244,7 @@ typedef struct pc_ctx { uint64_t temp_mmap_len; struct fn_list *fn; Sem_t read_sem, write_sem; + pthread_mutex_t write_mutex; uchar_t *arc_buf; uint64_t arc_buf_size, arc_buf_pos; int arc_closed, arc_writing; @@ -268,6 +272,7 @@ typedef struct pc_ctx { unsigned char *user_pw; int user_pw_len; char *pwd_file, *f_name; + meta_ctx_t *meta_ctx; } pc_ctx_t; /*