Implement ability to partition chunks at the last rabin boundary instead of fixed size.

This commit is contained in:
Moinak Ghosh 2012-07-08 21:44:08 +05:30
parent d3f5287ee5
commit 010f49f412
4 changed files with 105 additions and 22 deletions

40
main.c
View file

@ -78,6 +78,7 @@ static int nthreads = 0;
static int hide_mem_stats = 1;
static int hide_cmp_stats = 1;
static int enable_rabin_scan = 0;
static int enable_rabin_split = 1;
static unsigned int chunk_num;
static uint64_t largest_chunk, smallest_chunk, avg_chunk;
static const char *exec_name;
@ -116,6 +117,7 @@ usage(void)
" %s -p ...\n"
"4) Attempt Rabin fingerprinting based deduplication on chunks:\n"
" %s -D ...\n"
" %s -D -r ... - Do NOT split chunks at a rabin boundary. Default is to split.\n"
"5) Number of threads can optionally be specified: -t <1 - 256 count>\n"
"6) Pass '-M' to display memory allocator statistics\n"
"7) Pass '-C' to display compression statistics\n\n",
@ -602,7 +604,7 @@ redo:
rbytes = tdat->rbytes;
reset_rabin_context(tdat->rctx);
rctx->cbuf = tdat->uncompressed_chunk;
rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0);
rabin_index_sz = rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0, NULL);
if (!rctx->valid) {
memcpy(tdat->uncompressed_chunk, tdat->cmp_seg, rbytes);
tdat->rbytes = rbytes;
@ -753,7 +755,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
char tmpfile1[MAXPATHLEN];
char to_filename[MAXPATHLEN];
ssize_t compressed_chunksize;
ssize_t n_chunksize, rbytes;
ssize_t n_chunksize, rbytes, rabin_count;
short version, flags;
struct stat sbuf;
int compfd = -1, uncompfd = -1, err;
@ -762,6 +764,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
struct cmp_data **dary = NULL, *tdat;
pthread_t writer_thr;
uchar_t *cread_buf, *pos;
rabin_context_t *rctx;
/*
* Compressed buffer size must include zlib scratch space and
@ -956,11 +959,17 @@ start_compress(const char *filename, uint64_t chunksize, int level)
largest_chunk = 0;
smallest_chunk = chunksize;
avg_chunk = 0;
rabin_count = 0;
/*
* Read the first chunk into a spare buffer (a simple double-buffering).
*/
rbytes = Read(uncompfd, cread_buf, chunksize);
if (enable_rabin_split) {
rctx = create_rabin_context(chunksize, 0, algo);
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx);
} else {
rbytes = Read(uncompfd, cread_buf, chunksize);
}
while (!bail) {
uchar_t *tmp;
@ -990,6 +999,18 @@ start_compress(const char *filename, uint64_t chunksize, int level)
tdat->cmp_seg = cread_buf;
cread_buf = tmp;
tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t);
/*
* If there is data after the last rabin boundary in the chunk, then
* rabin_count will be non-zero. We carry over the data to the beginning
* of the next chunk.
*/
if (rabin_count) {
memcpy(cread_buf,
tdat->cmp_seg + rabin_count, rbytes - rabin_count);
tdat->rbytes = rabin_count;
rabin_count = rbytes - rabin_count;
}
} else {
tmp = tdat->uncompressed_chunk;
tdat->uncompressed_chunk = cread_buf;
@ -1014,7 +1035,11 @@ start_compress(const char *filename, uint64_t chunksize, int level)
* Read the next buffer we want to process while previous
* buffer is in progress.
*/
rbytes = Read(uncompfd, cread_buf, chunksize);
if (enable_rabin_split) {
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx);
} else {
rbytes = Read(uncompfd, cread_buf, chunksize);
}
}
}
@ -1088,6 +1113,7 @@ comp_done:
}
slab_free(NULL, dary);
}
if (enable_rabin_split) destroy_rabin_context(rctx);
slab_free(NULL, cread_buf);
if (!pipe_mode) {
if (compfd != -1) close(compfd);
@ -1177,7 +1203,7 @@ main(int argc, char *argv[])
level = 6;
slab_init();
while ((opt = getopt(argc, argv, "dc:s:l:pt:MCD")) != -1) {
while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDr")) != -1) {
int ovr;
switch (opt) {
@ -1233,6 +1259,10 @@ main(int argc, char *argv[])
enable_rabin_scan = 1;
break;
case 'r':
enable_rabin_split = 0;
break;
case '?':
default:
usage();

View file

@ -123,21 +123,27 @@ create_rabin_context(uint64_t chunksize, uint64_t real_chunksize, const char *al
return (NULL);
}
current_window_data = slab_alloc(NULL, RAB_POLYNOMIAL_WIN_SIZE);
ctx->blocks = (rabin_blockentry_t *)slab_alloc(NULL,
blknum * ctx->rabin_poly_min_block_size);
if(ctx == NULL || current_window_data == NULL || ctx->blocks == NULL) {
ctx->blocks = NULL;
if (real_chunksize > 0) {
ctx->blocks = (rabin_blockentry_t *)slab_alloc(NULL,
blknum * ctx->rabin_poly_min_block_size);
}
if(ctx == NULL || current_window_data == NULL || (ctx->blocks == NULL && real_chunksize > 0)) {
fprintf(stderr,
"Could not allocate rabin polynomial context, out of memory\n");
destroy_rabin_context(ctx);
return (NULL);
}
lzma_init(&(ctx->lzma_data), &(ctx->level), chunksize);
if (!(ctx->lzma_data)) {
fprintf(stderr,
"Could not allocate rabin polynomial context, out of memory\n");
destroy_rabin_context(ctx);
return (NULL);
ctx->lzma_data = NULL;
if (real_chunksize > 0) {
lzma_init(&(ctx->lzma_data), &(ctx->level), chunksize);
if (!(ctx->lzma_data)) {
fprintf(stderr,
"Could not allocate rabin polynomial context, out of memory\n");
destroy_rabin_context(ctx);
return (NULL);
}
}
/*
* We should compute the power for the window size.
@ -198,7 +204,7 @@ cmpblks(const void *a, const void *b)
* the rolling checksum and dedup blocks vary in size from 4K-128K.
*/
uint32_t
rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset, ssize_t *rabin_pos)
{
ssize_t i, last_offset,j;
uint32_t blknum;
@ -211,6 +217,14 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
ctx->valid = 0;
ctx->cur_checksum = 0;
/*
* If rabin_pos is non-zero then we are being asked to scan for the last rabin boundary
* in the chunk. We start scanning at chunk end - max rabin block size. We avoid doing
* a full chunk scan.
*/
if (rabin_pos) {
offset = *size - RAB_POLYNOMIAL_MAX_BLOCK_SIZE;
}
if (*size < ctx->rabin_poly_avg_block_size) return;
for (i=offset; i<*size; i++) {
char cur_byte = buf1[i];
@ -241,18 +255,24 @@ rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
// If we hit our special value or reached the max block size update block offset
if ((ctx->cur_roll_checksum & ctx->rabin_avg_block_mask) == ctx->rabin_break_patt ||
length >= rabin_polynomial_max_block_size) {
ctx->blocks[blknum].offset = last_offset;
ctx->blocks[blknum].index = blknum; // Need to store for sorting
ctx->blocks[blknum].cksum_n_offset = ctx->cur_checksum;
ctx->blocks[blknum].length = length;
ctx->blocks[blknum].refcount = 0;
blknum++;
if (rabin_pos == NULL) {
ctx->blocks[blknum].offset = last_offset;
ctx->blocks[blknum].index = blknum; // Need to store for sorting
ctx->blocks[blknum].cksum_n_offset = ctx->cur_checksum;
ctx->blocks[blknum].length = length;
ctx->blocks[blknum].refcount = 0;
blknum++;
}
ctx->cur_checksum = 0;
last_offset = i+1;
length = 0;
}
}
if (rabin_pos && last_offset < *size) {
*rabin_pos = last_offset;
return (0);
}
// If we found at least a few chunks, perform dedup.
if (blknum > 2) {
uint64_t prev_cksum;

View file

@ -144,7 +144,7 @@ extern rabin_context_t *create_rabin_context(uint64_t chunksize, uint64_t real_c
const char *algo);
extern void destroy_rabin_context(rabin_context_t *ctx);
extern unsigned int rabin_dedup(rabin_context_t *ctx, unsigned char *buf,
ssize_t *size, ssize_t offset);
ssize_t *size, ssize_t offset, ssize_t *rabin_pos);
extern void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size);
extern void rabin_parse_hdr(uchar_t *buf, unsigned int *blknum, ssize_t *rabin_index_sz,
ssize_t *rabin_data_sz, ssize_t *rabin_index_sz_cmp,

33
utils.c
View file

@ -194,6 +194,39 @@ Read(int fd, void *buf, size_t count)
return (count - rem);
}
/*
* Read the requested chunk and return the last rabin boundary in the chunk.
* This helps in splitting chunks at rabin boundaries rather than fixed points.
* The request buffer may have some data at the beginning carried over from
* after the previous rabin boundary.
*/
ssize_t
Read_Adjusted(int fd, uchar_t *buf, size_t count, ssize_t *rabin_count, void *ctx)
{
char *buf2;
ssize_t rcount;
rabin_context_t *rctx = (rabin_context_t *)ctx;
if (!ctx) return (Read(fd, buf, count));
buf2 = buf;
if (*rabin_count) {
buf2 = (char *)buf + *rabin_count;
count -= *rabin_count;
}
rcount = Read(fd, buf2, count);
if (rcount > 0) {
rcount += *rabin_count;
if (!rcount < count)
rabin_dedup(rctx, buf, &rcount, *rabin_count, rabin_count);
else
*rabin_count = 0;
} else {
if (rcount == 0) rcount = *rabin_count;
*rabin_count = 0;
}
return (rcount);
}
ssize_t
Write(int fd, const void *buf, size_t count)
{