diff --git a/Makefile b/Makefile index 8a9a0ec..e1aa9e1 100644 --- a/Makefile +++ b/Makefile @@ -65,6 +65,9 @@ LINK = g++ -m64 -pthread -msse3 COMPILE = gcc -m64 -O3 -msse3 -c COMPILE_cpp = g++ -m64 -O3 -msse3 -c CPPFLAGS += -DNDEBUG +ifdef DEBUG_NO_SLAB +CPPFLAGS += -DDEBUG_NO_SLAB +endif endif all: $(PROG) diff --git a/adaptive_compress.c b/adaptive_compress.c index 44c2535..49b2317 100644 --- a/adaptive_compress.c +++ b/adaptive_compress.c @@ -40,18 +40,18 @@ static unsigned int bzip2_count = 0; static unsigned int ppmd_count = 0; extern int lzma_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int bzip2_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int ppmd_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int lzma_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int bzip2_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int ppmd_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int lzma_init(void **data, int *level, ssize_t chunksize); extern int lzma_deinit(void **data); @@ -137,7 +137,7 @@ adapt_deinit(void **data) int adapt_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { struct adapt_data *adat = (struct adapt_data *)(data); int rv, rv1, rv2; @@ -156,18 +156,18 @@ adapt_compress(void *src, size_t srclen, void *dst, inc = &ppmd_count; dst2len = *dstlen; dst3len = *dstlen; - rv1 = ppmd_compress(src, srclen, dst, dstlen, level, adat->ppmd_data); + rv1 = ppmd_compress(src, srclen, dst, dstlen, level, chdr, adat->ppmd_data); if (rv1 < 0) *dstlen = dst3len; if (adat->adapt_mode == 2) { - rv2 = lzma_compress(src, srclen, dst2, &dst2len, level, adat->lzma_data); + rv2 = lzma_compress(src, srclen, dst2, &dst2len, level, chdr, adat->lzma_data); if (rv2 < 0) dst2len = dst3len; if (dst2len < *dstlen) { inc = &lzma_count; rv = COMPRESS_LZMA; } } else { - rv2 = bzip2_compress(src, srclen, dst2, &dst2len, level, NULL); + rv2 = bzip2_compress(src, srclen, dst2, &dst2len, level, chdr, NULL); if (rv2 < 0) dst2len = dst3len; if (dst2len < *dstlen) { inc = &bzip2_count; @@ -194,21 +194,21 @@ adapt_compress(void *src, size_t srclen, void *dst, int adapt_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { struct adapt_data *adat = (struct adapt_data *)(data); uchar_t HDR; - HDR = *((uchar_t *)src); + HDR = chdr; if (HDR & (COMPRESS_LZMA << 4)) { - return (lzma_decompress(src, srclen, dst, dstlen, level, adat->lzma_data)); + return (lzma_decompress(src, srclen, dst, dstlen, level, chdr, adat->lzma_data)); } else if (HDR & (COMPRESS_BZIP2 << 4)) { - return (bzip2_decompress(src, srclen, dst, dstlen, level, NULL)); + return (bzip2_decompress(src, srclen, dst, dstlen, level, chdr, NULL)); } else if (HDR & (COMPRESS_PPMD << 4)) { - return (ppmd_decompress(src, srclen, dst, dstlen, level, adat->ppmd_data)); + return (ppmd_decompress(src, srclen, dst, dstlen, level, chdr, adat->ppmd_data)); } else { fprintf(stderr, "Unrecognized compression mode, file corrupt.\n"); diff --git a/bzip2_compress.c b/bzip2_compress.c index e0d1faa..3ad2ee0 100644 --- a/bzip2_compress.c +++ b/bzip2_compress.c @@ -81,7 +81,8 @@ bzerr(int err) } int -bzip2_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data) +bzip2_compress(void *src, size_t srclen, void *dst, size_t *dstlen, + int level, uchar_t chdr, void *data) { bz_stream bzs; int ret; @@ -120,7 +121,8 @@ bzip2_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, v } int -bzip2_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data) +bzip2_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, + int level, uchar_t chdr, void *data) { bz_stream bzs; int ret; @@ -135,7 +137,7 @@ bzip2_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, return (-1); } - bzs.next_in = (uchar_t *)src + CHDR_SZ; + bzs.next_in = (uchar_t *)src; bzs.avail_in = srclen; bzs.next_out = dst; bzs.avail_out = *dstlen; diff --git a/lzma_compress.c b/lzma_compress.c index 343edd4..505dc0b 100644 --- a/lzma_compress.c +++ b/lzma_compress.c @@ -79,11 +79,11 @@ lzma_init(void **data, int *level, ssize_t chunksize) p->fb = 64; else p->fb = 128; - if (*level > 9) *level = 9; p->level = *level; LzmaEncProps_Normalize(p); slab_cache_add(p->litprob_sz); } + if (*level > 9) *level = 9; *data = p; return (0); } @@ -115,6 +115,9 @@ lzerr(int err) case SZ_ERROR_PROGRESS: fprintf(stderr, "LZMA: Progress callback errored\n"); break; + case SZ_ERROR_INPUT_EOF: + fprintf(stderr, "LZMA: More compressed input bytes expected\n"); + break; case SZ_ERROR_OUTPUT_EOF: fprintf(stderr, "LZMA: Output buffer overflow\n"); break; @@ -146,7 +149,7 @@ lzerr(int err) */ int lzma_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { size_t props_len = LZMA_PROPS_SIZE; SRes res; @@ -175,7 +178,7 @@ lzma_compress(void *src, size_t srclen, void *dst, int lzma_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { size_t _srclen; const uchar_t *_src; @@ -183,10 +186,10 @@ lzma_decompress(void *src, size_t srclen, void *dst, ELzmaStatus status; _srclen = srclen - LZMA_PROPS_SIZE; - _src = (uchar_t *)src + LZMA_PROPS_SIZE + CHDR_SZ; + _src = (uchar_t *)src + LZMA_PROPS_SIZE; if ((res = LzmaDecode((uchar_t *)dst, dstlen, _src, &_srclen, - src + CHDR_SZ, LZMA_PROPS_SIZE, LZMA_FINISH_ANY, + src, LZMA_PROPS_SIZE, LZMA_FINISH_ANY, &status, &g_Alloc)) != SZ_OK) { lzerr(res); return (-1); diff --git a/main.c b/main.c index 00783e6..da144bb 100644 --- a/main.c +++ b/main.c @@ -148,7 +148,9 @@ perform_decompress(void *dat) { struct cmp_data *tdat = (struct cmp_data *)dat; ssize_t _chunksize; + ssize_t rabin_index_sz, rabin_data_sz, rabin_index_sz_cmp, rabin_data_sz_cmp; int type, rv; + unsigned int blknum; typeof (tdat->crc64) crc64; uchar_t HDR; uchar_t *cseg; @@ -174,6 +176,7 @@ redo: _chunksize = tdat->chunksize; tdat->crc64 = htonll(*((typeof (crc64) *)(tdat->compressed_chunk))); HDR = *cseg; + cseg += CHDR_SZ; if (HDR & CHSIZE_MASK) { uchar_t *rseg; @@ -184,13 +187,48 @@ redo: } if (HDR & COMPRESSED) { - rv = tdat->decompress(cseg, tdat->len_cmp, tdat->uncompressed_chunk, - &_chunksize, tdat->level, tdat->data); + if (enable_rabin_scan && (HDR & FLAG_DEDUP)) { + uchar_t *cmpbuf, *ubuf; + + /* Extract various sizes from rabin header. */ + rabin_parse_hdr(cseg, &blknum, &rabin_index_sz, &rabin_data_sz, + &rabin_index_sz_cmp, &rabin_data_sz_cmp, &_chunksize); + memcpy(tdat->uncompressed_chunk, cseg, RABIN_HDR_SIZE); + + /* + * Uncompress the data chunk first and then uncompress the index. + * The uncompress routines can use extra bytes at the end for temporary + * state/dictionary info. Since data chunk directly follows index + * uncompressing index first corrupts the data. + */ + cmpbuf = cseg + RABIN_HDR_SIZE + rabin_index_sz_cmp; + ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE + rabin_index_sz; + rv = tdat->decompress(cmpbuf, rabin_data_sz_cmp, ubuf, &_chunksize, + tdat->level, HDR, tdat->data); + if (rv == -1) { + tdat->len_cmp = 0; + fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id); + goto cont; + } + + cmpbuf = cseg + RABIN_HDR_SIZE; + ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE; + rv = lzma_decompress(cmpbuf, rabin_index_sz_cmp, ubuf, &rabin_index_sz, + tdat->rctx->level, 0, tdat->rctx->lzma_data); + } else { + rv = tdat->decompress(cseg, tdat->len_cmp, tdat->uncompressed_chunk, + &_chunksize, tdat->level, HDR, tdat->data); + } } else { memcpy(cseg + CHDR_SZ, tdat->uncompressed_chunk, _chunksize); } tdat->len_cmp = _chunksize; + if (rv == -1) { + tdat->len_cmp = 0; + fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id); + goto cont; + } /* Rebuild chunk from dedup blocks. */ if (enable_rabin_scan && (HDR & FLAG_DEDUP)) { rabin_context_t *rctx; @@ -213,11 +251,6 @@ redo: tdat->cmp_seg = tdat->uncompressed_chunk; } - if (rv == -1) { - tdat->len_cmp = 0; - fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id); - goto cont; - } /* * Re-compute checksum of original uncompressed chunk. * If it does not match we set length of chunk to 0 to indicate @@ -356,9 +389,8 @@ start_decompress(const char *filename, const char *to_filename) enable_rabin_scan = 1; } - if (nthreads == 0) - nprocs = sysconf(_SC_NPROCESSORS_ONLN); - else + nprocs = sysconf(_SC_NPROCESSORS_ONLN); + if (nthreads > 0 && nthreads < nprocs) nprocs = nthreads; fprintf(stderr, "Scaling to %d threads\n", nprocs); @@ -542,8 +574,9 @@ uncomp_done: static void * perform_compress(void *dat) { struct cmp_data *tdat = (struct cmp_data *)dat; - typeof (tdat->chunksize) _chunksize, len_cmp; + typeof (tdat->chunksize) _chunksize, len_cmp, rabin_index_sz, index_size_cmp; int type, rv; + uchar_t *compressed_chunk; redo: sem_wait(&tdat->start_sem); @@ -553,6 +586,7 @@ redo: return (0); } + compressed_chunk = tdat->compressed_chunk + CHDR_SZ; /* Perform Dedup if enabled. */ if (enable_rabin_scan) { rabin_context_t *rctx; @@ -567,7 +601,7 @@ redo: rbytes = tdat->rbytes; reset_rabin_context(tdat->rctx); rctx->cbuf = tdat->uncompressed_chunk; - rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0); + rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0); if (!rctx->valid) { memcpy(tdat->uncompressed_chunk, tdat->cmp_seg, rbytes); tdat->rbytes = rbytes; @@ -578,11 +612,37 @@ redo: */ tdat->crc64 = lzma_crc64(tdat->uncompressed_chunk, tdat->rbytes, 0); } - _chunksize = tdat->rbytes; - rv = tdat->compress(tdat->uncompressed_chunk, tdat->rbytes, - tdat->compressed_chunk + CHDR_SZ, &_chunksize, tdat->level, - tdat->data); + /* + * If doing dedup we compress rabin index and deduped data separately. + * The rabin index array values can pollute the compressor's dictionary thereby + * reducing compression effectiveness of the data chunk. So we separate them. + */ + if (enable_rabin_scan && tdat->rctx->valid) { + _chunksize = tdat->rbytes - rabin_index_sz - RABIN_HDR_SIZE; + index_size_cmp = rabin_index_sz; + memcpy(compressed_chunk, tdat->uncompressed_chunk, RABIN_HDR_SIZE); + + /* Compress index. */ + rv = lzma_compress(tdat->uncompressed_chunk + RABIN_HDR_SIZE, + rabin_index_sz, compressed_chunk + RABIN_HDR_SIZE, &index_size_cmp, + tdat->rctx->level, 0, tdat->rctx->lzma_data); + + index_size_cmp += RABIN_HDR_SIZE; + if (rv == 0) { + /* Compress data chunk. */ + rv = tdat->compress(tdat->uncompressed_chunk + rabin_index_sz + RABIN_HDR_SIZE, + _chunksize, compressed_chunk + index_size_cmp, &_chunksize, + tdat->level, 0, tdat->data); + /* Now update rabin header with the compressed sizes. */ + rabin_update_hdr(compressed_chunk, index_size_cmp - RABIN_HDR_SIZE , _chunksize); + } + _chunksize += index_size_cmp; + } else { + _chunksize = tdat->rbytes; + rv = tdat->compress(tdat->uncompressed_chunk, tdat->rbytes, + compressed_chunk, &_chunksize, tdat->level, 0, tdat->data); + } /* * Sanity check to ensure compressed data is lesser than original. * If at all compression expands/does not shrink data then the chunk @@ -591,8 +651,7 @@ redo: */ tdat->len_cmp = _chunksize; if (_chunksize >= tdat->chunksize || rv < 0) { - memcpy(tdat->compressed_chunk + CHDR_SZ, tdat->uncompressed_chunk, - tdat->rbytes); + memcpy(compressed_chunk, tdat->uncompressed_chunk, tdat->rbytes); type = UNCOMPRESSED; tdat->len_cmp = tdat->rbytes; } else { @@ -608,7 +667,7 @@ redo: */ len_cmp = tdat->len_cmp; *((typeof (len_cmp) *)(tdat->cmp_seg)) = htonll(tdat->len_cmp); - *((typeof (tdat->crc64) *)(tdat->cmp_seg + sizeof (tdat->crc64))) = htonll(tdat->crc64); + *((typeof (tdat->crc64) *)(tdat->cmp_seg + sizeof (tdat->len_cmp))) = htonll(tdat->crc64); tdat->len_cmp += CHDR_SZ; tdat->len_cmp += sizeof (len_cmp); tdat->len_cmp += sizeof (tdat->crc64); @@ -752,6 +811,11 @@ start_compress(const char *filename, uint64_t chunksize, int level) */ if (sbuf.st_size < chunksize) { chunksize = sbuf.st_size; + nthreads = 1; + } else { + nthreads = sbuf.st_size / chunksize; + if (sbuf.st_size % chunksize) + nthreads++; } /* @@ -783,12 +847,13 @@ start_compress(const char *filename, uint64_t chunksize, int level) } } - if (nthreads == 0) - nprocs = sysconf(_SC_NPROCESSORS_ONLN); - else + nprocs = sysconf(_SC_NPROCESSORS_ONLN); + if (nthreads > 0 && nthreads < nprocs) nprocs = nthreads; - fprintf(stderr, "Scaling to %d threads\n", nprocs); + fprintf(stderr, "Scaling to %d thread", nprocs); + if (nprocs > 1) fprintf(stderr, "s"); + fprintf(stderr, "\n"); dary = (struct cmp_data **)slab_alloc(NULL, sizeof (struct cmp_data *) * nprocs); if (enable_rabin_scan) diff --git a/pcompress.h b/pcompress.h index ebf5c22..ad718d6 100644 --- a/pcompress.h +++ b/pcompress.h @@ -51,40 +51,31 @@ extern "C" { #define CHUNK_FLAG_DEDUP 2 #define COMP_EXTN ".pz" -/* Pointer type for compress and decompress functions. */ -typedef int (*compress_func_ptr)(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); - -/* Pointer type for algo specific init/deinit/stats functions. */ -typedef int (*init_func_ptr)(void **data, int *level, ssize_t chunksize); -typedef int (*deinit_func_ptr)(void **data); -typedef void (*stats_func_ptr)(int show); - extern uint64_t lzma_crc64(const uint8_t *buf, size_t size, uint64_t crc); extern uint64_t lzma_crc64_8bchk(const uint8_t *buf, size_t size, uint64_t crc, uint64_t *cnt); extern int zlib_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int lzma_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int bzip2_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int adapt_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int ppmd_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int zlib_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int lzma_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int bzip2_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int adapt_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int ppmd_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int adapt_init(void **data, int *level, ssize_t chunksize); extern int adapt2_init(void **data, int *level, ssize_t chunksize); diff --git a/ppmd_compress.c b/ppmd_compress.c index ade3151..537f039 100644 --- a/ppmd_compress.c +++ b/ppmd_compress.c @@ -100,7 +100,7 @@ ppmd_deinit(void **data) int ppmd_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { CPpmd8 *_ppmd = (CPpmd8 *)data; uchar_t *_src = (uchar_t *)src; @@ -122,10 +122,10 @@ ppmd_compress(void *src, size_t srclen, void *dst, int ppmd_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { CPpmd8 *_ppmd = (CPpmd8 *)data; - Byte *_src = (Byte *)src + CHDR_SZ; + Byte *_src = (Byte *)src; Byte *_dst = (Byte *)dst; size_t i; int res; diff --git a/rabin/rabin_polynomial.c b/rabin/rabin_polynomial.c index 603494f..22a0b65 100755 --- a/rabin/rabin_polynomial.c +++ b/rabin/rabin_polynomial.c @@ -76,6 +76,13 @@ extern const uint64_t lzma_crc64_table[4][256]; #include "rabin_polynomial.h" +extern int lzma_init(void **data, int *level, ssize_t chunksize); +extern int lzma_compress(void *src, size_t srclen, void *dst, + size_t *destlen, int level, uchar_t chdr, void *data); +extern int lzma_decompress(void *src, size_t srclen, void *dst, + size_t *dstlen, int level, uchar_t chdr, void *data); +extern int lzma_deinit(void **data); + unsigned int rabin_polynomial_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE; unsigned int rabin_polynomial_min_block_size = RAB_POLYNOMIAL_MIN_BLOCK_SIZE; unsigned int rabin_avg_block_mask = RAB_POLYNOMIAL_AVG_BLOCK_MASK; @@ -87,7 +94,8 @@ rabin_context_t * create_rabin_context(uint64_t chunksize) { rabin_context_t *ctx; unsigned char *current_window_data; - unsigned int blknum; + unsigned int blknum, index; + int level = 14; blknum = chunksize / rabin_polynomial_min_block_size; if (chunksize % rabin_polynomial_min_block_size) @@ -100,9 +108,17 @@ create_rabin_context(uint64_t chunksize) { if(ctx == NULL || current_window_data == NULL || ctx->blocks == NULL) { fprintf(stderr, "Could not allocate rabin polynomial context, out of memory\n"); + destroy_rabin_context(ctx); return (NULL); } + lzma_init(&(ctx->lzma_data), &(ctx->level), chunksize); + if (!(ctx->lzma_data)) { + fprintf(stderr, + "Could not allocate rabin polynomial context, out of memory\n"); + destroy_rabin_context(ctx); + return (NULL); + } /* * We should compute the power for the window size. * static uint64_t polynomial_pow; @@ -131,9 +147,12 @@ reset_rabin_context(rabin_context_t *ctx) void destroy_rabin_context(rabin_context_t *ctx) { - slab_free(NULL, ctx->current_window_data); - slab_free(NULL, ctx->blocks); - slab_free(NULL, ctx); + if (ctx) { + if (ctx->current_window_data) slab_free(NULL, ctx->current_window_data); + if (ctx->blocks) slab_free(NULL, ctx->blocks); + if (ctx->lzma_data) lzma_deinit(&(ctx->lzma_data)); + slab_free(NULL, ctx); + } } /* @@ -157,14 +176,14 @@ cmpblks(const void *a, const void *b) * Perform Deduplication based on Rabin Fingerprinting. A 32-byte window is used for * the rolling checksum and dedup blocks vary in size from 4K-128K. */ -void +unsigned int rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) { - ssize_t i, last_offset; + ssize_t i, last_offset,j; unsigned int blknum; char *buf1 = (char *)buf; unsigned int length; - ssize_t overhead; + ssize_t rabin_index_sz; length = offset; last_offset = 0; @@ -186,6 +205,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) */ ctx->cur_roll_checksum = (ctx->cur_roll_checksum << 1) + cur_byte; ctx->cur_roll_checksum -= (pushed_out << RAB_POLYNOMIAL_WIN_SIZE); + // CRC64 Calculation swiped from LZMA ctx->cur_checksum = lzma_crc64_table[0][cur_byte ^ A1(ctx->cur_checksum)] ^ S8(ctx->cur_checksum); @@ -205,7 +225,6 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) ctx->blocks[blknum].checksum = ctx->cur_checksum; ctx->blocks[blknum].length = length; ctx->blocks[blknum].refcount = 0; - blknum++; ctx->cur_checksum = 0; last_offset = i+1; @@ -220,7 +239,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) ssize_t pos, matches; int valid = 1; char *tmp, *prev_offset; - unsigned int *blkarr, prev_index, prev_blk; + unsigned int *rabin_index, prev_index, prev_blk; // Insert the last left-over trailing bytes, if any, into a block. if (last_offset < *size) { @@ -228,16 +247,17 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) ctx->blocks[blknum].index = blknum; ctx->blocks[blknum].checksum = ctx->cur_checksum; ctx->blocks[blknum].length = *size - last_offset; + ctx->blocks[blknum].refcount = 0; blknum++; ctx->cur_checksum = 0; last_offset = *size; } - overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE; + rabin_index_sz = (ssize_t)blknum * RABIN_ENTRY_SIZE; prev_cksum = 0; prev_length = 0; prev_offset = 0; - pos = overhead; + pos = rabin_index_sz + RABIN_HDR_SIZE; /* * Now sort the block array based on checksums. This will bring virtually @@ -246,7 +266,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) * TODO: Test with a heavily optimized MD5 (from OpenSSL?) later. */ qsort(ctx->blocks, blknum, sizeof (rabin_blockentry_t), cmpblks); - blkarr = (unsigned int *)(ctx->cbuf + RABIN_HDR_SIZE); + rabin_index = (unsigned int *)(ctx->cbuf + RABIN_HDR_SIZE); matches = 0; /* @@ -260,7 +280,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) * blocks. This helps in non-duplicate block merging later. */ for (blk = 0; blk < blknum; blk++) { - blkarr[ctx->blocks[blk].index] = blk; + rabin_index[ctx->blocks[blk].index] = blk; if (blk > 0 && ctx->blocks[blk].checksum == prev_cksum && ctx->blocks[blk].length == prev_length && @@ -279,7 +299,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) prev_blk = blk; } - if (matches < overhead) { + if (matches < rabin_index_sz) { ctx->valid = 0; return; } @@ -303,7 +323,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) valid = 0; break; } - be = &(ctx->blocks[blkarr[blk]]); + be = &(ctx->blocks[rabin_index[blk]]); if (be->length > 0) { prev_offset = buf1 + be->offset; memcpy(ctx->cbuf + pos, prev_offset, be->length); @@ -318,61 +338,101 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) prev_index = blk; prev_length = be->length; } - blkarr[blk] = htonl(be->length); + rabin_index[blk] = htonl(be->length); } else { if (be->refcount > 0) { prev_index = 0; prev_length = 0; - blkarr[blk] = htonl(be->length); + rabin_index[blk] = htonl(be->length); } else { if (prev_length + be->length <= RAB_POLYNOMIAL_MAX_BLOCK_SIZE) { prev_length += be->length; - blkarr[prev_index] = htonl(prev_length); - blkarr[blk] = 0; + rabin_index[prev_index] = htonl(prev_length); + rabin_index[blk] = 0; } else { prev_index = 0; prev_length = 0; - blkarr[blk] = htonl(be->length); + rabin_index[blk] = htonl(be->length); } } } } else { prev_index = 0; prev_length = 0; - blkarr[blk] = htonl(RAB_POLYNOMIAL_MAX_BLOCK_SIZE + be->index + 1); + rabin_index[blk] = htonl(RAB_POLYNOMIAL_MAX_BLOCK_SIZE + be->index + 1); } } cont: if (valid) { - *((unsigned int *)(ctx->cbuf)) = htonl(blknum); - *((ssize_t *)(ctx->cbuf + sizeof (unsigned int))) = htonll(*size); + uchar_t *cbuf = ctx->cbuf; + ssize_t *entries; + + *((unsigned int *)cbuf) = htonl(blknum); + cbuf += sizeof (unsigned int); + entries = (ssize_t *)cbuf; + entries[0] = htonll(*size); + entries[1] = 0; + entries[2] = htonll(pos - rabin_index_sz - RABIN_HDR_SIZE); *size = pos; ctx->valid = 1; + /* + * Remaining header entries: size of compressed index and size of + * compressed data are inserted later via rabin_update_hdr, after actual compression! + */ + return (rabin_index_sz); } } + return (0); +} + +void +rabin_update_hdr(uchar_t *buf, ssize_t rabin_index_sz_cmp, ssize_t rabin_data_sz_cmp) +{ + ssize_t *entries; + + buf += sizeof (unsigned int); + entries = (ssize_t *)buf; + entries[1] = htonll(rabin_index_sz_cmp); + entries[3] = htonll(rabin_data_sz_cmp); +} + +void +rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_index_sz, + ssize_t *rabin_data_sz, ssize_t *rabin_index_sz_cmp, + ssize_t *rabin_data_sz_cmp, ssize_t *rabin_deduped_size) +{ + ssize_t *entries; + + *blknum = ntohl(*((unsigned int *)(buf))); + buf += sizeof (unsigned int); + + entries = (ssize_t *)buf; + *rabin_data_sz = ntohll(entries[0]); + *rabin_index_sz = (ssize_t)(*blknum) * RABIN_ENTRY_SIZE; + *rabin_index_sz_cmp = ntohll(entries[1]); + *rabin_deduped_size = ntohll(entries[2]); + *rabin_data_sz_cmp = ntohll(entries[3]); } void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size) { unsigned int blknum, blk, oblk, len; - unsigned int *blkarr; - ssize_t orig_size, sz; - ssize_t overhead, pos1, i; + unsigned int *rabin_index; + ssize_t data_sz, sz, indx_cmp, data_sz_cmp, deduped_sz; + ssize_t rabin_index_sz, pos1, i; uchar_t *pos2; - blknum = ntohl(*((unsigned int *)(buf))); - orig_size = ntohll(*((ssize_t *)(buf + sizeof (unsigned int)))); - blkarr = (unsigned int *)(buf + RABIN_HDR_SIZE); - overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE; - pos1 = overhead; + rabin_parse_hdr(buf, &blknum, &rabin_index_sz, &data_sz, &indx_cmp, &data_sz_cmp, &deduped_sz); + rabin_index = (unsigned int *)(buf + RABIN_HDR_SIZE); + pos1 = rabin_index_sz + RABIN_HDR_SIZE; pos2 = ctx->cbuf; sz = 0; ctx->valid = 1; for (blk = 0; blk < blknum; blk++) { - len = ntohl(blkarr[blk]); + len = ntohl(rabin_index[blk]); if (len == 0) { ctx->blocks[blk].length = 0; ctx->blocks[blk].index = 0; @@ -399,14 +459,23 @@ rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size) memcpy(pos2, buf + pos1, len); pos2 += len; sz += len; - if (sz > orig_size) { + if (sz > data_sz) { ctx->valid = 0; break; } } - if (ctx->valid && sz < orig_size) { + if (ctx->valid && sz < data_sz) { ctx->valid = 0; } - *size = orig_size; + *size = data_sz; } +/* + * TODO: Consolidate rabin dedup and compression/decompression in functions here rather than + * messy code in main program. +int +rabin_compress(rabin_context_t *ctx, uchar_t *from, ssize_t fromlen, uchar_t *to, ssize_t *tolen, + int level, char chdr, void *data, compress_func_ptr cmp) +{ +} +*/ diff --git a/rabin/rabin_polynomial.h b/rabin/rabin_polynomial.h index 46f0e22..71ac0d3 100644 --- a/rabin/rabin_polynomial.h +++ b/rabin/rabin_polynomial.h @@ -71,7 +71,7 @@ #define RAB_POLYNOMIAL_AVG_BLOCK_SHIFT 12 #define RAB_POLYNOMIAL_AVG_BLOCK_SIZE (1 << RAB_POLYNOMIAL_AVG_BLOCK_SHIFT) #define RAB_POLYNOMIAL_AVG_BLOCK_MASK (RAB_POLYNOMIAL_AVG_BLOCK_SIZE - 1) -#define RAB_POLYNOMIAL_MIN_BLOCK_SIZE (4096) +#define RAB_POLYNOMIAL_MIN_BLOCK_SIZE RAB_POLYNOMIAL_AVG_BLOCK_SIZE #define RAB_POLYNOMIAL_MAX_BLOCK_SIZE (128 * 1024) #define RAB_POLYNOMIAL_WIN_SIZE 31 #define RAB_POLYNOMIAL_MIN_WIN_SIZE 17 @@ -95,9 +95,9 @@ typedef struct { #define RABIN_ENTRY_SIZE (sizeof (unsigned int)) // Header for a chunk deduped using Rabin -// Number of rabin blocks, size of original chunk -// -#define RABIN_HDR_SIZE (sizeof (unsigned int) + sizeof (ssize_t)) +// Number of rabin blocks, size of original data chunk, size of compressed index, +// size of deduped data, size of compressed data +#define RABIN_HDR_SIZE (sizeof (unsigned int) + sizeof (ssize_t) + sizeof (ssize_t) + sizeof (ssize_t) + sizeof (ssize_t)) typedef struct { unsigned char *current_window_data; @@ -110,13 +110,20 @@ typedef struct { uint64_t block_checksum; int dedup; int valid; + void *lzma_data; + int level; } rabin_context_t; extern rabin_context_t *create_rabin_context(uint64_t chunksize); extern void destroy_rabin_context(rabin_context_t *ctx); -extern void rabin_dedup(rabin_context_t *ctx, unsigned char *buf, +extern unsigned int rabin_dedup(rabin_context_t *ctx, unsigned char *buf, ssize_t *size, ssize_t offset); extern void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size); +extern void rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_index_sz, + ssize_t *rabin_data_sz, ssize_t *rabin_index_sz_cmp, + ssize_t *rabin_data_sz_cmp, ssize_t *rabin_deduped_size); +extern void rabin_update_hdr(uchar_t *buf, ssize_t rabin_index_sz_cmp, + ssize_t rabin_data_sz_cmp); extern void reset_rabin_context(rabin_context_t *ctx); #endif /* _RABIN_POLY_H_ */ \ No newline at end of file diff --git a/utils.h b/utils.h index ce1f9c3..9b58f07 100644 --- a/utils.h +++ b/utils.h @@ -98,8 +98,18 @@ extern int parse_numeric(ssize_t *val, const char *str); extern char *bytes_to_size(uint64_t bytes); extern ssize_t Read(int fd, void *buf, size_t count); extern ssize_t Write(int fd, const void *buf, size_t count); -extern ssize_t Dedup_Read(int fd, uchar_t **buf, size_t count, - ssize_t *rabin_count, void *ctx); +// extern ssize_t Dedup_Read(int fd, uchar_t **buf, size_t count, +// ssize_t *rabin_count, void *ctx); + +/* Pointer type for compress and decompress functions. */ +typedef int (*compress_func_ptr)(void *src, size_t srclen, void *dst, + size_t *destlen, int level, uchar_t chdr, void *data); + +/* Pointer type for algo specific init/deinit/stats functions. */ +typedef int (*init_func_ptr)(void **data, int *level, ssize_t chunksize); +typedef int (*deinit_func_ptr)(void **data); +typedef void (*stats_func_ptr)(int show); + /* * Roundup v to the nearest power of 2. From Bit Twiddling Hacks: diff --git a/zlib_compress.c b/zlib_compress.c index 01ee415..962d78d 100644 --- a/zlib_compress.c +++ b/zlib_compress.c @@ -75,7 +75,8 @@ void zerr(int ret) } int -zlib_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data) +zlib_compress(void *src, size_t srclen, void *dst, size_t *dstlen, + int level, uchar_t chdr, void *data) { z_stream zs; int ret; @@ -114,13 +115,14 @@ zlib_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, vo } int -zlib_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data) +zlib_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, + int level, uchar_t chdr, void *data) { z_stream zs; int err; bzero(&zs, sizeof (zs)); - zs.next_in = (unsigned char *)src + CHDR_SZ; + zs.next_in = (unsigned char *)src; zs.avail_in = srclen; zs.next_out = dst; zs.avail_out = *dstlen;