Techniques to further reduce Rabin metadata.

Fix wrong chunk sizing with dedup enabled.
Moinak Ghosh 2012-07-06 00:16:02 +05:30
parent 774384c204
commit f5ce45b16e
3 changed files with 91 additions and 48 deletions

main.c

@@ -434,7 +434,7 @@ start_decompress(const char *filename, const char *to_filename)
         if (_init_func)
             _init_func(&(tdat->data), &(tdat->level), chunksize);
         if (enable_rabin_scan)
-            tdat->rctx = create_rabin_context(chunksize, algo);
+            tdat->rctx = create_rabin_context(chunksize, compressed_chunksize, algo);
         else
             tdat->rctx = NULL;
         if (pthread_create(&(tdat->thr), NULL, perform_decompress,
@@ -602,7 +602,7 @@ redo:
     rbytes = tdat->rbytes;
     reset_rabin_context(tdat->rctx);
     rctx->cbuf = tdat->uncompressed_chunk;
-    rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0);
+    rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0, NULL);
     if (!rctx->valid) {
         memcpy(tdat->uncompressed_chunk, tdat->cmp_seg, rbytes);
         tdat->rbytes = rbytes;
@@ -630,13 +630,15 @@ redo:
             tdat->rctx->level, 0, tdat->rctx->lzma_data);
         index_size_cmp += RABIN_HDR_SIZE;
+        rabin_index_sz += RABIN_HDR_SIZE;
         if (rv == 0) {
             /* Compress data chunk. */
-            rv = tdat->compress(tdat->uncompressed_chunk + rabin_index_sz + RABIN_HDR_SIZE,
+            rv = tdat->compress(tdat->uncompressed_chunk + rabin_index_sz,
                 _chunksize, compressed_chunk + index_size_cmp, &_chunksize,
                 tdat->level, 0, tdat->data);
             /* Now update rabin header with the compressed sizes. */
-            rabin_update_hdr(compressed_chunk, index_size_cmp - RABIN_HDR_SIZE , _chunksize);
+            rabin_update_hdr(compressed_chunk, index_size_cmp - RABIN_HDR_SIZE,
+                _chunksize);
         }
         _chunksize += index_size_cmp;
     } else {
@@ -881,6 +883,9 @@ start_compress(const char *filename, uint64_t chunksize, int level)
             fprintf(stderr, "Out of memory\n");
             COMP_BAIL;
         }
+        if (enable_rabin_scan)
+            tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ);
+        else
         tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
         if (!tdat->uncompressed_chunk) {
             fprintf(stderr, "Out of memory\n");
@@ -897,7 +902,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
         if (_init_func)
             _init_func(&(tdat->data), &(tdat->level), chunksize);
         if (enable_rabin_scan)
-            tdat->rctx = create_rabin_context(chunksize, algo);
+            tdat->rctx = create_rabin_context(chunksize, compressed_chunksize, algo);
         else
             tdat->rctx = NULL;
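With dedup enabled, the working buffer now has to be sized for the worst case: the deduped segment is laid out as rabin header + block index + unique block data, so an incompressible chunk with no duplicate blocks comes out slightly larger than the raw chunk. A minimal standalone sketch of that sizing argument follows; all names and the illustrative sizes are assumptions, not values from this commit.

/* Standalone sketch: worst-case size of the dedup working buffer.
 * Names and sizes here are illustrative, not taken from this commit. */
#include <stdio.h>
#include <stddef.h>

static size_t
worst_case_bytes(size_t chunksize, size_t hdr_bytes, size_t min_block, size_t entry_bytes)
{
    /* One index entry per minimum-sized block, plus the dedup header,
     * plus the raw data itself when nothing turns out to be a duplicate. */
    size_t max_entries = chunksize / min_block + 1;
    return (chunksize + hdr_bytes + max_entries * entry_bytes);
}

int
main(void)
{
    size_t chunk = 1048576;    /* 1 MB chunk, 4 KB minimum block, 4-byte entries */
    printf("raw %zu -> working buffer %zu bytes\n",
        chunk, worst_case_bytes(chunk, 40, 4096, 4));
    return (0);
}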

rabin/rabin_polynomial.c

@@ -89,7 +89,7 @@ uint32_t rabin_polynomial_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE;
  * Initialize the algorithm with the default params.
  */
 rabin_context_t *
-create_rabin_context(uint64_t chunksize, const char *algo) {
+create_rabin_context(uint64_t chunksize, uint64_t real_chunksize, const char *algo) {
     rabin_context_t *ctx;
     unsigned char *current_window_data;
     uint32_t blknum, index;
@@ -149,6 +149,7 @@ create_rabin_context(uint64_t chunksize, const char *algo) {
      */
     ctx->current_window_data = current_window_data;
+    ctx->real_chunksize = real_chunksize;
     reset_rabin_context(ctx);
     return (ctx);
 }
@@ -182,11 +183,11 @@ cmpblks(const void *a, const void *b)
     rabin_blockentry_t *a1 = (rabin_blockentry_t *)a;
     rabin_blockentry_t *b1 = (rabin_blockentry_t *)b;
-    if (a1->checksum < b1->checksum)
+    if (a1->cksum_n_offset < b1->cksum_n_offset)
         return (-1);
-    else if (a1->checksum == b1->checksum)
+    else if (a1->cksum_n_offset == b1->cksum_n_offset)
         return (0);
-    else if (a1->checksum > b1->checksum)
+    else if (a1->cksum_n_offset > b1->cksum_n_offset)
         return (1);
 }
@@ -195,18 +196,18 @@ cmpblks(const void *a, const void *b)
  * the rolling checksum and dedup blocks vary in size from 4K-128K.
  */
 uint32_t
-rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
+rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, ssize_t *rabin_pos)
 {
     ssize_t i, last_offset,j;
     uint32_t blknum;
     char *buf1 = (char *)buf;
     uint32_t length;
-    ssize_t rabin_index_sz;
     length = offset;
     last_offset = 0;
     blknum = 0;
     ctx->valid = 0;
+    ctx->cur_checksum = 0;
     if (*size < ctx->rabin_poly_avg_block_size) return;
     for (i=offset; i<*size; i++) {
@@ -241,7 +242,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
             length >= rabin_polynomial_max_block_size) {
             ctx->blocks[blknum].offset = last_offset;
             ctx->blocks[blknum].index = blknum; // Need to store for sorting
-            ctx->blocks[blknum].checksum = ctx->cur_checksum;
+            ctx->blocks[blknum].cksum_n_offset = ctx->cur_checksum;
             ctx->blocks[blknum].length = length;
             ctx->blocks[blknum].refcount = 0;
             blknum++;
@@ -251,20 +252,25 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
         }
     }
+    if (rabin_pos) {
+        *rabin_pos = last_offset;
+        return (0);
+    }
     // If we found at least a few chunks, perform dedup.
     if (blknum > 2) {
         uint64_t prev_cksum;
-        uint32_t blk, prev_length, nblocks;
-        ssize_t pos, matchlen;
+        uint32_t blk, prev_length;
+        ssize_t pos, matchlen, pos1;
         int valid = 1;
         char *tmp, *prev_offset;
-        uint32_t *rabin_index, prev_index, prev_blk;
+        uint32_t *blkarr, *trans, *rabin_index, prev_index, prev_blk;
+        ssize_t rabin_index_sz;
         // Insert the last left-over trailing bytes, if any, into a block.
         if (last_offset < *size) {
             ctx->blocks[blknum].offset = last_offset;
             ctx->blocks[blknum].index = blknum;
-            ctx->blocks[blknum].checksum = ctx->cur_checksum;
+            ctx->blocks[blknum].cksum_n_offset = ctx->cur_checksum;
             ctx->blocks[blknum].length = *size - last_offset;
             ctx->blocks[blknum].refcount = 0;
             blknum++;
@@ -276,7 +282,6 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
         prev_cksum = 0;
         prev_length = 0;
         prev_offset = 0;
-        pos = rabin_index_sz + RABIN_HDR_SIZE;
         /*
          * Now sort the block array based on checksums. This will bring virtually
@@ -286,6 +291,13 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
          */
         qsort(ctx->blocks, blknum, sizeof (rabin_blockentry_t), cmpblks);
         rabin_index = (uint32_t *)(ctx->cbuf + RABIN_HDR_SIZE);
+        /*
+         * We need 2 temporary arrays. We just use available space in the last
+         * portion of the buffer that will hold the deduped segment.
+         */
+        blkarr = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - (rabin_index_sz * 2 + 1));
+        trans = (uint32_t *)(ctx->cbuf + ctx->real_chunksize - (rabin_index_sz + 1));
         matchlen = 0;
         /*
@@ -299,9 +311,9 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
          * blocks. This helps in non-duplicate block merging later.
          */
         for (blk = 0; blk < blknum; blk++) {
-            rabin_index[ctx->blocks[blk].index] = blk;
-            if (blk > 0 && ctx->blocks[blk].checksum == prev_cksum &&
+            blkarr[ctx->blocks[blk].index] = blk;
+            if (blk > 0 && ctx->blocks[blk].cksum_n_offset == prev_cksum &&
                 ctx->blocks[blk].length == prev_length &&
                 memcmp(prev_offset, buf1 + ctx->blocks[blk].offset, prev_length) == 0) {
                 ctx->blocks[blk].length = 0;
@@ -312,7 +324,7 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
             }
             prev_offset = buf1 + ctx->blocks[blk].offset;
-            prev_cksum = ctx->blocks[blk].checksum;
+            prev_cksum = ctx->blocks[blk].cksum_n_offset;
             prev_length = ctx->blocks[blk].length;
             prev_index = ctx->blocks[blk].index;
             prev_blk = blk;
@@ -325,29 +337,18 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
         /*
          * Another pass, this time through the block index in the chunk. We insert
          * block length into unique block entries. For block entries that are
-         * identical with another one we store the index number + max rabin block length.
+         * identical with another one we store the index number with msb set.
          * This way we can differentiate between a unique block length entry and a
          * pointer to another block without needing a separate flag.
          */
         prev_index = 0;
         prev_length = 0;
-        nblocks = 0;
+        pos = 0;
         for (blk = 0; blk < blknum; blk++) {
             rabin_blockentry_t *be;
-            /*
-             * If blocks are overflowing the allowed chunk size then dedup did not
-             * help at all. We invalidate the dedup operation.
-             */
-            if (pos > last_offset) {
-                valid = 0;
-                break;
-            }
-            be = &(ctx->blocks[rabin_index[blk]]);
+            be = &(ctx->blocks[blkarr[blk]]);
             if (be->length > 0) {
-                prev_offset = buf1 + be->offset;
-                memcpy(ctx->cbuf + pos, prev_offset, be->length);
-                pos += be->length;
                 /*
                  * Update Index entry with the length. Also try to merge runs
                  * of unique (non-duplicate) blocks into a single block entry
@@ -355,32 +356,67 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
                  */
                 if (prev_index == 0) {
                     if (be->refcount == 0) {
-                        prev_index = blk;
+                        prev_index = pos;
                         prev_length = be->length;
                     }
-                    rabin_index[blk] = htonl(be->length);
+                    rabin_index[pos] = be->length;
+                    ctx->blocks[pos].cksum_n_offset = be->offset;
+                    trans[blk] = pos;
+                    pos++;
                 } else {
                     if (be->refcount > 0) {
                         prev_index = 0;
                         prev_length = 0;
-                        rabin_index[blk] = htonl(be->length);
+                        rabin_index[pos] = be->length;
+                        ctx->blocks[pos].cksum_n_offset = be->offset;
+                        trans[blk] = pos;
+                        pos++;
                     } else {
                         if (prev_length + be->length <= RABIN_MAX_BLOCK_SIZE) {
                             prev_length += be->length;
-                            rabin_index[prev_index] = htonl(prev_length);
-                            rabin_index[blk] = 0;
-                            nblocks++;
+                            rabin_index[prev_index] = prev_length;
                         } else {
                             prev_index = 0;
                             prev_length = 0;
-                            rabin_index[blk] = htonl(be->length);
+                            rabin_index[pos] = be->length;
+                            ctx->blocks[pos].cksum_n_offset = be->offset;
+                            trans[blk] = pos;
+                            pos++;
                         }
                     }
                 }
             } else {
                 prev_index = 0;
                 prev_length = 0;
-                rabin_index[blk] = htonl(be->index | RABIN_INDEX_FLAG);
+                blkarr[blk] = htonl(be->index | RABIN_INDEX_FLAG);
+                rabin_index[pos] = be->index | RABIN_INDEX_FLAG;
+                trans[blk] = pos;
+                pos++;
+            }
+        }
+        /*
+         * Final pass, copy the data.
+         */
+        blknum = pos;
+        rabin_index_sz = (ssize_t)pos * RABIN_ENTRY_SIZE;
+        pos1 = rabin_index_sz + RABIN_HDR_SIZE;
+        for (blk = 0; blk < blknum; blk++) {
+            if (rabin_index[blk] & RABIN_INDEX_FLAG) {
+                j = rabin_index[blk] & RABIN_INDEX_VALUE;
+                rabin_index[blk] = htonl(trans[j] | RABIN_INDEX_FLAG);
+            } else {
+                /*
+                 * If blocks are overflowing the allowed chunk size then dedup did not
+                 * help at all. We invalidate the dedup operation.
+                 */
+                if (pos1 > last_offset) {
+                    valid = 0;
+                    break;
+                }
+                memcpy(ctx->cbuf + pos1, buf1 + ctx->blocks[blk].cksum_n_offset, rabin_index[blk]);
+                pos1 += rabin_index[blk];
+                rabin_index[blk] = htonl(rabin_index[blk]);
             }
         }
 cont:
@@ -393,9 +429,10 @@ cont:
         entries = (ssize_t *)cbuf;
         entries[0] = htonll(*size);
         entries[1] = 0;
-        entries[2] = htonll(pos - rabin_index_sz - RABIN_HDR_SIZE);
-        *size = pos;
+        entries[2] = htonll(pos1 - rabin_index_sz - RABIN_HDR_SIZE);
+        *size = pos1;
         ctx->valid = 1;
         /*
          * Remaining header entries: size of compressed index and size of
          * compressed data are inserted later via rabin_update_hdr, after actual compression!
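The reworked passes above shrink the Rabin metadata in two ways: adjacent unique blocks are merged into a single index entry, and a duplicate block's entry stores the position of the block it matches with the most significant bit set (RABIN_INDEX_FLAG), remapped through the temporary trans[] array before everything is byte-swapped with htonl(). A small standalone sketch of that encoding follows; the flag and mask constants are assumptions standing in for RABIN_INDEX_FLAG and RABIN_INDEX_VALUE.

/* Standalone sketch of the index-entry encoding used above.  The flag and
 * mask constants are assumptions; the real code also byte-swaps entries. */
#include <stdio.h>
#include <stdint.h>

#define IDX_FLAG  0x80000000u  /* assumed: entry points at another entry */
#define IDX_VALUE 0x7fffffffu  /* assumed: mask for the stored length or index */

int
main(void)
{
    /* Four original blocks: 0 and 1 are unique and adjacent, so they merge
     * into one index entry; 2 duplicates block 0; 3 is unique.  trans[] maps
     * an old block number to its final position in the shrunken index. */
    uint32_t trans[4] = {0, 0, 1, 2};
    uint32_t index[3];
    uint32_t i;

    index[0] = 8192 + 4096;          /* merged run of unique blocks 0 and 1 */
    index[1] = trans[0] | IDX_FLAG;  /* duplicate of old block 0 -> entry 0 */
    index[2] = 16384;                /* unique block 3 */

    for (i = 0; i < 3; i++) {
        if (index[i] & IDX_FLAG)
            printf("entry %u: duplicate of entry %u\n", i, index[i] & IDX_VALUE);
        else
            printf("entry %u: unique block, %u bytes\n", i, index[i]);
    }
    return (0);
}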

rabin/rabin_polynomial.h

@@ -85,7 +85,7 @@
 typedef struct {
     ssize_t offset;
-    uint64_t checksum;
+    uint64_t cksum_n_offset; // Dual purpose variable
     unsigned int index;
     unsigned int length;
     unsigned short refcount;
@@ -131,16 +131,17 @@ typedef struct {
     uint32_t rabin_poly_min_block_size;
     uint32_t rabin_poly_avg_block_size;
     uint32_t rabin_avg_block_mask;
+    uint64_t real_chunksize;
     int dedup;
     int valid;
     void *lzma_data;
     int level;
 } rabin_context_t;
-extern rabin_context_t *create_rabin_context(uint64_t chunksize, const char *algo);
+extern rabin_context_t *create_rabin_context(uint64_t chunksize, uint64_t real_chunksize, const char *algo);
 extern void destroy_rabin_context(rabin_context_t *ctx);
 extern unsigned int rabin_dedup(rabin_context_t *ctx, unsigned char *buf,
-    ssize_t *size, ssize_t offset);
+    ssize_t *size, ssize_t offset, ssize_t *rabin_pos);
 extern void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size);
 extern void rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_index_sz,
     ssize_t *rabin_data_sz, ssize_t *rabin_index_sz_cmp,
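The new rabin_pos argument adds a probe mode: when it is non-NULL, rabin_dedup() only scans for block boundaries, records the last one and returns without deduping, which lets a caller align its chunk size to a Rabin boundary. The main.c changes in this commit always pass NULL; the sketch below is an illustrative caller, not code from the commit, and the include path is an assumption about where the header above lives in the tree.

/* Illustrative caller for the new rabin_pos argument; this wrapper and its
 * variable names are not part of the commit. */
#include <sys/types.h>
#include "rabin_polynomial.h"    /* assumed header path */

ssize_t
dedup_segment(rabin_context_t *rctx, uchar_t *seg, uchar_t *work, ssize_t len)
{
    ssize_t boundary = 0;
    ssize_t nbytes = len;
    unsigned int index_sz;

    /* Probe pass: with a non-NULL rabin_pos the function only scans for block
     * boundaries, records the last one and returns without deduping. */
    rabin_dedup(rctx, seg, &nbytes, 0, &boundary);

    /* Dedup pass: a NULL rabin_pos does the real work into rctx->cbuf, which
     * must be a worst-case sized buffer (at least real_chunksize bytes). */
    rctx->cbuf = work;
    nbytes = len;
    index_sz = rabin_dedup(rctx, seg, &nbytes, 0, NULL);
    if (!rctx->valid)
        return (-1);    /* dedup did not shrink the segment; keep the raw data */
    return ((ssize_t)index_sz);
}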