Implement Parallel deduplication support.

Restructure compression functions to take chunk flag as argument.
Add missing error flag printing in LZMA.
Only create enough threads as needed by chunk size and file size.
Minor cleanups and variable name changes.
This commit is contained in:
Moinak Ghosh 2012-07-01 21:44:02 +05:30
parent f9c3644459
commit a1825a2305
11 changed files with 265 additions and 113 deletions

View file

@ -65,6 +65,9 @@ LINK = g++ -m64 -pthread -msse3
COMPILE = gcc -m64 -O3 -msse3 -c
COMPILE_cpp = g++ -m64 -O3 -msse3 -c
CPPFLAGS += -DNDEBUG
ifdef DEBUG_NO_SLAB
CPPFLAGS += -DDEBUG_NO_SLAB
endif
endif
all: $(PROG)

View file

@ -40,18 +40,18 @@ static unsigned int bzip2_count = 0;
static unsigned int ppmd_count = 0;
extern int lzma_compress(void *src, size_t srclen, void *dst,
size_t *destlen, int level, void *data);
size_t *destlen, int level, uchar_t chdr, void *data);
extern int bzip2_compress(void *src, size_t srclen, void *dst,
size_t *destlen, int level, void *data);
size_t *destlen, int level, uchar_t chdr, void *data);
extern int ppmd_compress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int lzma_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int bzip2_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int ppmd_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int lzma_init(void **data, int *level, ssize_t chunksize);
extern int lzma_deinit(void **data);
@ -137,7 +137,7 @@ adapt_deinit(void **data)
int
adapt_compress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data)
size_t *dstlen, int level, uchar_t chdr, void *data)
{
struct adapt_data *adat = (struct adapt_data *)(data);
int rv, rv1, rv2;
@ -156,18 +156,18 @@ adapt_compress(void *src, size_t srclen, void *dst,
inc = &ppmd_count;
dst2len = *dstlen;
dst3len = *dstlen;
rv1 = ppmd_compress(src, srclen, dst, dstlen, level, adat->ppmd_data);
rv1 = ppmd_compress(src, srclen, dst, dstlen, level, chdr, adat->ppmd_data);
if (rv1 < 0) *dstlen = dst3len;
if (adat->adapt_mode == 2) {
rv2 = lzma_compress(src, srclen, dst2, &dst2len, level, adat->lzma_data);
rv2 = lzma_compress(src, srclen, dst2, &dst2len, level, chdr, adat->lzma_data);
if (rv2 < 0) dst2len = dst3len;
if (dst2len < *dstlen) {
inc = &lzma_count;
rv = COMPRESS_LZMA;
}
} else {
rv2 = bzip2_compress(src, srclen, dst2, &dst2len, level, NULL);
rv2 = bzip2_compress(src, srclen, dst2, &dst2len, level, chdr, NULL);
if (rv2 < 0) dst2len = dst3len;
if (dst2len < *dstlen) {
inc = &bzip2_count;
@ -194,21 +194,21 @@ adapt_compress(void *src, size_t srclen, void *dst,
int
adapt_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data)
size_t *dstlen, int level, uchar_t chdr, void *data)
{
struct adapt_data *adat = (struct adapt_data *)(data);
uchar_t HDR;
HDR = *((uchar_t *)src);
HDR = chdr;
if (HDR & (COMPRESS_LZMA << 4)) {
return (lzma_decompress(src, srclen, dst, dstlen, level, adat->lzma_data));
return (lzma_decompress(src, srclen, dst, dstlen, level, chdr, adat->lzma_data));
} else if (HDR & (COMPRESS_BZIP2 << 4)) {
return (bzip2_decompress(src, srclen, dst, dstlen, level, NULL));
return (bzip2_decompress(src, srclen, dst, dstlen, level, chdr, NULL));
} else if (HDR & (COMPRESS_PPMD << 4)) {
return (ppmd_decompress(src, srclen, dst, dstlen, level, adat->ppmd_data));
return (ppmd_decompress(src, srclen, dst, dstlen, level, chdr, adat->ppmd_data));
} else {
fprintf(stderr, "Unrecognized compression mode, file corrupt.\n");

View file

@ -81,7 +81,8 @@ bzerr(int err)
}
int
bzip2_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data)
bzip2_compress(void *src, size_t srclen, void *dst, size_t *dstlen,
int level, uchar_t chdr, void *data)
{
bz_stream bzs;
int ret;
@ -120,7 +121,8 @@ bzip2_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, v
}
int
bzip2_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data)
bzip2_decompress(void *src, size_t srclen, void *dst, size_t *dstlen,
int level, uchar_t chdr, void *data)
{
bz_stream bzs;
int ret;
@ -135,7 +137,7 @@ bzip2_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, int level,
return (-1);
}
bzs.next_in = (uchar_t *)src + CHDR_SZ;
bzs.next_in = (uchar_t *)src;
bzs.avail_in = srclen;
bzs.next_out = dst;
bzs.avail_out = *dstlen;

View file

@ -79,11 +79,11 @@ lzma_init(void **data, int *level, ssize_t chunksize)
p->fb = 64;
else
p->fb = 128;
if (*level > 9) *level = 9;
p->level = *level;
LzmaEncProps_Normalize(p);
slab_cache_add(p->litprob_sz);
}
if (*level > 9) *level = 9;
*data = p;
return (0);
}
@ -115,6 +115,9 @@ lzerr(int err)
case SZ_ERROR_PROGRESS:
fprintf(stderr, "LZMA: Progress callback errored\n");
break;
case SZ_ERROR_INPUT_EOF:
fprintf(stderr, "LZMA: More compressed input bytes expected\n");
break;
case SZ_ERROR_OUTPUT_EOF:
fprintf(stderr, "LZMA: Output buffer overflow\n");
break;
@ -146,7 +149,7 @@ lzerr(int err)
*/
int
lzma_compress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data)
size_t *dstlen, int level, uchar_t chdr, void *data)
{
size_t props_len = LZMA_PROPS_SIZE;
SRes res;
@ -175,7 +178,7 @@ lzma_compress(void *src, size_t srclen, void *dst,
int
lzma_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data)
size_t *dstlen, int level, uchar_t chdr, void *data)
{
size_t _srclen;
const uchar_t *_src;
@ -183,10 +186,10 @@ lzma_decompress(void *src, size_t srclen, void *dst,
ELzmaStatus status;
_srclen = srclen - LZMA_PROPS_SIZE;
_src = (uchar_t *)src + LZMA_PROPS_SIZE + CHDR_SZ;
_src = (uchar_t *)src + LZMA_PROPS_SIZE;
if ((res = LzmaDecode((uchar_t *)dst, dstlen, _src, &_srclen,
src + CHDR_SZ, LZMA_PROPS_SIZE, LZMA_FINISH_ANY,
src, LZMA_PROPS_SIZE, LZMA_FINISH_ANY,
&status, &g_Alloc)) != SZ_OK) {
lzerr(res);
return (-1);

103
main.c
View file

@ -148,7 +148,9 @@ perform_decompress(void *dat)
{
struct cmp_data *tdat = (struct cmp_data *)dat;
ssize_t _chunksize;
ssize_t rabin_index_sz, rabin_data_sz, rabin_index_sz_cmp, rabin_data_sz_cmp;
int type, rv;
unsigned int blknum;
typeof (tdat->crc64) crc64;
uchar_t HDR;
uchar_t *cseg;
@ -174,6 +176,7 @@ redo:
_chunksize = tdat->chunksize;
tdat->crc64 = htonll(*((typeof (crc64) *)(tdat->compressed_chunk)));
HDR = *cseg;
cseg += CHDR_SZ;
if (HDR & CHSIZE_MASK) {
uchar_t *rseg;
@ -184,13 +187,48 @@ redo:
}
if (HDR & COMPRESSED) {
if (enable_rabin_scan && (HDR & FLAG_DEDUP)) {
uchar_t *cmpbuf, *ubuf;
/* Extract various sizes from rabin header. */
rabin_parse_hdr(cseg, &blknum, &rabin_index_sz, &rabin_data_sz,
&rabin_index_sz_cmp, &rabin_data_sz_cmp, &_chunksize);
memcpy(tdat->uncompressed_chunk, cseg, RABIN_HDR_SIZE);
/*
* Uncompress the data chunk first and then uncompress the index.
* The uncompress routines can use extra bytes at the end for temporary
* state/dictionary info. Since data chunk directly follows index
* uncompressing index first corrupts the data.
*/
cmpbuf = cseg + RABIN_HDR_SIZE + rabin_index_sz_cmp;
ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE + rabin_index_sz;
rv = tdat->decompress(cmpbuf, rabin_data_sz_cmp, ubuf, &_chunksize,
tdat->level, HDR, tdat->data);
if (rv == -1) {
tdat->len_cmp = 0;
fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id);
goto cont;
}
cmpbuf = cseg + RABIN_HDR_SIZE;
ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE;
rv = lzma_decompress(cmpbuf, rabin_index_sz_cmp, ubuf, &rabin_index_sz,
tdat->rctx->level, 0, tdat->rctx->lzma_data);
} else {
rv = tdat->decompress(cseg, tdat->len_cmp, tdat->uncompressed_chunk,
&_chunksize, tdat->level, tdat->data);
&_chunksize, tdat->level, HDR, tdat->data);
}
} else {
memcpy(cseg + CHDR_SZ, tdat->uncompressed_chunk, _chunksize);
}
tdat->len_cmp = _chunksize;
if (rv == -1) {
tdat->len_cmp = 0;
fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id);
goto cont;
}
/* Rebuild chunk from dedup blocks. */
if (enable_rabin_scan && (HDR & FLAG_DEDUP)) {
rabin_context_t *rctx;
@ -213,11 +251,6 @@ redo:
tdat->cmp_seg = tdat->uncompressed_chunk;
}
if (rv == -1) {
tdat->len_cmp = 0;
fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id);
goto cont;
}
/*
* Re-compute checksum of original uncompressed chunk.
* If it does not match we set length of chunk to 0 to indicate
@ -356,9 +389,8 @@ start_decompress(const char *filename, const char *to_filename)
enable_rabin_scan = 1;
}
if (nthreads == 0)
nprocs = sysconf(_SC_NPROCESSORS_ONLN);
else
if (nthreads > 0 && nthreads < nprocs)
nprocs = nthreads;
fprintf(stderr, "Scaling to %d threads\n", nprocs);
@ -542,8 +574,9 @@ uncomp_done:
static void *
perform_compress(void *dat) {
struct cmp_data *tdat = (struct cmp_data *)dat;
typeof (tdat->chunksize) _chunksize, len_cmp;
typeof (tdat->chunksize) _chunksize, len_cmp, rabin_index_sz, index_size_cmp;
int type, rv;
uchar_t *compressed_chunk;
redo:
sem_wait(&tdat->start_sem);
@ -553,6 +586,7 @@ redo:
return (0);
}
compressed_chunk = tdat->compressed_chunk + CHDR_SZ;
/* Perform Dedup if enabled. */
if (enable_rabin_scan) {
rabin_context_t *rctx;
@ -567,7 +601,7 @@ redo:
rbytes = tdat->rbytes;
reset_rabin_context(tdat->rctx);
rctx->cbuf = tdat->uncompressed_chunk;
rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0);
rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0);
if (!rctx->valid) {
memcpy(tdat->uncompressed_chunk, tdat->cmp_seg, rbytes);
tdat->rbytes = rbytes;
@ -578,11 +612,37 @@ redo:
*/
tdat->crc64 = lzma_crc64(tdat->uncompressed_chunk, tdat->rbytes, 0);
}
/*
* If doing dedup we compress rabin index and deduped data separately.
* The rabin index array values can pollute the compressor's dictionary thereby
* reducing compression effectiveness of the data chunk. So we separate them.
*/
if (enable_rabin_scan && tdat->rctx->valid) {
_chunksize = tdat->rbytes - rabin_index_sz - RABIN_HDR_SIZE;
index_size_cmp = rabin_index_sz;
memcpy(compressed_chunk, tdat->uncompressed_chunk, RABIN_HDR_SIZE);
/* Compress index. */
rv = lzma_compress(tdat->uncompressed_chunk + RABIN_HDR_SIZE,
rabin_index_sz, compressed_chunk + RABIN_HDR_SIZE, &index_size_cmp,
tdat->rctx->level, 0, tdat->rctx->lzma_data);
index_size_cmp += RABIN_HDR_SIZE;
if (rv == 0) {
/* Compress data chunk. */
rv = tdat->compress(tdat->uncompressed_chunk + rabin_index_sz + RABIN_HDR_SIZE,
_chunksize, compressed_chunk + index_size_cmp, &_chunksize,
tdat->level, 0, tdat->data);
/* Now update rabin header with the compressed sizes. */
rabin_update_hdr(compressed_chunk, index_size_cmp - RABIN_HDR_SIZE , _chunksize);
}
_chunksize += index_size_cmp;
} else {
_chunksize = tdat->rbytes;
rv = tdat->compress(tdat->uncompressed_chunk, tdat->rbytes,
tdat->compressed_chunk + CHDR_SZ, &_chunksize, tdat->level,
tdat->data);
compressed_chunk, &_chunksize, tdat->level, 0, tdat->data);
}
/*
* Sanity check to ensure compressed data is lesser than original.
* If at all compression expands/does not shrink data then the chunk
@ -591,8 +651,7 @@ redo:
*/
tdat->len_cmp = _chunksize;
if (_chunksize >= tdat->chunksize || rv < 0) {
memcpy(tdat->compressed_chunk + CHDR_SZ, tdat->uncompressed_chunk,
tdat->rbytes);
memcpy(compressed_chunk, tdat->uncompressed_chunk, tdat->rbytes);
type = UNCOMPRESSED;
tdat->len_cmp = tdat->rbytes;
} else {
@ -608,7 +667,7 @@ redo:
*/
len_cmp = tdat->len_cmp;
*((typeof (len_cmp) *)(tdat->cmp_seg)) = htonll(tdat->len_cmp);
*((typeof (tdat->crc64) *)(tdat->cmp_seg + sizeof (tdat->crc64))) = htonll(tdat->crc64);
*((typeof (tdat->crc64) *)(tdat->cmp_seg + sizeof (tdat->len_cmp))) = htonll(tdat->crc64);
tdat->len_cmp += CHDR_SZ;
tdat->len_cmp += sizeof (len_cmp);
tdat->len_cmp += sizeof (tdat->crc64);
@ -752,6 +811,11 @@ start_compress(const char *filename, uint64_t chunksize, int level)
*/
if (sbuf.st_size < chunksize) {
chunksize = sbuf.st_size;
nthreads = 1;
} else {
nthreads = sbuf.st_size / chunksize;
if (sbuf.st_size % chunksize)
nthreads++;
}
/*
@ -783,12 +847,13 @@ start_compress(const char *filename, uint64_t chunksize, int level)
}
}
if (nthreads == 0)
nprocs = sysconf(_SC_NPROCESSORS_ONLN);
else
if (nthreads > 0 && nthreads < nprocs)
nprocs = nthreads;
fprintf(stderr, "Scaling to %d threads\n", nprocs);
fprintf(stderr, "Scaling to %d thread", nprocs);
if (nprocs > 1) fprintf(stderr, "s");
fprintf(stderr, "\n");
dary = (struct cmp_data **)slab_alloc(NULL, sizeof (struct cmp_data *) * nprocs);
if (enable_rabin_scan)

View file

@ -51,40 +51,31 @@ extern "C" {
#define CHUNK_FLAG_DEDUP 2
#define COMP_EXTN ".pz"
/* Pointer type for compress and decompress functions. */
typedef int (*compress_func_ptr)(void *src, size_t srclen, void *dst,
size_t *destlen, int level, void *data);
/* Pointer type for algo specific init/deinit/stats functions. */
typedef int (*init_func_ptr)(void **data, int *level, ssize_t chunksize);
typedef int (*deinit_func_ptr)(void **data);
typedef void (*stats_func_ptr)(int show);
extern uint64_t lzma_crc64(const uint8_t *buf, size_t size, uint64_t crc);
extern uint64_t lzma_crc64_8bchk(const uint8_t *buf, size_t size,
uint64_t crc, uint64_t *cnt);
extern int zlib_compress(void *src, size_t srclen, void *dst,
size_t *destlen, int level, void *data);
size_t *destlen, int level, uchar_t chdr, void *data);
extern int lzma_compress(void *src, size_t srclen, void *dst,
size_t *destlen, int level, void *data);
size_t *destlen, int level, uchar_t chdr, void *data);
extern int bzip2_compress(void *src, size_t srclen, void *dst,
size_t *destlen, int level, void *data);
size_t *destlen, int level, uchar_t chdr, void *data);
extern int adapt_compress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int ppmd_compress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int zlib_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int lzma_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int bzip2_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int adapt_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int ppmd_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data);
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int adapt_init(void **data, int *level, ssize_t chunksize);
extern int adapt2_init(void **data, int *level, ssize_t chunksize);

View file

@ -100,7 +100,7 @@ ppmd_deinit(void **data)
int
ppmd_compress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data)
size_t *dstlen, int level, uchar_t chdr, void *data)
{
CPpmd8 *_ppmd = (CPpmd8 *)data;
uchar_t *_src = (uchar_t *)src;
@ -122,10 +122,10 @@ ppmd_compress(void *src, size_t srclen, void *dst,
int
ppmd_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, void *data)
size_t *dstlen, int level, uchar_t chdr, void *data)
{
CPpmd8 *_ppmd = (CPpmd8 *)data;
Byte *_src = (Byte *)src + CHDR_SZ;
Byte *_src = (Byte *)src;
Byte *_dst = (Byte *)dst;
size_t i;
int res;

View file

@ -76,6 +76,13 @@ extern const uint64_t lzma_crc64_table[4][256];
#include "rabin_polynomial.h"
extern int lzma_init(void **data, int *level, ssize_t chunksize);
extern int lzma_compress(void *src, size_t srclen, void *dst,
size_t *destlen, int level, uchar_t chdr, void *data);
extern int lzma_decompress(void *src, size_t srclen, void *dst,
size_t *dstlen, int level, uchar_t chdr, void *data);
extern int lzma_deinit(void **data);
unsigned int rabin_polynomial_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE;
unsigned int rabin_polynomial_min_block_size = RAB_POLYNOMIAL_MIN_BLOCK_SIZE;
unsigned int rabin_avg_block_mask = RAB_POLYNOMIAL_AVG_BLOCK_MASK;
@ -87,7 +94,8 @@ rabin_context_t *
create_rabin_context(uint64_t chunksize) {
rabin_context_t *ctx;
unsigned char *current_window_data;
unsigned int blknum;
unsigned int blknum, index;
int level = 14;
blknum = chunksize / rabin_polynomial_min_block_size;
if (chunksize % rabin_polynomial_min_block_size)
@ -100,9 +108,17 @@ create_rabin_context(uint64_t chunksize) {
if(ctx == NULL || current_window_data == NULL || ctx->blocks == NULL) {
fprintf(stderr,
"Could not allocate rabin polynomial context, out of memory\n");
destroy_rabin_context(ctx);
return (NULL);
}
lzma_init(&(ctx->lzma_data), &(ctx->level), chunksize);
if (!(ctx->lzma_data)) {
fprintf(stderr,
"Could not allocate rabin polynomial context, out of memory\n");
destroy_rabin_context(ctx);
return (NULL);
}
/*
* We should compute the power for the window size.
* static uint64_t polynomial_pow;
@ -131,9 +147,12 @@ reset_rabin_context(rabin_context_t *ctx)
void
destroy_rabin_context(rabin_context_t *ctx)
{
slab_free(NULL, ctx->current_window_data);
slab_free(NULL, ctx->blocks);
if (ctx) {
if (ctx->current_window_data) slab_free(NULL, ctx->current_window_data);
if (ctx->blocks) slab_free(NULL, ctx->blocks);
if (ctx->lzma_data) lzma_deinit(&(ctx->lzma_data));
slab_free(NULL, ctx);
}
}
/*
@ -157,14 +176,14 @@ cmpblks(const void *a, const void *b)
* Perform Deduplication based on Rabin Fingerprinting. A 32-byte window is used for
* the rolling checksum and dedup blocks vary in size from 4K-128K.
*/
void
unsigned int
rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
{
ssize_t i, last_offset;
ssize_t i, last_offset,j;
unsigned int blknum;
char *buf1 = (char *)buf;
unsigned int length;
ssize_t overhead;
ssize_t rabin_index_sz;
length = offset;
last_offset = 0;
@ -186,6 +205,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
*/
ctx->cur_roll_checksum = (ctx->cur_roll_checksum << 1) + cur_byte;
ctx->cur_roll_checksum -= (pushed_out << RAB_POLYNOMIAL_WIN_SIZE);
// CRC64 Calculation swiped from LZMA
ctx->cur_checksum = lzma_crc64_table[0][cur_byte ^ A1(ctx->cur_checksum)] ^ S8(ctx->cur_checksum);
@ -205,7 +225,6 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
ctx->blocks[blknum].checksum = ctx->cur_checksum;
ctx->blocks[blknum].length = length;
ctx->blocks[blknum].refcount = 0;
blknum++;
ctx->cur_checksum = 0;
last_offset = i+1;
@ -220,7 +239,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
ssize_t pos, matches;
int valid = 1;
char *tmp, *prev_offset;
unsigned int *blkarr, prev_index, prev_blk;
unsigned int *rabin_index, prev_index, prev_blk;
// Insert the last left-over trailing bytes, if any, into a block.
if (last_offset < *size) {
@ -228,16 +247,17 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
ctx->blocks[blknum].index = blknum;
ctx->blocks[blknum].checksum = ctx->cur_checksum;
ctx->blocks[blknum].length = *size - last_offset;
ctx->blocks[blknum].refcount = 0;
blknum++;
ctx->cur_checksum = 0;
last_offset = *size;
}
overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE;
rabin_index_sz = (ssize_t)blknum * RABIN_ENTRY_SIZE;
prev_cksum = 0;
prev_length = 0;
prev_offset = 0;
pos = overhead;
pos = rabin_index_sz + RABIN_HDR_SIZE;
/*
* Now sort the block array based on checksums. This will bring virtually
@ -246,7 +266,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
* TODO: Test with a heavily optimized MD5 (from OpenSSL?) later.
*/
qsort(ctx->blocks, blknum, sizeof (rabin_blockentry_t), cmpblks);
blkarr = (unsigned int *)(ctx->cbuf + RABIN_HDR_SIZE);
rabin_index = (unsigned int *)(ctx->cbuf + RABIN_HDR_SIZE);
matches = 0;
/*
@ -260,7 +280,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
* blocks. This helps in non-duplicate block merging later.
*/
for (blk = 0; blk < blknum; blk++) {
blkarr[ctx->blocks[blk].index] = blk;
rabin_index[ctx->blocks[blk].index] = blk;
if (blk > 0 && ctx->blocks[blk].checksum == prev_cksum &&
ctx->blocks[blk].length == prev_length &&
@ -279,7 +299,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
prev_blk = blk;
}
if (matches < overhead) {
if (matches < rabin_index_sz) {
ctx->valid = 0;
return;
}
@ -303,7 +323,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
valid = 0;
break;
}
be = &(ctx->blocks[blkarr[blk]]);
be = &(ctx->blocks[rabin_index[blk]]);
if (be->length > 0) {
prev_offset = buf1 + be->offset;
memcpy(ctx->cbuf + pos, prev_offset, be->length);
@ -318,61 +338,101 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
prev_index = blk;
prev_length = be->length;
}
blkarr[blk] = htonl(be->length);
rabin_index[blk] = htonl(be->length);
} else {
if (be->refcount > 0) {
prev_index = 0;
prev_length = 0;
blkarr[blk] = htonl(be->length);
rabin_index[blk] = htonl(be->length);
} else {
if (prev_length + be->length <= RAB_POLYNOMIAL_MAX_BLOCK_SIZE) {
prev_length += be->length;
blkarr[prev_index] = htonl(prev_length);
blkarr[blk] = 0;
rabin_index[prev_index] = htonl(prev_length);
rabin_index[blk] = 0;
} else {
prev_index = 0;
prev_length = 0;
blkarr[blk] = htonl(be->length);
rabin_index[blk] = htonl(be->length);
}
}
}
} else {
prev_index = 0;
prev_length = 0;
blkarr[blk] = htonl(RAB_POLYNOMIAL_MAX_BLOCK_SIZE + be->index + 1);
rabin_index[blk] = htonl(RAB_POLYNOMIAL_MAX_BLOCK_SIZE + be->index + 1);
}
}
cont:
if (valid) {
*((unsigned int *)(ctx->cbuf)) = htonl(blknum);
*((ssize_t *)(ctx->cbuf + sizeof (unsigned int))) = htonll(*size);
uchar_t *cbuf = ctx->cbuf;
ssize_t *entries;
*((unsigned int *)cbuf) = htonl(blknum);
cbuf += sizeof (unsigned int);
entries = (ssize_t *)cbuf;
entries[0] = htonll(*size);
entries[1] = 0;
entries[2] = htonll(pos - rabin_index_sz - RABIN_HDR_SIZE);
*size = pos;
ctx->valid = 1;
/*
* Remaining header entries: size of compressed index and size of
* compressed data are inserted later via rabin_update_hdr, after actual compression!
*/
return (rabin_index_sz);
}
}
return (0);
}
void
rabin_update_hdr(uchar_t *buf, ssize_t rabin_index_sz_cmp, ssize_t rabin_data_sz_cmp)
{
ssize_t *entries;
buf += sizeof (unsigned int);
entries = (ssize_t *)buf;
entries[1] = htonll(rabin_index_sz_cmp);
entries[3] = htonll(rabin_data_sz_cmp);
}
void
rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_index_sz,
ssize_t *rabin_data_sz, ssize_t *rabin_index_sz_cmp,
ssize_t *rabin_data_sz_cmp, ssize_t *rabin_deduped_size)
{
ssize_t *entries;
*blknum = ntohl(*((unsigned int *)(buf)));
buf += sizeof (unsigned int);
entries = (ssize_t *)buf;
*rabin_data_sz = ntohll(entries[0]);
*rabin_index_sz = (ssize_t)(*blknum) * RABIN_ENTRY_SIZE;
*rabin_index_sz_cmp = ntohll(entries[1]);
*rabin_deduped_size = ntohll(entries[2]);
*rabin_data_sz_cmp = ntohll(entries[3]);
}
void
rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size)
{
unsigned int blknum, blk, oblk, len;
unsigned int *blkarr;
ssize_t orig_size, sz;
ssize_t overhead, pos1, i;
unsigned int *rabin_index;
ssize_t data_sz, sz, indx_cmp, data_sz_cmp, deduped_sz;
ssize_t rabin_index_sz, pos1, i;
uchar_t *pos2;
blknum = ntohl(*((unsigned int *)(buf)));
orig_size = ntohll(*((ssize_t *)(buf + sizeof (unsigned int))));
blkarr = (unsigned int *)(buf + RABIN_HDR_SIZE);
overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE;
pos1 = overhead;
rabin_parse_hdr(buf, &blknum, &rabin_index_sz, &data_sz, &indx_cmp, &data_sz_cmp, &deduped_sz);
rabin_index = (unsigned int *)(buf + RABIN_HDR_SIZE);
pos1 = rabin_index_sz + RABIN_HDR_SIZE;
pos2 = ctx->cbuf;
sz = 0;
ctx->valid = 1;
for (blk = 0; blk < blknum; blk++) {
len = ntohl(blkarr[blk]);
len = ntohl(rabin_index[blk]);
if (len == 0) {
ctx->blocks[blk].length = 0;
ctx->blocks[blk].index = 0;
@ -399,14 +459,23 @@ rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size)
memcpy(pos2, buf + pos1, len);
pos2 += len;
sz += len;
if (sz > orig_size) {
if (sz > data_sz) {
ctx->valid = 0;
break;
}
}
if (ctx->valid && sz < orig_size) {
if (ctx->valid && sz < data_sz) {
ctx->valid = 0;
}
*size = orig_size;
*size = data_sz;
}
/*
* TODO: Consolidate rabin dedup and compression/decompression in functions here rather than
* messy code in main program.
int
rabin_compress(rabin_context_t *ctx, uchar_t *from, ssize_t fromlen, uchar_t *to, ssize_t *tolen,
int level, char chdr, void *data, compress_func_ptr cmp)
{
}
*/

View file

@ -71,7 +71,7 @@
#define RAB_POLYNOMIAL_AVG_BLOCK_SHIFT 12
#define RAB_POLYNOMIAL_AVG_BLOCK_SIZE (1 << RAB_POLYNOMIAL_AVG_BLOCK_SHIFT)
#define RAB_POLYNOMIAL_AVG_BLOCK_MASK (RAB_POLYNOMIAL_AVG_BLOCK_SIZE - 1)
#define RAB_POLYNOMIAL_MIN_BLOCK_SIZE (4096)
#define RAB_POLYNOMIAL_MIN_BLOCK_SIZE RAB_POLYNOMIAL_AVG_BLOCK_SIZE
#define RAB_POLYNOMIAL_MAX_BLOCK_SIZE (128 * 1024)
#define RAB_POLYNOMIAL_WIN_SIZE 31
#define RAB_POLYNOMIAL_MIN_WIN_SIZE 17
@ -95,9 +95,9 @@ typedef struct {
#define RABIN_ENTRY_SIZE (sizeof (unsigned int))
// Header for a chunk deduped using Rabin
// Number of rabin blocks, size of original chunk
//
#define RABIN_HDR_SIZE (sizeof (unsigned int) + sizeof (ssize_t))
// Number of rabin blocks, size of original data chunk, size of compressed index,
// size of deduped data, size of compressed data
#define RABIN_HDR_SIZE (sizeof (unsigned int) + sizeof (ssize_t) + sizeof (ssize_t) + sizeof (ssize_t) + sizeof (ssize_t))
typedef struct {
unsigned char *current_window_data;
@ -110,13 +110,20 @@ typedef struct {
uint64_t block_checksum;
int dedup;
int valid;
void *lzma_data;
int level;
} rabin_context_t;
extern rabin_context_t *create_rabin_context(uint64_t chunksize);
extern void destroy_rabin_context(rabin_context_t *ctx);
extern void rabin_dedup(rabin_context_t *ctx, unsigned char *buf,
extern unsigned int rabin_dedup(rabin_context_t *ctx, unsigned char *buf,
ssize_t *size, ssize_t offset);
extern void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size);
extern void rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_index_sz,
ssize_t *rabin_data_sz, ssize_t *rabin_index_sz_cmp,
ssize_t *rabin_data_sz_cmp, ssize_t *rabin_deduped_size);
extern void rabin_update_hdr(uchar_t *buf, ssize_t rabin_index_sz_cmp,
ssize_t rabin_data_sz_cmp);
extern void reset_rabin_context(rabin_context_t *ctx);
#endif /* _RABIN_POLY_H_ */

14
utils.h
View file

@ -98,8 +98,18 @@ extern int parse_numeric(ssize_t *val, const char *str);
extern char *bytes_to_size(uint64_t bytes);
extern ssize_t Read(int fd, void *buf, size_t count);
extern ssize_t Write(int fd, const void *buf, size_t count);
extern ssize_t Dedup_Read(int fd, uchar_t **buf, size_t count,
ssize_t *rabin_count, void *ctx);
// extern ssize_t Dedup_Read(int fd, uchar_t **buf, size_t count,
// ssize_t *rabin_count, void *ctx);
/* Pointer type for compress and decompress functions. */
typedef int (*compress_func_ptr)(void *src, size_t srclen, void *dst,
size_t *destlen, int level, uchar_t chdr, void *data);
/* Pointer type for algo specific init/deinit/stats functions. */
typedef int (*init_func_ptr)(void **data, int *level, ssize_t chunksize);
typedef int (*deinit_func_ptr)(void **data);
typedef void (*stats_func_ptr)(int show);
/*
* Roundup v to the nearest power of 2. From Bit Twiddling Hacks:

View file

@ -75,7 +75,8 @@ void zerr(int ret)
}
int
zlib_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data)
zlib_compress(void *src, size_t srclen, void *dst, size_t *dstlen,
int level, uchar_t chdr, void *data)
{
z_stream zs;
int ret;
@ -114,13 +115,14 @@ zlib_compress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, vo
}
int
zlib_decompress(void *src, size_t srclen, void *dst, size_t *dstlen, int level, void *data)
zlib_decompress(void *src, size_t srclen, void *dst, size_t *dstlen,
int level, uchar_t chdr, void *data)
{
z_stream zs;
int err;
bzero(&zs, sizeof (zs));
zs.next_in = (unsigned char *)src + CHDR_SZ;
zs.next_in = (unsigned char *)src;
zs.avail_in = srclen;
zs.next_out = dst;
zs.avail_out = *dstlen;