From a1825a23058a617f7cdc00e590d10f198bc5d6d4 Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Sun, 1 Jul 2012 21:44:02 +0530 Subject: [PATCH] Implement Parallel deduplication support. Restructure compression functions to take chunk flag as argument. Add missing error flag printing in LZMA. Only create enough threads as needed by chunk size and file size. Minor cleanups and variable name changes. --- Makefile | 3 + adaptive_compress.c | 30 ++++----- bzip2_compress.c | 8 ++- lzma_compress.c | 13 ++-- main.c | 111 ++++++++++++++++++++++++------- pcompress.h | 29 +++----- ppmd_compress.c | 6 +- rabin/rabin_polynomial.c | 139 +++++++++++++++++++++++++++++---------- rabin/rabin_polynomial.h | 17 +++-- utils.h | 14 +++- zlib_compress.c | 8 ++- 11 files changed, 265 insertions(+), 113 deletions(-) diff --git a/Makefile b/Makefile index 8a9a0ec..e1aa9e1 100644 --- a/Makefile +++ b/Makefile @@ -65,6 +65,9 @@ LINK = g++ -m64 -pthread -msse3 COMPILE = gcc -m64 -O3 -msse3 -c COMPILE_cpp = g++ -m64 -O3 -msse3 -c CPPFLAGS += -DNDEBUG +ifdef DEBUG_NO_SLAB +CPPFLAGS += -DDEBUG_NO_SLAB +endif endif all: $(PROG) diff --git a/adaptive_compress.c b/adaptive_compress.c index 44c2535..49b2317 100644 --- a/adaptive_compress.c +++ b/adaptive_compress.c @@ -40,18 +40,18 @@ static unsigned int bzip2_count = 0; static unsigned int ppmd_count = 0; extern int lzma_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int bzip2_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int ppmd_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int lzma_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int bzip2_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int ppmd_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int lzma_init(void **data, int *level, ssize_t chunksize); extern int lzma_deinit(void **data); @@ -137,7 +137,7 @@ adapt_deinit(void **data) int adapt_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { struct adapt_data *adat = (struct adapt_data *)(data); int rv, rv1, rv2; @@ -156,18 +156,18 @@ adapt_compress(void *src, size_t srclen, void *dst, inc = &ppmd_count; dst2len = *dstlen; dst3len = *dstlen; - rv1 = ppmd_compress(src, srclen, dst, dstlen, level, adat->ppmd_data); + rv1 = ppmd_compress(src, srclen, dst, dstlen, level, chdr, adat->ppmd_data); if (rv1 < 0) *dstlen = dst3len; if (adat->adapt_mode == 2) { - rv2 = lzma_compress(src, srclen, dst2, &dst2len, level, adat->lzma_data); + rv2 = lzma_compress(src, srclen, dst2, &dst2len, level, chdr, adat->lzma_data); if (rv2 < 0) dst2len = dst3len; if (dst2len < *dstlen) { inc = &lzma_count; rv = COMPRESS_LZMA; } } else { - rv2 = bzip2_compress(src, srclen, dst2, &dst2len, level, NULL); + rv2 = bzip2_compress(src, srclen, dst2, &dst2len, level, chdr, NULL); if (rv2 < 0) dst2len = dst3len; if (dst2len < *dstlen) { inc = &bzip2_count; @@ -194,21 +194,21 @@ adapt_compress(void *src, size_t srclen, void *dst, int adapt_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { struct adapt_data *adat = (struct adapt_data *)(data); uchar_t HDR; - HDR = *((uchar_t *)src); + HDR = chdr; if (HDR & (COMPRESS_LZMA << 4)) { - return (lzma_decompress(src, srclen, dst, dstlen, level, adat->lzma_data)); + return (lzma_decompress(src, srclen, dst, dstlen, level, chdr, adat->lzma_data)); } else if (HDR & (COMPRESS_BZIP2 << 4)) { - return (bzip2_decompress(src, srclen, dst, dstlen, level, NULL)); + return (bzip2_decompress(src, srclen, dst, dstlen, level, chdr, NULL)); } else if (HDR & (COMPRESS_PPMD << 4)) { - return (ppmd_decompress(src, srclen, dst, dstlen, level, adat->ppmd_data)); + return (ppmd_decompress(src, srclen, dst, dstlen, level, chdr, adat->ppmd_data)); } else { fprintf(stderr, "Unrecognized compression mode, file corrupt.\n"); diff --git a/bzip2_compress.c b/bzip2_compress.c index e0d1faa..3ad2ee0 100644 --- a/bzip2_compress.c +++ b/bzip2_compress.c @@ -81,7 +81,8 @@ bzerr(int err) } int -bzip2_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data) +bzip2_compress(void *src, size_t srclen, void *dst, size_t *dstlen, + int level, uchar_t chdr, void *data) { bz_stream bzs; int ret; @@ -120,7 +121,8 @@ bzip2_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, v } int -bzip2_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data) +bzip2_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, + int level, uchar_t chdr, void *data) { bz_stream bzs; int ret; @@ -135,7 +137,7 @@ bzip2_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, return (-1); } - bzs.next_in = (uchar_t *)src + CHDR_SZ; + bzs.next_in = (uchar_t *)src; bzs.avail_in = srclen; bzs.next_out = dst; bzs.avail_out = *dstlen; diff --git a/lzma_compress.c b/lzma_compress.c index 343edd4..505dc0b 100644 --- a/lzma_compress.c +++ b/lzma_compress.c @@ -79,11 +79,11 @@ lzma_init(void **data, int *level, ssize_t chunksize) p->fb = 64; else p->fb = 128; - if (*level > 9) *level = 9; p->level = *level; LzmaEncProps_Normalize(p); slab_cache_add(p->litprob_sz); } + if (*level > 9) *level = 9; *data = p; return (0); } @@ -115,6 +115,9 @@ lzerr(int err) case SZ_ERROR_PROGRESS: fprintf(stderr, "LZMA: Progress callback errored\n"); break; + case SZ_ERROR_INPUT_EOF: + fprintf(stderr, "LZMA: More compressed input bytes expected\n"); + break; case SZ_ERROR_OUTPUT_EOF: fprintf(stderr, "LZMA: Output buffer overflow\n"); break; @@ -146,7 +149,7 @@ lzerr(int err) */ int lzma_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { size_t props_len = LZMA_PROPS_SIZE; SRes res; @@ -175,7 +178,7 @@ lzma_compress(void *src, size_t srclen, void *dst, int lzma_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { size_t _srclen; const uchar_t *_src; @@ -183,10 +186,10 @@ lzma_decompress(void *src, size_t srclen, void *dst, ELzmaStatus status; _srclen = srclen - LZMA_PROPS_SIZE; - _src = (uchar_t *)src + LZMA_PROPS_SIZE + CHDR_SZ; + _src = (uchar_t *)src + LZMA_PROPS_SIZE; if ((res = LzmaDecode((uchar_t *)dst, dstlen, _src, &_srclen, - src + CHDR_SZ, LZMA_PROPS_SIZE, LZMA_FINISH_ANY, + src, LZMA_PROPS_SIZE, LZMA_FINISH_ANY, &status, &g_Alloc)) != SZ_OK) { lzerr(res); return (-1); diff --git a/main.c b/main.c index 00783e6..da144bb 100644 --- a/main.c +++ b/main.c @@ -148,7 +148,9 @@ perform_decompress(void *dat) { struct cmp_data *tdat = (struct cmp_data *)dat; ssize_t _chunksize; + ssize_t rabin_index_sz, rabin_data_sz, rabin_index_sz_cmp, rabin_data_sz_cmp; int type, rv; + unsigned int blknum; typeof (tdat->crc64) crc64; uchar_t HDR; uchar_t *cseg; @@ -174,6 +176,7 @@ redo: _chunksize = tdat->chunksize; tdat->crc64 = htonll(*((typeof (crc64) *)(tdat->compressed_chunk))); HDR = *cseg; + cseg += CHDR_SZ; if (HDR & CHSIZE_MASK) { uchar_t *rseg; @@ -184,13 +187,48 @@ redo: } if (HDR & COMPRESSED) { - rv = tdat->decompress(cseg, tdat->len_cmp, tdat->uncompressed_chunk, - &_chunksize, tdat->level, tdat->data); + if (enable_rabin_scan && (HDR & FLAG_DEDUP)) { + uchar_t *cmpbuf, *ubuf; + + /* Extract various sizes from rabin header. */ + rabin_parse_hdr(cseg, &blknum, &rabin_index_sz, &rabin_data_sz, + &rabin_index_sz_cmp, &rabin_data_sz_cmp, &_chunksize); + memcpy(tdat->uncompressed_chunk, cseg, RABIN_HDR_SIZE); + + /* + * Uncompress the data chunk first and then uncompress the index. + * The uncompress routines can use extra bytes at the end for temporary + * state/dictionary info. Since data chunk directly follows index + * uncompressing index first corrupts the data. + */ + cmpbuf = cseg + RABIN_HDR_SIZE + rabin_index_sz_cmp; + ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE + rabin_index_sz; + rv = tdat->decompress(cmpbuf, rabin_data_sz_cmp, ubuf, &_chunksize, + tdat->level, HDR, tdat->data); + if (rv == -1) { + tdat->len_cmp = 0; + fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id); + goto cont; + } + + cmpbuf = cseg + RABIN_HDR_SIZE; + ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE; + rv = lzma_decompress(cmpbuf, rabin_index_sz_cmp, ubuf, &rabin_index_sz, + tdat->rctx->level, 0, tdat->rctx->lzma_data); + } else { + rv = tdat->decompress(cseg, tdat->len_cmp, tdat->uncompressed_chunk, + &_chunksize, tdat->level, HDR, tdat->data); + } } else { memcpy(cseg + CHDR_SZ, tdat->uncompressed_chunk, _chunksize); } tdat->len_cmp = _chunksize; + if (rv == -1) { + tdat->len_cmp = 0; + fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id); + goto cont; + } /* Rebuild chunk from dedup blocks. */ if (enable_rabin_scan && (HDR & FLAG_DEDUP)) { rabin_context_t *rctx; @@ -213,11 +251,6 @@ redo: tdat->cmp_seg = tdat->uncompressed_chunk; } - if (rv == -1) { - tdat->len_cmp = 0; - fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id); - goto cont; - } /* * Re-compute checksum of original uncompressed chunk. * If it does not match we set length of chunk to 0 to indicate @@ -356,9 +389,8 @@ start_decompress(const char *filename, const char *to_filename) enable_rabin_scan = 1; } - if (nthreads == 0) - nprocs = sysconf(_SC_NPROCESSORS_ONLN); - else + nprocs = sysconf(_SC_NPROCESSORS_ONLN); + if (nthreads > 0 && nthreads < nprocs) nprocs = nthreads; fprintf(stderr, "Scaling to %d threads\n", nprocs); @@ -542,8 +574,9 @@ uncomp_done: static void * perform_compress(void *dat) { struct cmp_data *tdat = (struct cmp_data *)dat; - typeof (tdat->chunksize) _chunksize, len_cmp; + typeof (tdat->chunksize) _chunksize, len_cmp, rabin_index_sz, index_size_cmp; int type, rv; + uchar_t *compressed_chunk; redo: sem_wait(&tdat->start_sem); @@ -553,6 +586,7 @@ redo: return (0); } + compressed_chunk = tdat->compressed_chunk + CHDR_SZ; /* Perform Dedup if enabled. */ if (enable_rabin_scan) { rabin_context_t *rctx; @@ -567,7 +601,7 @@ redo: rbytes = tdat->rbytes; reset_rabin_context(tdat->rctx); rctx->cbuf = tdat->uncompressed_chunk; - rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0); + rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0); if (!rctx->valid) { memcpy(tdat->uncompressed_chunk, tdat->cmp_seg, rbytes); tdat->rbytes = rbytes; @@ -578,11 +612,37 @@ redo: */ tdat->crc64 = lzma_crc64(tdat->uncompressed_chunk, tdat->rbytes, 0); } - _chunksize = tdat->rbytes; - rv = tdat->compress(tdat->uncompressed_chunk, tdat->rbytes, - tdat->compressed_chunk + CHDR_SZ, &_chunksize, tdat->level, - tdat->data); + /* + * If doing dedup we compress rabin index and deduped data separately. + * The rabin index array values can pollute the compressor's dictionary thereby + * reducing compression effectiveness of the data chunk. So we separate them. + */ + if (enable_rabin_scan && tdat->rctx->valid) { + _chunksize = tdat->rbytes - rabin_index_sz - RABIN_HDR_SIZE; + index_size_cmp = rabin_index_sz; + memcpy(compressed_chunk, tdat->uncompressed_chunk, RABIN_HDR_SIZE); + + /* Compress index. */ + rv = lzma_compress(tdat->uncompressed_chunk + RABIN_HDR_SIZE, + rabin_index_sz, compressed_chunk + RABIN_HDR_SIZE, &index_size_cmp, + tdat->rctx->level, 0, tdat->rctx->lzma_data); + + index_size_cmp += RABIN_HDR_SIZE; + if (rv == 0) { + /* Compress data chunk. */ + rv = tdat->compress(tdat->uncompressed_chunk + rabin_index_sz + RABIN_HDR_SIZE, + _chunksize, compressed_chunk + index_size_cmp, &_chunksize, + tdat->level, 0, tdat->data); + /* Now update rabin header with the compressed sizes. */ + rabin_update_hdr(compressed_chunk, index_size_cmp - RABIN_HDR_SIZE , _chunksize); + } + _chunksize += index_size_cmp; + } else { + _chunksize = tdat->rbytes; + rv = tdat->compress(tdat->uncompressed_chunk, tdat->rbytes, + compressed_chunk, &_chunksize, tdat->level, 0, tdat->data); + } /* * Sanity check to ensure compressed data is lesser than original. * If at all compression expands/does not shrink data then the chunk @@ -591,8 +651,7 @@ redo: */ tdat->len_cmp = _chunksize; if (_chunksize >= tdat->chunksize || rv < 0) { - memcpy(tdat->compressed_chunk + CHDR_SZ, tdat->uncompressed_chunk, - tdat->rbytes); + memcpy(compressed_chunk, tdat->uncompressed_chunk, tdat->rbytes); type = UNCOMPRESSED; tdat->len_cmp = tdat->rbytes; } else { @@ -608,7 +667,7 @@ redo: */ len_cmp = tdat->len_cmp; *((typeof (len_cmp) *)(tdat->cmp_seg)) = htonll(tdat->len_cmp); - *((typeof (tdat->crc64) *)(tdat->cmp_seg + sizeof (tdat->crc64))) = htonll(tdat->crc64); + *((typeof (tdat->crc64) *)(tdat->cmp_seg + sizeof (tdat->len_cmp))) = htonll(tdat->crc64); tdat->len_cmp += CHDR_SZ; tdat->len_cmp += sizeof (len_cmp); tdat->len_cmp += sizeof (tdat->crc64); @@ -752,6 +811,11 @@ start_compress(const char *filename, uint64_t chunksize, int level) */ if (sbuf.st_size < chunksize) { chunksize = sbuf.st_size; + nthreads = 1; + } else { + nthreads = sbuf.st_size / chunksize; + if (sbuf.st_size % chunksize) + nthreads++; } /* @@ -783,12 +847,13 @@ start_compress(const char *filename, uint64_t chunksize, int level) } } - if (nthreads == 0) - nprocs = sysconf(_SC_NPROCESSORS_ONLN); - else + nprocs = sysconf(_SC_NPROCESSORS_ONLN); + if (nthreads > 0 && nthreads < nprocs) nprocs = nthreads; - fprintf(stderr, "Scaling to %d threads\n", nprocs); + fprintf(stderr, "Scaling to %d thread", nprocs); + if (nprocs > 1) fprintf(stderr, "s"); + fprintf(stderr, "\n"); dary = (struct cmp_data **)slab_alloc(NULL, sizeof (struct cmp_data *) * nprocs); if (enable_rabin_scan) diff --git a/pcompress.h b/pcompress.h index ebf5c22..ad718d6 100644 --- a/pcompress.h +++ b/pcompress.h @@ -51,40 +51,31 @@ extern "C" { #define CHUNK_FLAG_DEDUP 2 #define COMP_EXTN ".pz" -/* Pointer type for compress and decompress functions. */ -typedef int (*compress_func_ptr)(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); - -/* Pointer type for algo specific init/deinit/stats functions. */ -typedef int (*init_func_ptr)(void **data, int *level, ssize_t chunksize); -typedef int (*deinit_func_ptr)(void **data); -typedef void (*stats_func_ptr)(int show); - extern uint64_t lzma_crc64(const uint8_t *buf, size_t size, uint64_t crc); extern uint64_t lzma_crc64_8bchk(const uint8_t *buf, size_t size, uint64_t crc, uint64_t *cnt); extern int zlib_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int lzma_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int bzip2_compress(void *src, size_t srclen, void *dst, - size_t *destlen, int level, void *data); + size_t *destlen, int level, uchar_t chdr, void *data); extern int adapt_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int ppmd_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int zlib_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int lzma_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int bzip2_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int adapt_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int ppmd_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data); + size_t *dstlen, int level, uchar_t chdr, void *data); extern int adapt_init(void **data, int *level, ssize_t chunksize); extern int adapt2_init(void **data, int *level, ssize_t chunksize); diff --git a/ppmd_compress.c b/ppmd_compress.c index ade3151..537f039 100644 --- a/ppmd_compress.c +++ b/ppmd_compress.c @@ -100,7 +100,7 @@ ppmd_deinit(void **data) int ppmd_compress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { CPpmd8 *_ppmd = (CPpmd8 *)data; uchar_t *_src = (uchar_t *)src; @@ -122,10 +122,10 @@ ppmd_compress(void *src, size_t srclen, void *dst, int ppmd_decompress(void *src, size_t srclen, void *dst, - size_t *dstlen, int level, void *data) + size_t *dstlen, int level, uchar_t chdr, void *data) { CPpmd8 *_ppmd = (CPpmd8 *)data; - Byte *_src = (Byte *)src + CHDR_SZ; + Byte *_src = (Byte *)src; Byte *_dst = (Byte *)dst; size_t i; int res; diff --git a/rabin/rabin_polynomial.c b/rabin/rabin_polynomial.c index 603494f..22a0b65 100755 --- a/rabin/rabin_polynomial.c +++ b/rabin/rabin_polynomial.c @@ -76,6 +76,13 @@ extern const uint64_t lzma_crc64_table[4][256]; #include "rabin_polynomial.h" +extern int lzma_init(void **data, int *level, ssize_t chunksize); +extern int lzma_compress(void *src, size_t srclen, void *dst, + size_t *destlen, int level, uchar_t chdr, void *data); +extern int lzma_decompress(void *src, size_t srclen, void *dst, + size_t *dstlen, int level, uchar_t chdr, void *data); +extern int lzma_deinit(void **data); + unsigned int rabin_polynomial_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE; unsigned int rabin_polynomial_min_block_size = RAB_POLYNOMIAL_MIN_BLOCK_SIZE; unsigned int rabin_avg_block_mask = RAB_POLYNOMIAL_AVG_BLOCK_MASK; @@ -87,7 +94,8 @@ rabin_context_t * create_rabin_context(uint64_t chunksize) { rabin_context_t *ctx; unsigned char *current_window_data; - unsigned int blknum; + unsigned int blknum, index; + int level = 14; blknum = chunksize / rabin_polynomial_min_block_size; if (chunksize % rabin_polynomial_min_block_size) @@ -100,9 +108,17 @@ create_rabin_context(uint64_t chunksize) { if(ctx == NULL || current_window_data == NULL || ctx->blocks == NULL) { fprintf(stderr, "Could not allocate rabin polynomial context, out of memory\n"); + destroy_rabin_context(ctx); return (NULL); } + lzma_init(&(ctx->lzma_data), &(ctx->level), chunksize); + if (!(ctx->lzma_data)) { + fprintf(stderr, + "Could not allocate rabin polynomial context, out of memory\n"); + destroy_rabin_context(ctx); + return (NULL); + } /* * We should compute the power for the window size. * static uint64_t polynomial_pow; @@ -131,9 +147,12 @@ reset_rabin_context(rabin_context_t *ctx) void destroy_rabin_context(rabin_context_t *ctx) { - slab_free(NULL, ctx->current_window_data); - slab_free(NULL, ctx->blocks); - slab_free(NULL, ctx); + if (ctx) { + if (ctx->current_window_data) slab_free(NULL, ctx->current_window_data); + if (ctx->blocks) slab_free(NULL, ctx->blocks); + if (ctx->lzma_data) lzma_deinit(&(ctx->lzma_data)); + slab_free(NULL, ctx); + } } /* @@ -157,14 +176,14 @@ cmpblks(const void *a, const void *b) * Perform Deduplication based on Rabin Fingerprinting. A 32-byte window is used for * the rolling checksum and dedup blocks vary in size from 4K-128K. */ -void +unsigned int rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) { - ssize_t i, last_offset; + ssize_t i, last_offset,j; unsigned int blknum; char *buf1 = (char *)buf; unsigned int length; - ssize_t overhead; + ssize_t rabin_index_sz; length = offset; last_offset = 0; @@ -186,6 +205,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) */ ctx->cur_roll_checksum = (ctx->cur_roll_checksum << 1) + cur_byte; ctx->cur_roll_checksum -= (pushed_out << RAB_POLYNOMIAL_WIN_SIZE); + // CRC64 Calculation swiped from LZMA ctx->cur_checksum = lzma_crc64_table[0][cur_byte ^ A1(ctx->cur_checksum)] ^ S8(ctx->cur_checksum); @@ -205,7 +225,6 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) ctx->blocks[blknum].checksum = ctx->cur_checksum; ctx->blocks[blknum].length = length; ctx->blocks[blknum].refcount = 0; - blknum++; ctx->cur_checksum = 0; last_offset = i+1; @@ -220,7 +239,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) ssize_t pos, matches; int valid = 1; char *tmp, *prev_offset; - unsigned int *blkarr, prev_index, prev_blk; + unsigned int *rabin_index, prev_index, prev_blk; // Insert the last left-over trailing bytes, if any, into a block. if (last_offset < *size) { @@ -228,16 +247,17 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) ctx->blocks[blknum].index = blknum; ctx->blocks[blknum].checksum = ctx->cur_checksum; ctx->blocks[blknum].length = *size - last_offset; + ctx->blocks[blknum].refcount = 0; blknum++; ctx->cur_checksum = 0; last_offset = *size; } - overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE; + rabin_index_sz = (ssize_t)blknum * RABIN_ENTRY_SIZE; prev_cksum = 0; prev_length = 0; prev_offset = 0; - pos = overhead; + pos = rabin_index_sz + RABIN_HDR_SIZE; /* * Now sort the block array based on checksums. This will bring virtually @@ -246,7 +266,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) * TODO: Test with a heavily optimized MD5 (from OpenSSL?) later. */ qsort(ctx->blocks, blknum, sizeof (rabin_blockentry_t), cmpblks); - blkarr = (unsigned int *)(ctx->cbuf + RABIN_HDR_SIZE); + rabin_index = (unsigned int *)(ctx->cbuf + RABIN_HDR_SIZE); matches = 0; /* @@ -260,7 +280,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) * blocks. This helps in non-duplicate block merging later. */ for (blk = 0; blk < blknum; blk++) { - blkarr[ctx->blocks[blk].index] = blk; + rabin_index[ctx->blocks[blk].index] = blk; if (blk > 0 && ctx->blocks[blk].checksum == prev_cksum && ctx->blocks[blk].length == prev_length && @@ -279,7 +299,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) prev_blk = blk; } - if (matches < overhead) { + if (matches < rabin_index_sz) { ctx->valid = 0; return; } @@ -303,7 +323,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) valid = 0; break; } - be = &(ctx->blocks[blkarr[blk]]); + be = &(ctx->blocks[rabin_index[blk]]); if (be->length > 0) { prev_offset = buf1 + be->offset; memcpy(ctx->cbuf + pos, prev_offset, be->length); @@ -318,61 +338,101 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) prev_index = blk; prev_length = be->length; } - blkarr[blk] = htonl(be->length); + rabin_index[blk] = htonl(be->length); } else { if (be->refcount > 0) { prev_index = 0; prev_length = 0; - blkarr[blk] = htonl(be->length); + rabin_index[blk] = htonl(be->length); } else { if (prev_length + be->length <= RAB_POLYNOMIAL_MAX_BLOCK_SIZE) { prev_length += be->length; - blkarr[prev_index] = htonl(prev_length); - blkarr[blk] = 0; + rabin_index[prev_index] = htonl(prev_length); + rabin_index[blk] = 0; } else { prev_index = 0; prev_length = 0; - blkarr[blk] = htonl(be->length); + rabin_index[blk] = htonl(be->length); } } } } else { prev_index = 0; prev_length = 0; - blkarr[blk] = htonl(RAB_POLYNOMIAL_MAX_BLOCK_SIZE + be->index + 1); + rabin_index[blk] = htonl(RAB_POLYNOMIAL_MAX_BLOCK_SIZE + be->index + 1); } } cont: if (valid) { - *((unsigned int *)(ctx->cbuf)) = htonl(blknum); - *((ssize_t *)(ctx->cbuf + sizeof (unsigned int))) = htonll(*size); + uchar_t *cbuf = ctx->cbuf; + ssize_t *entries; + + *((unsigned int *)cbuf) = htonl(blknum); + cbuf += sizeof (unsigned int); + entries = (ssize_t *)cbuf; + entries[0] = htonll(*size); + entries[1] = 0; + entries[2] = htonll(pos - rabin_index_sz - RABIN_HDR_SIZE); *size = pos; ctx->valid = 1; + /* + * Remaining header entries: size of compressed index and size of + * compressed data are inserted later via rabin_update_hdr, after actual compression! + */ + return (rabin_index_sz); } } + return (0); +} + +void +rabin_update_hdr(uchar_t *buf, ssize_t rabin_index_sz_cmp, ssize_t rabin_data_sz_cmp) +{ + ssize_t *entries; + + buf += sizeof (unsigned int); + entries = (ssize_t *)buf; + entries[1] = htonll(rabin_index_sz_cmp); + entries[3] = htonll(rabin_data_sz_cmp); +} + +void +rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_index_sz, + ssize_t *rabin_data_sz, ssize_t *rabin_index_sz_cmp, + ssize_t *rabin_data_sz_cmp, ssize_t *rabin_deduped_size) +{ + ssize_t *entries; + + *blknum = ntohl(*((unsigned int *)(buf))); + buf += sizeof (unsigned int); + + entries = (ssize_t *)buf; + *rabin_data_sz = ntohll(entries[0]); + *rabin_index_sz = (ssize_t)(*blknum) * RABIN_ENTRY_SIZE; + *rabin_index_sz_cmp = ntohll(entries[1]); + *rabin_deduped_size = ntohll(entries[2]); + *rabin_data_sz_cmp = ntohll(entries[3]); } void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size) { unsigned int blknum, blk, oblk, len; - unsigned int *blkarr; - ssize_t orig_size, sz; - ssize_t overhead, pos1, i; + unsigned int *rabin_index; + ssize_t data_sz, sz, indx_cmp, data_sz_cmp, deduped_sz; + ssize_t rabin_index_sz, pos1, i; uchar_t *pos2; - blknum = ntohl(*((unsigned int *)(buf))); - orig_size = ntohll(*((ssize_t *)(buf + sizeof (unsigned int)))); - blkarr = (unsigned int *)(buf + RABIN_HDR_SIZE); - overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE; - pos1 = overhead; + rabin_parse_hdr(buf, &blknum, &rabin_index_sz, &data_sz, &indx_cmp, &data_sz_cmp, &deduped_sz); + rabin_index = (unsigned int *)(buf + RABIN_HDR_SIZE); + pos1 = rabin_index_sz + RABIN_HDR_SIZE; pos2 = ctx->cbuf; sz = 0; ctx->valid = 1; for (blk = 0; blk < blknum; blk++) { - len = ntohl(blkarr[blk]); + len = ntohl(rabin_index[blk]); if (len == 0) { ctx->blocks[blk].length = 0; ctx->blocks[blk].index = 0; @@ -399,14 +459,23 @@ rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size) memcpy(pos2, buf + pos1, len); pos2 += len; sz += len; - if (sz > orig_size) { + if (sz > data_sz) { ctx->valid = 0; break; } } - if (ctx->valid && sz < orig_size) { + if (ctx->valid && sz < data_sz) { ctx->valid = 0; } - *size = orig_size; + *size = data_sz; } +/* + * TODO: Consolidate rabin dedup and compression/decompression in functions here rather than + * messy code in main program. +int +rabin_compress(rabin_context_t *ctx, uchar_t *from, ssize_t fromlen, uchar_t *to, ssize_t *tolen, + int level, char chdr, void *data, compress_func_ptr cmp) +{ +} +*/ diff --git a/rabin/rabin_polynomial.h b/rabin/rabin_polynomial.h index 46f0e22..71ac0d3 100644 --- a/rabin/rabin_polynomial.h +++ b/rabin/rabin_polynomial.h @@ -71,7 +71,7 @@ #define RAB_POLYNOMIAL_AVG_BLOCK_SHIFT 12 #define RAB_POLYNOMIAL_AVG_BLOCK_SIZE (1 << RAB_POLYNOMIAL_AVG_BLOCK_SHIFT) #define RAB_POLYNOMIAL_AVG_BLOCK_MASK (RAB_POLYNOMIAL_AVG_BLOCK_SIZE - 1) -#define RAB_POLYNOMIAL_MIN_BLOCK_SIZE (4096) +#define RAB_POLYNOMIAL_MIN_BLOCK_SIZE RAB_POLYNOMIAL_AVG_BLOCK_SIZE #define RAB_POLYNOMIAL_MAX_BLOCK_SIZE (128 * 1024) #define RAB_POLYNOMIAL_WIN_SIZE 31 #define RAB_POLYNOMIAL_MIN_WIN_SIZE 17 @@ -95,9 +95,9 @@ typedef struct { #define RABIN_ENTRY_SIZE (sizeof (unsigned int)) // Header for a chunk deduped using Rabin -// Number of rabin blocks, size of original chunk -// -#define RABIN_HDR_SIZE (sizeof (unsigned int) + sizeof (ssize_t)) +// Number of rabin blocks, size of original data chunk, size of compressed index, +// size of deduped data, size of compressed data +#define RABIN_HDR_SIZE (sizeof (unsigned int) + sizeof (ssize_t) + sizeof (ssize_t) + sizeof (ssize_t) + sizeof (ssize_t)) typedef struct { unsigned char *current_window_data; @@ -110,13 +110,20 @@ typedef struct { uint64_t block_checksum; int dedup; int valid; + void *lzma_data; + int level; } rabin_context_t; extern rabin_context_t *create_rabin_context(uint64_t chunksize); extern void destroy_rabin_context(rabin_context_t *ctx); -extern void rabin_dedup(rabin_context_t *ctx, unsigned char *buf, +extern unsigned int rabin_dedup(rabin_context_t *ctx, unsigned char *buf, ssize_t *size, ssize_t offset); extern void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size); +extern void rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_index_sz, + ssize_t *rabin_data_sz, ssize_t *rabin_index_sz_cmp, + ssize_t *rabin_data_sz_cmp, ssize_t *rabin_deduped_size); +extern void rabin_update_hdr(uchar_t *buf, ssize_t rabin_index_sz_cmp, + ssize_t rabin_data_sz_cmp); extern void reset_rabin_context(rabin_context_t *ctx); #endif /* _RABIN_POLY_H_ */ \ No newline at end of file diff --git a/utils.h b/utils.h index ce1f9c3..9b58f07 100644 --- a/utils.h +++ b/utils.h @@ -98,8 +98,18 @@ extern int parse_numeric(ssize_t *val, const char *str); extern char *bytes_to_size(uint64_t bytes); extern ssize_t Read(int fd, void *buf, size_t count); extern ssize_t Write(int fd, const void *buf, size_t count); -extern ssize_t Dedup_Read(int fd, uchar_t **buf, size_t count, - ssize_t *rabin_count, void *ctx); +// extern ssize_t Dedup_Read(int fd, uchar_t **buf, size_t count, +// ssize_t *rabin_count, void *ctx); + +/* Pointer type for compress and decompress functions. */ +typedef int (*compress_func_ptr)(void *src, size_t srclen, void *dst, + size_t *destlen, int level, uchar_t chdr, void *data); + +/* Pointer type for algo specific init/deinit/stats functions. */ +typedef int (*init_func_ptr)(void **data, int *level, ssize_t chunksize); +typedef int (*deinit_func_ptr)(void **data); +typedef void (*stats_func_ptr)(int show); + /* * Roundup v to the nearest power of 2. From Bit Twiddling Hacks: diff --git a/zlib_compress.c b/zlib_compress.c index 01ee415..962d78d 100644 --- a/zlib_compress.c +++ b/zlib_compress.c @@ -75,7 +75,8 @@ void zerr(int ret) } int -zlib_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data) +zlib_compress(void *src, size_t srclen, void *dst, size_t *dstlen, + int level, uchar_t chdr, void *data) { z_stream zs; int ret; @@ -114,13 +115,14 @@ zlib_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, vo } int -zlib_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data) +zlib_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, + int level, uchar_t chdr, void *data) { z_stream zs; int err; bzero(&zs, sizeof (zs)); - zs.next_in = (unsigned char *)src + CHDR_SZ; + zs.next_in = (unsigned char *)src; zs.avail_in = srclen; zs.next_out = dst; zs.avail_out = *dstlen;