Implement Deduplication based on Rabin Fingerprinting: work in progress.

Fix bug that prevented pipe mode from being used.
Allow building without specialized allocator.
Use basic optimize flag in debug build.
This commit is contained in:
Moinak Ghosh 2012-06-29 18:23:55 +05:30
parent 8f5f531967
commit cbf9728278
8 changed files with 477 additions and 122 deletions
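Background for the main change: Rabin-fingerprint deduplication cuts each compression chunk into variable-sized blocks at positions where a rolling checksum over a small byte window hits a fixed value, so block boundaries follow content rather than fixed offsets and identical data produces identical blocks that can be stored once. The sketch below is purely illustrative and is not code from this commit; the mask, cut value and helper name are arbitrary stand-ins, and only the 32-byte window and the shift-based update mirror what rabin_polynomial.c does.

    #include <stddef.h>
    #include <stdint.h>

    #define WIN 32            /* rolling window of 32 bytes, as in this commit */
    #define MASK 0x0FFF       /* 12-bit mask -> average block size around 4 KB */
    #define CUT_VALUE 2       /* value the masked checksum must hit to cut */

    /* Return the offset just past the first content-defined boundary, or len. */
    static size_t first_boundary(const unsigned char *buf, size_t len)
    {
        uint64_t roll = 0;
        unsigned char win[WIN] = {0};
        size_t i, pos = 0;

        for (i = 0; i < len; i++) {
            uint64_t out = win[pos];    /* byte leaving the window */
            win[pos] = buf[i];          /* byte entering the window */
            /* roll = roll * 2 + new_byte - old_byte * 2^WIN, done with shifts */
            roll = (roll << 1) + buf[i] - (out << WIN);
            pos = (pos + 1) % WIN;
            if ((roll & MASK) == CUT_VALUE)
                return (i + 1);
        }
        return (len);
    }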

Makefile

@@ -55,8 +55,11 @@ LDLIBS = -ldl -lbz2 $(ZLIB_DIR) -lz -lm
ifdef DEBUG
LINK = g++ -m64 -pthread -msse3
-COMPILE = gcc -m64 -g -msse3 -c
COMPILE = gcc -m64 -O -g -msse3 -c
-COMPILE_cpp = g++ -m64 -g -msse3 -c
COMPILE_cpp = g++ -m64 -O -g -msse3 -c
ifdef DEBUG_NO_SLAB
CPPFLAGS += -DDEBUG_NO_SLAB
endif
else
LINK = g++ -m64 -pthread -msse3
COMPILE = gcc -m64 -O3 -msse3 -c

allocator.c

@@ -48,6 +48,7 @@
#include "utils.h"
#include "allocator.h"
#ifndef DEBUG_NO_SLAB
/*
 * Number of slabs:
 * 256 bytes to 1M in power of 2 steps: 13
@@ -482,7 +483,37 @@ slab_free(void *p, void *address)
pthread_mutex_unlock(&hbucket_locks[hindx]);
free(address);
fprintf(stderr, "Freed buf(%p) not in slab allocations!\n", address);
abort();
fflush(stderr);
}
}
#else
void
slab_init() {}
void
slab_cleanup(int quiet) {}
void
*slab_alloc(void *p, size_t size)
{
return (malloc(size));
}
void
*slab_calloc(void *p, size_t items, size_t size)
{
return (calloc(items, size));
}
void
slab_free(void *p, void *address)
{
free(address);
}
int
slab_cache_add(size_t size) {}
#endif
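The stubs above only have to preserve the call pattern the rest of the code relies on; with -DDEBUG_NO_SLAB they fall straight through to malloc()/calloc()/free(). A rough sketch of that pattern, mirroring the calls visible in main.c (the wrapper function and its placement are assumptions, not code from the tree):

    #include <string.h>
    #include "allocator.h"

    /* Hypothetical caller exercising the slab API surface the stubs keep intact. */
    void demo_slab_usage(size_t chunksize)
    {
        unsigned char *buf;

        slab_init();
        slab_cache_add(chunksize);     /* becomes a no-op stub under DEBUG_NO_SLAB */
        buf = (unsigned char *)slab_alloc(NULL, chunksize);
        if (buf != NULL) {
            memset(buf, 0, chunksize);
            slab_free(NULL, buf);
        }
        slab_cleanup(0);
    }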

main.c

@@ -58,6 +58,7 @@ struct wdata {
struct cmp_data **dary;
int wfd;
int nprocs;
ssize_t chunksize;
};
@@ -113,12 +114,7 @@ usage(void)
" %s -d <compressed file> <target file>\n"
"3) To operate as a pipe, read from stdin and write to stdout:\n"
" %s -p ...\n"
-"4) To use Rabin Fingerprinting to adjust chunk boundaries:\n"
-" %s -r -c ...\n"
-"   In this case <chunk_size> will specify the max chunk size and chunks\n"
-"   will be variable-length delimited at the rabin boundary closest to\n"
-"   <chunk_size> bytes. This should improve chunked compression.\n"
-"   This option is obviously valid only when compressing.\n"
"4) Rabin Deduplication: Work in progress.\n"
"5) Number of threads can optionally be specified: -t <1 - 256 count>\n"
"6) Pass '-M' to display memory allocator statistics\n"
"7) Pass '-C' to display compression statistics\n\n",
@@ -185,6 +181,7 @@ redo:
rseg = tdat->compressed_chunk + tdat->rbytes;
_chunksize = ntohll(*((ssize_t *)rseg));
}
if (HDR & COMPRESSED) {
rv = tdat->decompress(cseg, tdat->len_cmp, tdat->uncompressed_chunk,
&_chunksize, tdat->level, tdat->data);
@@ -193,6 +190,28 @@ redo:
}
tdat->len_cmp = _chunksize;
/* Rebuild chunk from dedup blocks. */
if (enable_rabin_scan && (HDR & FLAG_DEDUP)) {
rabin_context_t *rctx;
uchar_t *tmp;
rctx = tdat->rctx;
reset_rabin_context(tdat->rctx);
rctx->cbuf = tdat->compressed_chunk;
rabin_inverse_dedup(rctx, tdat->uncompressed_chunk, &(tdat->len_cmp));
if (!rctx->valid) {
fprintf(stderr, "ERROR: Chunk %d, dedup recovery failed.\n", tdat->id);
rv = -1;
tdat->len_cmp = 0;
goto cont;
}
_chunksize = tdat->len_cmp;
tmp = tdat->uncompressed_chunk;
tdat->uncompressed_chunk = tdat->compressed_chunk;
tdat->compressed_chunk = tmp;
tdat->cmp_seg = tdat->uncompressed_chunk;
}
if (rv == -1) {
tdat->len_cmp = 0;
fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id);
@@ -221,9 +240,10 @@ cont:
 * ----------------------
 * File Header:
 * Algorithm string: 8 bytes.
- * Version number: 4 bytes.
 * Version number: 2 bytes.
 * Global Flags: 2 bytes.
 * Chunk size: 8 bytes.
- * Compression Level: 4 bytes;
 * Compression Level: 4 bytes.
 *
 * Chunk Header:
 * Compressed length: 8 bytes.
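Putting the updated header layout together: an 8-byte algorithm name, a 16-bit version, the new 16-bit global flags word, a 64-bit chunk size and a 32-bit level, with multi-byte fields in network byte order. The stand-alone sketch below just restates that layout (it mirrors the htons()/htonll()/htonl() calls in start_compress() further down; the helper name is hypothetical and htonll() is assumed to come from the project's utility headers):

    #include <stdint.h>
    #include <string.h>
    #include <arpa/inet.h>   /* htons, htonl */

    /* Hypothetical helper: packs the 24-byte file header into hdr[]. */
    static size_t pack_file_header(unsigned char *hdr, const char *algo,
        uint16_t version, uint16_t flags, uint64_t chunksize, uint32_t level)
    {
        unsigned char *pos = hdr;
        uint64_t n_chunksize = htonll(chunksize);   /* htonll() as used in main.c */

        memset(pos, 0, 8);
        strncpy((char *)pos, algo, 8);              /* algorithm string: 8 bytes */
        pos += 8;
        version = htons(version);                   /* version number: 2 bytes */
        memcpy(pos, &version, sizeof (version)); pos += sizeof (version);
        flags = htons(flags);                       /* global flags: 2 bytes */
        memcpy(pos, &flags, sizeof (flags)); pos += sizeof (flags);
        memcpy(pos, &n_chunksize, sizeof (n_chunksize));   /* chunk size: 8 bytes */
        pos += sizeof (n_chunksize);
        level = htonl(level);                       /* compression level: 4 bytes */
        memcpy(pos, &level, sizeof (level)); pos += sizeof (level);
        return (size_t)(pos - hdr);                 /* 8 + 2 + 2 + 8 + 4 = 24 */
    }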
@@ -232,12 +252,15 @@ cont:
 *
 * Chunk Flags, 8 bits:
 *     I I I I I I I I
- *     |   | | |
- *     |   | | `- 0 - Uncompressed
- *     |   | |    1 - Compressed
- *     |   | |
- *     |   | `------------- 1 - Bzip2 (Adaptive Mode)
- *     |   `---------------- 1 - Lzma (Adaptive Mode)
 *     |  '-----'  |  |
 *     |     |     |  `- 0 - Uncompressed
 *     |     |     |     1 - Compressed
 *     |     |     |
 *     |     |     `---- 1 - Chunk was Deduped
 *     |     |
 *     |     |   1 - Bzip2 (Adaptive Mode)
 *     |     `-- 2 - Lzma (Adaptive Mode)
 *     |         3 - PPMD (Adaptive Mode)
 *     |
 *     `---------------------- 1 - Last Chunk flag
 *
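The chunk type byte that carries these flags is assembled later in perform_compress(): COMPRESSED (1) in the low bit, the new CHUNK_FLAG_DEDUP (2) when rabin_dedup() left ctx->valid set, and the adaptive-mode algorithm id shifted into the upper bits via (rv << 4). A compact sketch of just that composition, assuming pcompress.h is included (the CHSIZE and last-chunk masks are not shown in this hunk, so they are omitted):

    #include "pcompress.h"

    /* Illustrative only: how the chunk flags byte is built up. */
    static unsigned char chunk_type_byte(int compressed, int deduped, int adapt_algo)
    {
        unsigned char type = 0;

        if (compressed)
            type |= COMPRESSED;         /* bit 0: 0 = uncompressed, 1 = compressed */
        if (deduped)
            type |= CHUNK_FLAG_DEDUP;   /* set when the chunk was deduped */
        type |= (unsigned char)(adapt_algo << 4);  /* 1 = Bzip2, 2 = Lzma, 3 = PPMD */
        return (type);
    }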
@@ -255,12 +278,16 @@ start_decompress(const char *filename, const char *to_filename)
struct wdata w;
int compfd = -1, i, p;
int uncompfd = -1, err, np, bail;
-int version, nprocs, thread = 0, level;
int nprocs, thread = 0, level;
short version, flags;
ssize_t chunksize, compressed_chunksize;
struct cmp_data **dary, *tdat;
pthread_t writer_thr;
err = 0;
flags = 0;
thread = 0;
/*
 * Open files and do sanity checks.
 */
@@ -303,17 +330,19 @@ start_decompress(const char *filename, const char *to_filename)
}
if (Read(compfd, &version, sizeof (version)) < sizeof (version) ||
Read(compfd, &flags, sizeof (flags)) < sizeof (flags) ||
Read(compfd, &chunksize, sizeof (chunksize)) < sizeof (chunksize) ||
Read(compfd, &level, sizeof (level)) < sizeof (level)) {
perror("Read: ");
UNCOMP_BAIL;
}
-version = ntohl(version);
version = ntohs(version);
flags = ntohs(flags);
chunksize = ntohll(chunksize);
level = ntohl(level);
-if (version != 1) {
if (version != VERSION) {
fprintf(stderr, "Unsupported version: %d\n", version);
err = 1;
goto uncomp_done;
@@ -322,6 +351,10 @@ start_decompress(const char *filename, const char *to_filename)
compressed_chunksize = chunksize + (chunksize >> 6) + sizeof (uint64_t)
+ sizeof (chunksize);
if (flags & FLAG_DEDUP) {
enable_rabin_scan = 1;
}
if (nthreads == 0)
nprocs = sysconf(_SC_NPROCESSORS_ONLN);
else
@@ -346,7 +379,11 @@ start_decompress(const char *filename, const char *to_filename)
fprintf(stderr, "Out of memory\n");
UNCOMP_BAIL;
}
-tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
if (enable_rabin_scan)
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL,
compressed_chunksize + CHDR_SZ);
else
tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize);
if (!tdat->uncompressed_chunk) {
fprintf(stderr, "Out of memory\n");
UNCOMP_BAIL;
@@ -362,6 +399,10 @@ start_decompress(const char *filename, const char *to_filename)
sem_init(&(tdat->write_done_sem), 0, 1);
if (_init_func)
_init_func(&(tdat->data), &(tdat->level), chunksize);
if (enable_rabin_scan)
tdat->rctx = create_rabin_context(chunksize);
else
tdat->rctx = NULL;
if (pthread_create(&(tdat->thr), NULL, perform_decompress,
(void *)tdat) != 0) {
perror("Error in thread creation: ");
@@ -373,6 +414,7 @@ start_decompress(const char *filename, const char *to_filename)
w.dary = dary;
w.wfd = uncompfd;
w.nprocs = nprocs;
w.chunksize = chunksize;
if (pthread_create(&writer_thr, NULL, writer_thread, (void *)(&w)) != 0) {
perror("Error in thread creation: ");
UNCOMP_BAIL;
@@ -480,6 +522,9 @@ uncomp_done:
slab_free(NULL, dary[i]->compressed_chunk);
if (_deinit_func)
_deinit_func(&(tdat->data));
if (enable_rabin_scan) {
destroy_rabin_context(dary[i]->rctx);
}
slab_free(NULL, dary[i]);
}
slab_free(NULL, dary);
@@ -507,11 +552,31 @@ redo:
return (0);
}
-/*
- * Compute checksum of original uncompressed chunk.
- */
-tdat->crc64 = lzma_crc64(tdat->uncompressed_chunk, tdat->rbytes, 0);
/* Perform Dedup if enabled. */
if (enable_rabin_scan) {
rabin_context_t *rctx;
ssize_t rbytes;
/*
* Compute checksum of original uncompressed chunk.
*/
tdat->crc64 = lzma_crc64(tdat->cmp_seg, tdat->rbytes, 0);
rctx = tdat->rctx;
rbytes = tdat->rbytes;
reset_rabin_context(tdat->rctx);
rctx->cbuf = tdat->uncompressed_chunk;
rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0);
if (!rctx->valid) {
memcpy(tdat->uncompressed_chunk, tdat->cmp_seg, rbytes);
tdat->rbytes = rbytes;
}
} else {
/*
* Compute checksum of original uncompressed chunk.
*/
tdat->crc64 = lzma_crc64(tdat->uncompressed_chunk, tdat->rbytes, 0);
}
_chunksize = tdat->rbytes;
rv = tdat->compress(tdat->uncompressed_chunk, tdat->rbytes,
tdat->compressed_chunk + CHDR_SZ, &_chunksize, tdat->level,
@@ -533,6 +598,9 @@ redo:
type = COMPRESSED;
}
if (enable_rabin_scan && tdat->rctx->valid) {
type |= CHUNK_FLAG_DEDUP;
}
/*
 * Insert compressed chunk length and CRC64 checksum into
 * chunk header.
@@ -548,7 +616,7 @@ redo:
type |= (rv << 4);
/*
- * If last chunk is less than chunksize, store this length as well.
 * If chunk is less than max chunksize, store this length as well.
 */
if (tdat->rbytes < tdat->chunksize) {
type |= CHSIZE_MASK;
@@ -615,15 +683,15 @@ do_cancel:
 */
#define COMP_BAIL err = 1; goto comp_done
-static void
void
start_compress(const char *filename, uint64_t chunksize, int level)
{
struct wdata w;
-char tmpfile[MAXPATHLEN];
char tmpfile1[MAXPATHLEN];
char to_filename[MAXPATHLEN];
ssize_t compressed_chunksize;
-ssize_t n_chunksize, rbytes, rabin_count;
ssize_t n_chunksize, rbytes;
-int version;
short version, flags;
struct stat sbuf;
int compfd = -1, uncompfd = -1, err;
int i, thread = 0, bail;
@@ -647,11 +715,14 @@ start_compress(const char *filename, uint64_t chunksize, int level)
compressed_chunksize = chunksize + (chunksize >> 6) +
sizeof (chunksize) + sizeof (uint64_t) + sizeof (chunksize);
err = 0;
flags = 0;
thread = 0;
slab_cache_add(chunksize);
slab_cache_add(compressed_chunksize + CHDR_SZ);
slab_cache_add(sizeof (struct cmp_data));
if (enable_rabin_scan) {
-rctx = create_rabin_context();
-if (rctx == NULL)
-err_exit(0, "Initializing Rabin Polynomial failed\n");
flags |= FLAG_DEDUP;
}
/* A host of sanity checks. */
@@ -687,11 +758,11 @@ start_compress(const char *filename, uint64_t chunksize, int level)
 * the end. The target file name is same as original file with the '.pz'
 * extension appended.
 */
-strcpy(tmpfile, filename);
strcpy(tmpfile1, filename);
-strcpy(tmpfile, dirname(tmpfile));
strcpy(tmpfile1, dirname(tmpfile1));
-strcat(tmpfile, "/.pcompXXXXXX");
strcat(tmpfile1, "/.pcompXXXXXX");
snprintf(to_filename, sizeof (to_filename), "%s" COMP_EXTN, filename);
-if ((compfd = mkstemp(tmpfile)) == -1) {
if ((compfd = mkstemp(tmpfile1)) == -1) {
perror("mkstemp ");
COMP_BAIL;
}
@@ -717,12 +788,12 @@ start_compress(const char *filename, uint64_t chunksize, int level)
nprocs = nthreads;
fprintf(stderr, "Scaling to %d threads\n", nprocs);
-slab_cache_add(chunksize);
-slab_cache_add(compressed_chunksize + CHDR_SZ);
-slab_cache_add(sizeof (struct cmp_data));
dary = (struct cmp_data **)slab_alloc(NULL, sizeof (struct cmp_data *) * nprocs);
-cread_buf = (uchar_t *)slab_alloc(NULL, chunksize);
if (enable_rabin_scan)
cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ);
else
cread_buf = (uchar_t *)slab_alloc(NULL, chunksize);
if (!cread_buf) {
fprintf(stderr, "Out of memory\n");
COMP_BAIL;
@@ -756,6 +827,10 @@ start_compress(const char *filename, uint64_t chunksize, int level)
sem_init(&(tdat->write_done_sem), 0, 1);
if (_init_func)
_init_func(&(tdat->data), &(tdat->level), chunksize);
if (enable_rabin_scan)
tdat->rctx = create_rabin_context(chunksize);
else
tdat->rctx = NULL;
if (pthread_create(&(tdat->thr), NULL, perform_compress,
(void *)tdat) != 0) {
@@ -779,12 +854,15 @@ start_compress(const char *filename, uint64_t chunksize, int level)
 */
memset(cread_buf, 0, ALGO_SZ);
strncpy(cread_buf, algo, ALGO_SZ);
-version = htonl(VERSION);
version = htons(VERSION);
flags = htons(flags);
n_chunksize = htonll(chunksize);
level = htonl(level);
pos = cread_buf + ALGO_SZ;
memcpy(pos, &version, sizeof (version));
pos += sizeof (version);
memcpy(pos, &flags, sizeof (flags));
pos += sizeof (flags);
memcpy(pos, &n_chunksize, sizeof (n_chunksize));
pos += sizeof (n_chunksize);
memcpy(pos, &level, sizeof (level));
@@ -808,8 +886,8 @@ start_compress(const char *filename, uint64_t chunksize, int level)
/*
 * Read the first chunk into a spare buffer (a simple double-buffering).
 */
-rabin_count = 0;
-rbytes = Read2(uncompfd, cread_buf, chunksize, &rabin_count, rctx);
rbytes = Read(uncompfd, cread_buf, chunksize);
while (!bail) {
uchar_t *tmp;
@@ -825,18 +903,23 @@ start_compress(const char *filename, uint64_t chunksize, int level)
/*
 * Once previous chunk is done swap already read buffer and
 * it's size into the thread data.
 * Normally it goes into uncompressed_chunk, because that's what it is.
 * With dedup enabled however, we do some jugglery to save additional
 * memory usage and avoid a memcpy, so it goes into the compressed_chunk
 * area:
 * cmp_seg -> dedup -> uncompressed_chunk -> compression -> cmp_seg
 */
tdat->id = chunk_num;
-tmp = tdat->uncompressed_chunk;
-tdat->uncompressed_chunk = cread_buf;
-cread_buf = tmp;
tdat->rbytes = rbytes;
-if (rabin_count) {
-memcpy(cread_buf,
-tdat->uncompressed_chunk + rabin_count,
-rbytes - rabin_count);
-tdat->rbytes = rabin_count;
-rabin_count = rbytes - rabin_count;
if (enable_rabin_scan) {
tmp = tdat->cmp_seg;
tdat->cmp_seg = cread_buf;
cread_buf = tmp;
tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t);
} else {
tmp = tdat->uncompressed_chunk;
tdat->uncompressed_chunk = cread_buf;
cread_buf = tmp;
}
if (rbytes < chunksize) {
if (rbytes < 0) {
@@ -857,7 +940,7 @@ start_compress(const char *filename, uint64_t chunksize, int level)
 * Read the next buffer we want to process while previous
 * buffer is in progress.
 */
-rbytes = Read2(uncompfd, cread_buf, chunksize, &rabin_count, rctx);
rbytes = Read(uncompfd, cread_buf, chunksize);
}
}
@@ -887,7 +970,7 @@ comp_done:
if (err) {
if (compfd != -1 && !pipe_mode)
-unlink(tmpfile);
unlink(tmpfile1);
fprintf(stderr, "Error compressing file: %s\n", filename);
} else {
/*
@@ -912,9 +995,9 @@ comp_done:
if (fchown(compfd, sbuf.st_uid, sbuf.st_gid) == -1)
perror("chown ");
-if (rename(tmpfile, to_filename) == -1) {
if (rename(tmpfile1, to_filename) == -1) {
perror("Cannot rename temporary file ");
-unlink(tmpfile);
unlink(tmpfile1);
}
}
}
@@ -922,6 +1005,9 @@ comp_done:
for (i = 0; i < nprocs; i++) {
slab_free(NULL, dary[i]->uncompressed_chunk);
slab_free(NULL, dary[i]->cmp_seg);
if (enable_rabin_scan) {
destroy_rabin_context(dary[i]->rctx);
}
if (_deinit_func)
_deinit_func(&(dary[i]->data));
slab_free(NULL, dary[i]);
@@ -982,7 +1068,7 @@ init_algo(const char *algo, int bail)
_stats_func = ppmd_stats;
rv = 0;
-/* adapt2 and adapt ordering of the checks matters here. */
/* adapt2 and adapt ordering of the checks matter here. */
} else if (memcmp(algorithm, "adapt2", 6) == 0) {
_compress_func = adapt_compress;
_decompress_func = adapt_decompress;
@@ -1069,9 +1155,9 @@ main(int argc, char *argv[])
hide_cmp_stats = 0;
break;
-case 'r':
//case 'r':
-enable_rabin_scan = 1;
//enable_rabin_scan = 1;
-break;
//break;
case '?':
default:
@@ -1097,7 +1183,7 @@ main(int argc, char *argv[])
}
if (enable_rabin_scan && !do_compress) {
-fprintf(stderr, "Rabin Fingerprinting is only used during compression.\n");
fprintf(stderr, "Rabin Deduplication is only used during compression.\n");
usage();
exit(1);
}
@@ -1138,7 +1224,8 @@ main(int argc, char *argv[])
usage();
exit(1);
}
-} else {
} else if (num_rem > 2) {
fprintf(stderr, "Too many filenames.\n");
usage();
exit(1);
}

pcompress.h

@@ -32,10 +32,13 @@
extern "C" {
#endif
#include <rabin_polynomial.h>
#define CHDR_SZ 1
#define ALGO_SZ 8
#define MIN_CHUNK 2048
-#define VERSION 1
#define VERSION 2
#define FLAG_DEDUP 1
#define COMPRESSED 1
#define UNCOMPRESSED 0
@@ -45,6 +48,7 @@ extern "C" {
#define COMPRESS_LZMA 1
#define COMPRESS_BZIP2 2
#define COMPRESS_PPMD 3
#define CHUNK_FLAG_DEDUP 2
#define COMP_EXTN ".pz"
/* Pointer type for compress and decompress functions. */
@@ -106,6 +110,7 @@ struct cmp_data {
uchar_t *cmp_seg;
uchar_t *compressed_chunk;
uchar_t *uncompressed_chunk;
rabin_context_t *rctx;
ssize_t rbytes;
ssize_t chunksize;
ssize_t len_cmp;

rabin_polynomial.c

@@ -62,28 +62,46 @@
#include <allocator.h>
#include <utils.h>
// CRC64 pieces from LZMA's implementation -----------------
#include <crc_macros.h>
#ifdef WORDS_BIGENDIAN
# define A1(x) ((x) >> 56)
#else
# define A1 A
#endif
extern const uint64_t lzma_crc64_table[4][256];
// ---------------------------------------------------------
#include "rabin_polynomial.h"
-unsigned int rabin_polynomial_max_block_size = RAB_POLYNOMIAL_AVG_BLOCK_SIZE;
unsigned int rabin_polynomial_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE;
unsigned int rabin_polynomial_min_block_size = RAB_POLYNOMIAL_MIN_BLOCK_SIZE;
unsigned int rabin_avg_block_mask = RAB_POLYNOMIAL_AVG_BLOCK_MASK;
/*
- * Initialize the algorithm with the default params. Not thread-safe.
 * Initialize the algorithm with the default params.
 */
rabin_context_t *
-create_rabin_context() {
create_rabin_context(uint64_t chunksize) {
rabin_context_t *ctx;
unsigned char *current_window_data;
unsigned int blknum;
blknum = chunksize / rabin_polynomial_min_block_size;
if (chunksize % rabin_polynomial_min_block_size)
blknum++;
ctx = (rabin_context_t *)slab_alloc(NULL, sizeof (rabin_context_t));
current_window_data = slab_alloc(NULL, RAB_POLYNOMIAL_WIN_SIZE);
-if(ctx == NULL || current_window_data == NULL) {
ctx->blocks = (rabin_blockentry_t *)slab_alloc(NULL,
blknum * rabin_polynomial_min_block_size);
if(ctx == NULL || current_window_data == NULL || ctx->blocks == NULL) {
fprintf(stderr,
"Could not allocate rabin polynomial context, out of memory\n");
return (NULL);
}
-memset(current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE);
-ctx->current_window_data = current_window_data;
/*
 * We should compute the power for the window size.
@@ -96,61 +114,258 @@ create_rabin_context() {
 * x * polynomial_pow can we written as x << RAB_POLYNOMIAL_WIN_SIZE
 */
ctx->current_window_data = current_window_data;
reset_rabin_context(ctx);
return (ctx);
}
void
reset_rabin_context(rabin_context_t *ctx)
{
memset(ctx->current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE);
ctx->window_pos = 0;
ctx->cur_roll_checksum = 0;
-return (ctx);
ctx->cur_checksum = 0;
}
void
destroy_rabin_context(rabin_context_t *ctx)
{
slab_free(NULL, ctx->current_window_data);
slab_free(NULL, ctx->blocks);
slab_free(NULL, ctx);
}
-/**
- * Given a buffer compute all the rabin chunks and return the end offset of the
- * last chunk in the buffer. The last chunk may not end at the buffer end. The
- * bytes till the last chunk end is used as the compression chunk and remaining
- * bytes are carried over to the next chunk.
/*
 * Checksum Comparator for qsort
 */
-ssize_t
-scan_rabin_chunks(rabin_context_t *ctx, void *buf, ssize_t size, ssize_t offset)
static int
cmpblks(const void *a, const void *b)
{
-size_t i, length, last_offset;
rabin_blockentry_t *a1 = (rabin_blockentry_t *)a;
rabin_blockentry_t *b1 = (rabin_blockentry_t *)b;
-length = 0;
if (a1->checksum < b1->checksum)
return (-1);
else if (a1->checksum == b1->checksum)
return (0);
else if (a1->checksum > b1->checksum)
return (1);
}
/**
* Perform Deduplication based on Rabin Fingerprinting. A 32-byte window is used for
* the rolling checksum and dedup blocks vary in size from 4K-128K.
*/
void
rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset)
{
ssize_t i, last_offset;
unsigned int blknum;
char *buf1 = (char *)buf;
unsigned int length;
ssize_t overhead;
length = offset;
last_offset = 0;
blknum = 0;
ctx->valid = 0;
-for (i=offset; i<size; i++) {
if (*size < RAB_POLYNOMIAL_AVG_BLOCK_SIZE) return;
-char cur_byte = *((char *)(buf+i));
for (i=offset; i<*size; i++) {
char cur_byte = buf1[i];
uint64_t pushed_out = ctx->current_window_data[ctx->window_pos];
ctx->current_window_data[ctx->window_pos] = cur_byte;
/*
 * We want to do:
 * cur_roll_checksum = cur_roll_checksum * RAB_POLYNOMIAL_CONST + cur_byte;
 * cur_roll_checksum -= pushed_out * polynomial_pow;
 * cur_checksum = cur_checksum * RAB_POLYNOMIAL_CONST + cur_byte;
 *
 * However since RAB_POLYNOMIAL_CONST == 2, we use shifts.
 */
ctx->cur_roll_checksum = (ctx->cur_roll_checksum << 1) + cur_byte;
ctx->cur_roll_checksum -= (pushed_out << RAB_POLYNOMIAL_WIN_SIZE);
// CRC64 Calculation swiped from LZMA
ctx->cur_checksum = lzma_crc64_table[0][cur_byte ^ A1(ctx->cur_checksum)] ^ S8(ctx->cur_checksum);
ctx->window_pos++;
length++;
if (ctx->window_pos == RAB_POLYNOMIAL_WIN_SIZE) // Loop back around
ctx->window_pos=0;
if (length < rabin_polynomial_min_block_size) continue;
// If we hit our special value or reached the max block size update block offset
-if ((ctx->cur_roll_checksum & RAB_POLYNOMIAL_AVG_BLOCK_MASK) == RAB_POLYNOMIAL_CONST ||
if ((ctx->cur_roll_checksum & rabin_avg_block_mask) == RAB_POLYNOMIAL_CONST ||
length >= rabin_polynomial_max_block_size) {
ctx->blocks[blknum].offset = last_offset;
ctx->blocks[blknum].index = blknum; // Need to store for sorting
ctx->blocks[blknum].checksum = ctx->cur_checksum;
ctx->blocks[blknum].length = length;
blknum++;
ctx->cur_checksum = 0;
last_offset = i+1;
length = 0;
}
}
-if (last_offset == 0) last_offset = size;
-return last_offset;
// If we found at least a few chunks, perform dedup.
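Why the shift/subtract update above really is a sliding-window checksum: with RAB_POLYNOMIAL_CONST == 2 the window checksum is sum(byte[j] * 2^(W-1-j)) over the W bytes currently in the window, so sliding by one byte is "multiply by 2, add the incoming byte, subtract outgoing_byte * 2^W". The small self-contained check below verifies that the incremental form always matches a from-scratch recomputation (W and the test driver are illustrative, not part of the commit):

    #include <assert.h>
    #include <stdint.h>
    #include <string.h>

    #define W 32   /* matches RAB_POLYNOMIAL_WIN_SIZE */

    /* Recompute the window checksum from scratch: sum of byte * 2^(W-1-pos). */
    static uint64_t full_sum(const unsigned char *win)
    {
        uint64_t sum = 0;
        int j;

        for (j = 0; j < W; j++)
            sum = (sum << 1) + win[j];
        return (sum);
    }

    void rolling_check(const unsigned char *data, size_t len)
    {
        unsigned char win[W];
        uint64_t roll;
        size_t i;

        if (len < 2 * W) return;
        memcpy(win, data, W);
        roll = full_sum(win);
        for (i = W; i < len; i++) {
            uint64_t out = data[i - W];
            /* incremental update, same form as in rabin_dedup() */
            roll = (roll << 1) + data[i] - (out << W);
            memmove(win, win + 1, W - 1);
            win[W - 1] = data[i];
            assert(roll == full_sum(win));
        }
    }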
if (blknum > 2) {
uint64_t prev_cksum;
unsigned int blk, prev_length;
ssize_t pos, matches;
int valid = 1;
char *tmp, *prev_offset;
unsigned int *blkarr, prev_blk;
// Insert the last left-over trailing bytes, if any, into a block.
if (last_offset < *size) {
ctx->blocks[blknum].offset = last_offset;
ctx->blocks[blknum].index = blknum;
ctx->blocks[blknum].checksum = ctx->cur_checksum;
ctx->blocks[blknum].length = *size - last_offset;
blknum++;
ctx->cur_checksum = 0;
last_offset = *size;
}
overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE;
prev_cksum = 0;
prev_length = 0;
prev_offset = 0;
pos = overhead;
/*
* Now sort the block array based on checksums. This will bring virtually
* all similar block entries together. Effectiveness depends on how strong
* our checksum is. We are using CRC64 here so we should be pretty okay.
* TODO: Test with a heavily optimized MD5 (from OpenSSL?) later.
*/
qsort(ctx->blocks, blknum, sizeof (rabin_blockentry_t), cmpblks);
blkarr = (unsigned int *)(ctx->cbuf + RABIN_HDR_SIZE);
matches = 0;
/*
* Now make a pass through the sorted block array making identical blocks
* point to the first identical block entry. A simple Run Length Encoding
* sort of. Checksums, length and contents (memcmp()) must match for blocks
* to be considered identical.
* The block index in the chunk is initialized with pointers into the
* sorted block array.
*/
for (blk = 0; blk < blknum; blk++) {
blkarr[ctx->blocks[blk].index] = blk;
if (blk > 0 && ctx->blocks[blk].checksum == prev_cksum &&
ctx->blocks[blk].length == prev_length &&
memcmp(prev_offset, buf1 + ctx->blocks[blk].offset, prev_length) == 0) {
ctx->blocks[blk].length = 0;
ctx->blocks[blk].index = prev_blk;
matches += prev_length;
continue;
}
prev_offset = buf1 + ctx->blocks[blk].offset;
prev_cksum = ctx->blocks[blk].checksum;
prev_length = ctx->blocks[blk].length;
prev_blk = ctx->blocks[blk].index;
}
if (matches < overhead) {
ctx->valid = 0;
return;
}
/*
* Another pass, this time through the block index in the chunk. We insert
* block length into unique block entries. For block entries that are
* identical with another one we store the index number + max rabin block length.
* This way we can differentiate between a unique block length entry and a
* pointer to another block without needing a separate flag.
*/
for (blk = 0; blk < blknum; blk++) {
rabin_blockentry_t *be;
/*
* If blocks are overflowing the allowed chunk size then dedup did not
* help at all. We invalidate the dedup operation.
*/
if (pos > last_offset) {
valid = 0;
break;
}
be = &(ctx->blocks[blkarr[blk]]);
if (be->length > 0) {
prev_offset = buf1 + be->offset;
memcpy(ctx->cbuf + pos, prev_offset, be->length);
pos += be->length;
blkarr[blk] = htonl(be->length);
} else {
blkarr[blk] = htonl(RAB_POLYNOMIAL_MAX_BLOCK_SIZE + be->index + 1);
}
}
cont:
if (valid) {
*((unsigned int *)(ctx->cbuf)) = htonl(blknum);
*((ssize_t *)(ctx->cbuf + sizeof (unsigned int))) = htonll(*size);
*size = pos;
ctx->valid = 1;
}
}
}
void
rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size)
{
unsigned int blknum, blk, oblk, len;
unsigned int *blkarr;
ssize_t orig_size, sz;
ssize_t overhead, pos1, i;
uchar_t *pos2;
blknum = ntohl(*((unsigned int *)(buf)));
orig_size = ntohll(*((ssize_t *)(buf + sizeof (unsigned int))));
blkarr = (unsigned int *)(buf + RABIN_HDR_SIZE);
overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE;
pos1 = overhead;
pos2 = ctx->cbuf;
sz = 0;
ctx->valid = 1;
for (blk = 0; blk < blknum; blk++) {
len = ntohl(blkarr[blk]);
if (len <= RAB_POLYNOMIAL_MAX_BLOCK_SIZE) {
ctx->blocks[blk].length = len;
ctx->blocks[blk].offset = pos1;
pos1 += len;
} else {
ctx->blocks[blk].length = 0;
ctx->blocks[blk].index = len - RAB_POLYNOMIAL_MAX_BLOCK_SIZE - 1;
}
}
for (blk = 0; blk < blknum; blk++) {
if (ctx->blocks[blk].length > 0) {
len = ctx->blocks[blk].length;
pos1 = ctx->blocks[blk].offset;
} else {
oblk = ctx->blocks[blk].index;
len = ctx->blocks[oblk].length;
pos1 = ctx->blocks[oblk].offset;
}
memcpy(pos2, buf + pos1, len);
pos2 += len;
sz += len;
if (sz > orig_size) {
ctx->valid = 0;
break;
}
}
if (ctx->valid && sz < orig_size) {
ctx->valid = 0;
}
*size = orig_size;
}
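Read together, rabin_dedup() and rabin_inverse_dedup() agree on this deduped-chunk layout: a header holding the block count (32-bit) and the original chunk size (64-bit), then one 4-byte entry per block, then the data of the unique blocks concatenated in chunk order, with integers stored via htonl()/htonll(). A schematic summary (an editor's illustration, not text from the commit):

    /*
     * +------------------+-------------------------+----------------------------+
     * | blknum (4 bytes) | original size (8 bytes) | blknum entries of 4 bytes  |
     * +------------------+-------------------------+----------------------------+
     * | data of the unique blocks, concatenated in chunk order                  |
     * +--------------------------------------------------------------------------+
     *
     * entry[i] <= RAB_POLYNOMIAL_MAX_BLOCK_SIZE : length of unique block i
     * entry[i] >  RAB_POLYNOMIAL_MAX_BLOCK_SIZE : duplicate of block number
     *                                             (entry[i] - RAB_POLYNOMIAL_MAX_BLOCK_SIZE - 1)
     */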

rabin_polynomial.h

@@ -56,30 +56,68 @@
 *
 */
#ifndef _RABIN_POLY_H_
#define _RABIN_POLY_H_
#include "utils.h"
//List of constants, mostly constraints and defaults for various parameters
//to the Rabin Fingerprinting algorithm
#define RAB_POLYNOMIAL_CONST 2
// 1 << RAB_POLYNOMIAL_AVG_BLOCK_SHIFT = Average Rabin Chunk Size
// So we are always looking at power of 2 chunk sizes to avoid doing a modulus
//
-// A value of 11 below gives block size of 2048 bytes
// A value of 12 below gives avg block size of 4096 bytes
//
-#define RAB_POLYNOMIAL_AVG_BLOCK_SHIFT 11
#define RAB_POLYNOMIAL_AVG_BLOCK_SHIFT 12
#define RAB_POLYNOMIAL_AVG_BLOCK_SIZE (1 << RAB_POLYNOMIAL_AVG_BLOCK_SHIFT)
#define RAB_POLYNOMIAL_AVG_BLOCK_MASK (RAB_POLYNOMIAL_AVG_BLOCK_SIZE - 1)
#define RAB_POLYNOMIAL_MIN_BLOCK_SIZE (4096)
#define RAB_POLYNOMIAL_MAX_BLOCK_SIZE (128 * 1024)
#define RAB_POLYNOMIAL_WIN_SIZE 32
#define RAB_POLYNOMIAL_MIN_WIN_SIZE 17
#define RAB_POLYNOMIAL_MAX_WIN_SIZE 63
typedef struct {
ssize_t offset;
uint64_t checksum;
unsigned int index;
unsigned int length;
} rabin_blockentry_t;
// An entry in the Rabin block array in the chunk.
// It is either a length value <= RAB_POLYNOMIAL_MAX_BLOCK_SIZE or
// if value > RAB_POLYNOMIAL_MAX_BLOCK_SIZE then
// value - RAB_POLYNOMIAL_MAX_BLOCK_SIZE is index of block with which
// this block is a duplicate.
// Offset can be dynamically calculated.
//
#define RABIN_ENTRY_SIZE (sizeof (unsigned int))
// Header for a chunk deduped using Rabin
// Number of rabin blocks, size of original chunk
//
#define RABIN_HDR_SIZE (sizeof (unsigned int) + sizeof (ssize_t))
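As a quick sanity check on the metadata cost: on a typical LP64 build RABIN_ENTRY_SIZE is 4 bytes and RABIN_HDR_SIZE is 12 bytes (sizeof (unsigned int) == 4, sizeof (ssize_t) == 8). For a 1 MB compression chunk the 4 KB minimum block size caps the block count at 256, so the index overhead that rabin_dedup() must earn back before it marks the chunk valid is at most roughly a kilobyte:

    /* assuming a 1 MB chunk and the defaults above (LP64 sizes) */
    size_t max_blocks = (1024 * 1024) / RAB_POLYNOMIAL_MIN_BLOCK_SIZE;   /* 256 */
    size_t overhead   = max_blocks * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE;  /* 256*4 + 12 = 1036 bytes */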
typedef struct {
unsigned char *current_window_data;
rabin_blockentry_t *blocks;
unsigned char *cbuf;
unsigned char *buf;
int window_pos;
uint64_t cur_roll_checksum;
uint64_t cur_checksum;
uint64_t block_checksum;
int dedup;
int valid;
} rabin_context_t;
-extern rabin_context_t *create_rabin_context();
extern rabin_context_t *create_rabin_context(uint64_t chunksize);
extern void destroy_rabin_context(rabin_context_t *ctx);
-extern ssize_t scan_rabin_chunks(rabin_context_t *ctx, void *buf,
-ssize_t size, ssize_t offset);
extern void rabin_dedup(rabin_context_t *ctx, unsigned char *buf,
ssize_t *size, ssize_t offset);
extern void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size);
extern void reset_rabin_context(rabin_context_t *ctx);
#endif /* _RABIN_POLY_H_ */
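For orientation, the per-thread lifecycle main.c now drives through this interface looks roughly like the following condensed sketch (variable names and the wrapper function are illustrative; error handling and the fallback memcpy() that main.c performs when dedup does not pay off are omitted):

    #include "rabin_polynomial.h"

    void dedup_roundtrip_sketch(uint64_t chunksize, unsigned char *raw, ssize_t rbytes,
        unsigned char *dedup_buf, unsigned char *rebuild_buf)
    {
        rabin_context_t *rctx = create_rabin_context(chunksize);  /* one per worker thread */
        ssize_t len = rbytes;

        /* compression side: raw bytes in, deduped image written through ctx->cbuf */
        reset_rabin_context(rctx);
        rctx->cbuf = dedup_buf;
        rabin_dedup(rctx, raw, &len, 0);

        if (rctx->valid) {
            /* decompression side: deduped image in, original chunk rebuilt via ctx->cbuf */
            reset_rabin_context(rctx);
            rctx->cbuf = rebuild_buf;
            rabin_inverse_dedup(rctx, dedup_buf, &len);
        }
        destroy_rabin_context(rctx);
    }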

utils.c

@@ -194,30 +194,6 @@ Read(int fd, void *buf, size_t count)
return (count - rem);
}
-ssize_t
-Read2(int fd, void *buf, size_t count, ssize_t *rabin_count, void *ctx)
-{
-char *buf2;
-ssize_t rcount;
-rabin_context_t *rctx = (rabin_context_t *)ctx;
-if (!ctx) return (Read(fd, buf, count));
-buf2 = buf;
-if (*rabin_count) {
-buf2 = (char *)buf + *rabin_count;
-count -= *rabin_count;
-}
-rcount = Read(fd, buf2, count);
-if (rcount > 0) {
-rcount += *rabin_count;
-*rabin_count = scan_rabin_chunks(rctx, buf, rcount, *rabin_count);
-} else {
-if (rcount == 0) rcount = *rabin_count;
-*rabin_count = 0;
-}
-return (rcount);
-}
ssize_t
Write(int fd, const void *buf, size_t count)
{

utils.h

@@ -98,7 +98,7 @@ extern int parse_numeric(ssize_t *val, const char *str);
extern char *bytes_to_size(uint64_t bytes);
extern ssize_t Read(int fd, void *buf, size_t count);
extern ssize_t Write(int fd, const void *buf, size_t count);
-extern ssize_t Read2(int fd, void *buf, size_t count,
extern ssize_t Dedup_Read(int fd, uchar_t **buf, size_t count,
ssize_t *rabin_count, void *ctx);
/*