diff --git a/Makefile b/Makefile index 73d60c9..8a9a0ec 100644 --- a/Makefile +++ b/Makefile @@ -55,8 +55,11 @@ LDLIBS = -ldl -lbz2 $(ZLIB_DIR) -lz -lm ifdef DEBUG LINK = g++ -m64 -pthread -msse3 -COMPILE = gcc -m64 -g -msse3 -c -COMPILE_cpp = g++ -m64 -g -msse3 -c +COMPILE = gcc -m64 -O -g -msse3 -c +COMPILE_cpp = g++ -m64 -O -g -msse3 -c +ifdef DEBUG_NO_SLAB +CPPFLAGS += -DDEBUG_NO_SLAB +endif else LINK = g++ -m64 -pthread -msse3 COMPILE = gcc -m64 -O3 -msse3 -c diff --git a/allocator.c b/allocator.c index 9b301cf..8da6497 100644 --- a/allocator.c +++ b/allocator.c @@ -48,6 +48,7 @@ #include "utils.h" #include "allocator.h" +#ifndef DEBUG_NO_SLAB /* * Number of slabs: * 256 bytes to 1M in power of 2 steps: 13 @@ -482,7 +483,37 @@ slab_free(void *p, void *address) pthread_mutex_unlock(&hbucket_locks[hindx]); free(address); fprintf(stderr, "Freed buf(%p) not in slab allocations!\n", address); + abort(); fflush(stderr); } } +#else +void +slab_init() {} + +void +slab_cleanup(int quiet) {} + +void +*slab_alloc(void *p, size_t size) +{ + return (malloc(size)); +} + +void +*slab_calloc(void *p, size_t items, size_t size) +{ + return (calloc(items, size)); +} + +void +slab_free(void *p, void *address) +{ + free(address); +} + +int +slab_cache_add(size_t size) {} + +#endif \ No newline at end of file diff --git a/main.c b/main.c index cc4dd5b..983e666 100644 --- a/main.c +++ b/main.c @@ -58,6 +58,7 @@ struct wdata { struct cmp_data **dary; int wfd; int nprocs; + ssize_t chunksize; }; @@ -113,12 +114,7 @@ usage(void) " %s -d \n" "3) To operate as a pipe, read from stdin and write to stdout:\n" " %s -p ...\n" - "4) To use Rabin Fingerprinting to adjust chunk boundaries:\n" - " %s -r -c ...\n" - " In this case will specify the max chunk size and chunks\n" - " will be variable-length delimited at the rabin boundary closest to\n" - " bytes. This should improve chunked compression.\n" - " This option is obviously valid only when compressing.\n" + "4) Rabin Deduplication: Work in progress.\n" "5) Number of threads can optionally be specified: -t <1 - 256 count>\n" "6) Pass '-M' to display memory allocator statistics\n" "7) Pass '-C' to display compression statistics\n\n", @@ -185,6 +181,7 @@ redo: rseg = tdat->compressed_chunk + tdat->rbytes; _chunksize = ntohll(*((ssize_t *)rseg)); } + if (HDR & COMPRESSED) { rv = tdat->decompress(cseg, tdat->len_cmp, tdat->uncompressed_chunk, &_chunksize, tdat->level, tdat->data); @@ -193,6 +190,28 @@ redo: } tdat->len_cmp = _chunksize; + /* Rebuild chunk from dedup blocks. */ + if (enable_rabin_scan && (HDR & FLAG_DEDUP)) { + rabin_context_t *rctx; + uchar_t *tmp; + + rctx = tdat->rctx; + reset_rabin_context(tdat->rctx); + rctx->cbuf = tdat->compressed_chunk; + rabin_inverse_dedup(rctx, tdat->uncompressed_chunk, &(tdat->len_cmp)); + if (!rctx->valid) { + fprintf(stderr, "ERROR: Chunk %d, dedup recovery failed.\n", tdat->id); + rv = -1; + tdat->len_cmp = 0; + goto cont; + } + _chunksize = tdat->len_cmp; + tmp = tdat->uncompressed_chunk; + tdat->uncompressed_chunk = tdat->compressed_chunk; + tdat->compressed_chunk = tmp; + tdat->cmp_seg = tdat->uncompressed_chunk; + } + if (rv == -1) { tdat->len_cmp = 0; fprintf(stderr, "ERROR: Chunk %d, decompression failed.\n", tdat->id); @@ -221,9 +240,10 @@ cont: * ---------------------- * File Header: * Algorithm string: 8 bytes. - * Version number: 4 bytes. + * Version number: 2 bytes. + * Global Flags: 2 bytes. * Chunk size: 8 bytes. - * Compression Level: 4 bytes; + * Compression Level: 4 bytes. * * Chunk Header: * Compressed length: 8 bytes. @@ -232,12 +252,15 @@ cont: * * Chunk Flags, 8 bits: * I I I I I I I I - * | | | | - * | | | `- 0 - Uncompressed - * | | | 1 - Compressed - * | | | - * | | `------------- 1 - Bzip2 (Adaptive Mode) - * | `---------------- 1 - Lzma (Adaptive Mode) + * | | | | | + * | '-----' | `- 0 - Uncompressed + * | | | 1 - Compressed + * | | | + * | | `---- 1 - Chunk was Deduped + * | | + * | | 1 - Bzip2 (Adaptive Mode) + * | `---------------- 2 - Lzma (Adaptive Mode) + * | 3 - PPMD (Adaptive Mode) * | * `---------------------- 1 - Last Chunk flag * @@ -255,12 +278,16 @@ start_decompress(const char *filename, const char *to_filename) struct wdata w; int compfd = -1, i, p; int uncompfd = -1, err, np, bail; - int version, nprocs, thread = 0, level; + int nprocs, thread = 0, level; + short version, flags; ssize_t chunksize, compressed_chunksize; struct cmp_data **dary, *tdat; pthread_t writer_thr; err = 0; + flags = 0; + thread = 0; + /* * Open files and do sanity checks. */ @@ -303,17 +330,19 @@ start_decompress(const char *filename, const char *to_filename) } if (Read(compfd, &version, sizeof (version)) < sizeof (version) || + Read(compfd, &flags, sizeof (flags)) < sizeof (flags) || Read(compfd, &chunksize, sizeof (chunksize)) < sizeof (chunksize) || Read(compfd, &level, sizeof (level)) < sizeof (level)) { perror("Read: "); UNCOMP_BAIL; } - version = ntohl(version); + version = ntohs(version); + flags = ntohs(flags); chunksize = ntohll(chunksize); level = ntohl(level); - if (version != 1) { + if (version != VERSION) { fprintf(stderr, "Unsupported version: %d\n", version); err = 1; goto uncomp_done; @@ -322,6 +351,10 @@ start_decompress(const char *filename, const char *to_filename) compressed_chunksize = chunksize + (chunksize >> 6) + sizeof (uint64_t) + sizeof (chunksize); + if (flags & FLAG_DEDUP) { + enable_rabin_scan = 1; + } + if (nthreads == 0) nprocs = sysconf(_SC_NPROCESSORS_ONLN); else @@ -346,7 +379,11 @@ start_decompress(const char *filename, const char *to_filename) fprintf(stderr, "Out of memory\n"); UNCOMP_BAIL; } - tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize); + if (enable_rabin_scan) + tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, + compressed_chunksize + CHDR_SZ); + else + tdat->uncompressed_chunk = (uchar_t *)slab_alloc(NULL, chunksize); if (!tdat->uncompressed_chunk) { fprintf(stderr, "Out of memory\n"); UNCOMP_BAIL; @@ -362,6 +399,10 @@ start_decompress(const char *filename, const char *to_filename) sem_init(&(tdat->write_done_sem), 0, 1); if (_init_func) _init_func(&(tdat->data), &(tdat->level), chunksize); + if (enable_rabin_scan) + tdat->rctx = create_rabin_context(chunksize); + else + tdat->rctx = NULL; if (pthread_create(&(tdat->thr), NULL, perform_decompress, (void *)tdat) != 0) { perror("Error in thread creation: "); @@ -373,6 +414,7 @@ start_decompress(const char *filename, const char *to_filename) w.dary = dary; w.wfd = uncompfd; w.nprocs = nprocs; + w.chunksize = chunksize; if (pthread_create(&writer_thr, NULL, writer_thread, (void *)(&w)) != 0) { perror("Error in thread creation: "); UNCOMP_BAIL; @@ -480,6 +522,9 @@ uncomp_done: slab_free(NULL, dary[i]->compressed_chunk); if (_deinit_func) _deinit_func(&(tdat->data)); + if (enable_rabin_scan) { + destroy_rabin_context(dary[i]->rctx); + } slab_free(NULL, dary[i]); } slab_free(NULL, dary); @@ -507,11 +552,31 @@ redo: return (0); } - /* - * Compute checksum of original uncompressed chunk. - */ - tdat->crc64 = lzma_crc64(tdat->uncompressed_chunk, tdat->rbytes, 0); + /* Perform Dedup if enabled. */ + if (enable_rabin_scan) { + rabin_context_t *rctx; + ssize_t rbytes; + /* + * Compute checksum of original uncompressed chunk. + */ + tdat->crc64 = lzma_crc64(tdat->cmp_seg, tdat->rbytes, 0); + + rctx = tdat->rctx; + rbytes = tdat->rbytes; + reset_rabin_context(tdat->rctx); + rctx->cbuf = tdat->uncompressed_chunk; + rabin_dedup(tdat->rctx, tdat->cmp_seg, &(tdat->rbytes), 0); + if (!rctx->valid) { + memcpy(tdat->uncompressed_chunk, tdat->cmp_seg, rbytes); + tdat->rbytes = rbytes; + } + } else { + /* + * Compute checksum of original uncompressed chunk. + */ + tdat->crc64 = lzma_crc64(tdat->uncompressed_chunk, tdat->rbytes, 0); + } _chunksize = tdat->rbytes; rv = tdat->compress(tdat->uncompressed_chunk, tdat->rbytes, tdat->compressed_chunk + CHDR_SZ, &_chunksize, tdat->level, @@ -533,6 +598,9 @@ redo: type = COMPRESSED; } + if (enable_rabin_scan && tdat->rctx->valid) { + type |= CHUNK_FLAG_DEDUP; + } /* * Insert compressed chunk length and CRC64 checksum into * chunk header. @@ -548,7 +616,7 @@ redo: type |= (rv << 4); /* - * If last chunk is less than chunksize, store this length as well. + * If chunk is less than max chunksize, store this length as well. */ if (tdat->rbytes < tdat->chunksize) { type |= CHSIZE_MASK; @@ -615,15 +683,15 @@ do_cancel: */ #define COMP_BAIL err = 1; goto comp_done -static void +void start_compress(const char *filename, uint64_t chunksize, int level) { struct wdata w; - char tmpfile[MAXPATHLEN]; + char tmpfile1[MAXPATHLEN]; char to_filename[MAXPATHLEN]; ssize_t compressed_chunksize; - ssize_t n_chunksize, rbytes, rabin_count; - int version; + ssize_t n_chunksize, rbytes; + short version, flags; struct stat sbuf; int compfd = -1, uncompfd = -1, err; int i, thread = 0, bail; @@ -647,11 +715,14 @@ start_compress(const char *filename, uint64_t chunksize, int level) compressed_chunksize = chunksize + (chunksize >> 6) + sizeof (chunksize) + sizeof (uint64_t) + sizeof (chunksize); err = 0; + flags = 0; + thread = 0; + slab_cache_add(chunksize); + slab_cache_add(compressed_chunksize + CHDR_SZ); + slab_cache_add(sizeof (struct cmp_data)); if (enable_rabin_scan) { - rctx = create_rabin_context(); - if (rctx == NULL) - err_exit(0, "Initializing Rabin Polynomial failed\n"); + flags |= FLAG_DEDUP; } /* A host of sanity checks. */ @@ -687,11 +758,11 @@ start_compress(const char *filename, uint64_t chunksize, int level) * the end. The target file name is same as original file with the '.pz' * extension appended. */ - strcpy(tmpfile, filename); - strcpy(tmpfile, dirname(tmpfile)); - strcat(tmpfile, "/.pcompXXXXXX"); + strcpy(tmpfile1, filename); + strcpy(tmpfile1, dirname(tmpfile1)); + strcat(tmpfile1, "/.pcompXXXXXX"); snprintf(to_filename, sizeof (to_filename), "%s" COMP_EXTN, filename); - if ((compfd = mkstemp(tmpfile)) == -1) { + if ((compfd = mkstemp(tmpfile1)) == -1) { perror("mkstemp "); COMP_BAIL; } @@ -717,12 +788,12 @@ start_compress(const char *filename, uint64_t chunksize, int level) nprocs = nthreads; fprintf(stderr, "Scaling to %d threads\n", nprocs); - slab_cache_add(chunksize); - slab_cache_add(compressed_chunksize + CHDR_SZ); - slab_cache_add(sizeof (struct cmp_data)); dary = (struct cmp_data **)slab_alloc(NULL, sizeof (struct cmp_data *) * nprocs); - cread_buf = (uchar_t *)slab_alloc(NULL, chunksize); + if (enable_rabin_scan) + cread_buf = (uchar_t *)slab_alloc(NULL, compressed_chunksize + CHDR_SZ); + else + cread_buf = (uchar_t *)slab_alloc(NULL, chunksize); if (!cread_buf) { fprintf(stderr, "Out of memory\n"); COMP_BAIL; @@ -756,6 +827,10 @@ start_compress(const char *filename, uint64_t chunksize, int level) sem_init(&(tdat->write_done_sem), 0, 1); if (_init_func) _init_func(&(tdat->data), &(tdat->level), chunksize); + if (enable_rabin_scan) + tdat->rctx = create_rabin_context(chunksize); + else + tdat->rctx = NULL; if (pthread_create(&(tdat->thr), NULL, perform_compress, (void *)tdat) != 0) { @@ -779,12 +854,15 @@ start_compress(const char *filename, uint64_t chunksize, int level) */ memset(cread_buf, 0, ALGO_SZ); strncpy(cread_buf, algo, ALGO_SZ); - version = htonl(VERSION); + version = htons(VERSION); + flags = htons(flags); n_chunksize = htonll(chunksize); level = htonl(level); pos = cread_buf + ALGO_SZ; memcpy(pos, &version, sizeof (version)); pos += sizeof (version); + memcpy(pos, &flags, sizeof (flags)); + pos += sizeof (flags); memcpy(pos, &n_chunksize, sizeof (n_chunksize)); pos += sizeof (n_chunksize); memcpy(pos, &level, sizeof (level)); @@ -808,8 +886,8 @@ start_compress(const char *filename, uint64_t chunksize, int level) /* * Read the first chunk into a spare buffer (a simple double-buffering). */ - rabin_count = 0; - rbytes = Read2(uncompfd, cread_buf, chunksize, &rabin_count, rctx); + rbytes = Read(uncompfd, cread_buf, chunksize); + while (!bail) { uchar_t *tmp; @@ -825,18 +903,23 @@ start_compress(const char *filename, uint64_t chunksize, int level) /* * Once previous chunk is done swap already read buffer and * it's size into the thread data. + * Normally it goes into uncompressed_chunk, because that's what it is. + * With dedup enabled however, we do some jugglery to save additional + * memory usage and avoid a memcpy, so it goes into the compressed_chunk + * area: + * cmp_seg -> dedup -> uncompressed_chunk -> compression -> cmp_seg */ tdat->id = chunk_num; - tmp = tdat->uncompressed_chunk; - tdat->uncompressed_chunk = cread_buf; - cread_buf = tmp; tdat->rbytes = rbytes; - if (rabin_count) { - memcpy(cread_buf, - tdat->uncompressed_chunk + rabin_count, - rbytes - rabin_count); - tdat->rbytes = rabin_count; - rabin_count = rbytes - rabin_count; + if (enable_rabin_scan) { + tmp = tdat->cmp_seg; + tdat->cmp_seg = cread_buf; + cread_buf = tmp; + tdat->compressed_chunk = tdat->cmp_seg + sizeof (chunksize) + sizeof (uint64_t); + } else { + tmp = tdat->uncompressed_chunk; + tdat->uncompressed_chunk = cread_buf; + cread_buf = tmp; } if (rbytes < chunksize) { if (rbytes < 0) { @@ -857,7 +940,7 @@ start_compress(const char *filename, uint64_t chunksize, int level) * Read the next buffer we want to process while previous * buffer is in progress. */ - rbytes = Read2(uncompfd, cread_buf, chunksize, &rabin_count, rctx); + rbytes = Read(uncompfd, cread_buf, chunksize); } } @@ -887,7 +970,7 @@ comp_done: if (err) { if (compfd != -1 && !pipe_mode) - unlink(tmpfile); + unlink(tmpfile1); fprintf(stderr, "Error compressing file: %s\n", filename); } else { /* @@ -912,9 +995,9 @@ comp_done: if (fchown(compfd, sbuf.st_uid, sbuf.st_gid) == -1) perror("chown "); - if (rename(tmpfile, to_filename) == -1) { + if (rename(tmpfile1, to_filename) == -1) { perror("Cannot rename temporary file "); - unlink(tmpfile); + unlink(tmpfile1); } } } @@ -922,6 +1005,9 @@ comp_done: for (i = 0; i < nprocs; i++) { slab_free(NULL, dary[i]->uncompressed_chunk); slab_free(NULL, dary[i]->cmp_seg); + if (enable_rabin_scan) { + destroy_rabin_context(dary[i]->rctx); + } if (_deinit_func) _deinit_func(&(dary[i]->data)); slab_free(NULL, dary[i]); @@ -982,7 +1068,7 @@ init_algo(const char *algo, int bail) _stats_func = ppmd_stats; rv = 0; - /* adapt2 and adapt ordering of the checks matters here. */ + /* adapt2 and adapt ordering of the checks matter here. */ } else if (memcmp(algorithm, "adapt2", 6) == 0) { _compress_func = adapt_compress; _decompress_func = adapt_decompress; @@ -1069,9 +1155,9 @@ main(int argc, char *argv[]) hide_cmp_stats = 0; break; - case 'r': - enable_rabin_scan = 1; - break; + //case 'r': + //enable_rabin_scan = 1; + //break; case '?': default: @@ -1097,7 +1183,7 @@ main(int argc, char *argv[]) } if (enable_rabin_scan && !do_compress) { - fprintf(stderr, "Rabin Fingerprinting is only used during compression.\n"); + fprintf(stderr, "Rabin Deduplication is only used during compression.\n"); usage(); exit(1); } @@ -1138,7 +1224,8 @@ main(int argc, char *argv[]) usage(); exit(1); } - } else { + } else if (num_rem > 2) { + fprintf(stderr, "Too many filenames.\n"); usage(); exit(1); } diff --git a/pcompress.h b/pcompress.h index 180ca43..ebf5c22 100644 --- a/pcompress.h +++ b/pcompress.h @@ -32,10 +32,13 @@ extern "C" { #endif +#include + #define CHDR_SZ 1 #define ALGO_SZ 8 #define MIN_CHUNK 2048 -#define VERSION 1 +#define VERSION 2 +#define FLAG_DEDUP 1 #define COMPRESSED 1 #define UNCOMPRESSED 0 @@ -45,6 +48,7 @@ extern "C" { #define COMPRESS_LZMA 1 #define COMPRESS_BZIP2 2 #define COMPRESS_PPMD 3 +#define CHUNK_FLAG_DEDUP 2 #define COMP_EXTN ".pz" /* Pointer type for compress and decompress functions. */ @@ -106,6 +110,7 @@ struct cmp_data { uchar_t *cmp_seg; uchar_t *compressed_chunk; uchar_t *uncompressed_chunk; + rabin_context_t *rctx; ssize_t rbytes; ssize_t chunksize; ssize_t len_cmp; diff --git a/rabin/rabin_polynomial.c b/rabin/rabin_polynomial.c index bd3a130..02e82d0 100755 --- a/rabin/rabin_polynomial.c +++ b/rabin/rabin_polynomial.c @@ -62,28 +62,46 @@ #include #include +// CRC64 pieces from LZMA's implementation ----------------- +#include + +#ifdef WORDS_BIGENDIAN +# define A1(x) ((x) >> 56) +#else +# define A1 A +#endif + +extern const uint64_t lzma_crc64_table[4][256]; +// --------------------------------------------------------- + #include "rabin_polynomial.h" -unsigned int rabin_polynomial_max_block_size = RAB_POLYNOMIAL_AVG_BLOCK_SIZE; +unsigned int rabin_polynomial_max_block_size = RAB_POLYNOMIAL_MAX_BLOCK_SIZE; +unsigned int rabin_polynomial_min_block_size = RAB_POLYNOMIAL_MIN_BLOCK_SIZE; +unsigned int rabin_avg_block_mask = RAB_POLYNOMIAL_AVG_BLOCK_MASK; /* - * Initialize the algorithm with the default params. Not thread-safe. + * Initialize the algorithm with the default params. */ rabin_context_t * -create_rabin_context() { +create_rabin_context(uint64_t chunksize) { rabin_context_t *ctx; unsigned char *current_window_data; + unsigned int blknum; + + blknum = chunksize / rabin_polynomial_min_block_size; + if (chunksize % rabin_polynomial_min_block_size) + blknum++; ctx = (rabin_context_t *)slab_alloc(NULL, sizeof (rabin_context_t)); current_window_data = slab_alloc(NULL, RAB_POLYNOMIAL_WIN_SIZE); - if(ctx == NULL || current_window_data == NULL) { + ctx->blocks = (rabin_blockentry_t *)slab_alloc(NULL, + blknum * rabin_polynomial_min_block_size); + if(ctx == NULL || current_window_data == NULL || ctx->blocks == NULL) { fprintf(stderr, "Could not allocate rabin polynomial context, out of memory\n"); return (NULL); } - - memset(current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE); - ctx->current_window_data = current_window_data; /* * We should compute the power for the window size. @@ -96,61 +114,258 @@ create_rabin_context() { * x * polynomial_pow can we written as x << RAB_POLYNOMIAL_WIN_SIZE */ + ctx->current_window_data = current_window_data; + reset_rabin_context(ctx); + return (ctx); +} + +void +reset_rabin_context(rabin_context_t *ctx) +{ + memset(ctx->current_window_data, 0, RAB_POLYNOMIAL_WIN_SIZE); ctx->window_pos = 0; ctx->cur_roll_checksum = 0; - return (ctx); + ctx->cur_checksum = 0; } void destroy_rabin_context(rabin_context_t *ctx) { slab_free(NULL, ctx->current_window_data); + slab_free(NULL, ctx->blocks); slab_free(NULL, ctx); } -/** - * Given a buffer compute all the rabin chunks and return the end offset of the - * last chunk in the buffer. The last chunk may not end at the buffer end. The - * bytes till the last chunk end is used as the compression chunk and remaining - * bytes are carried over to the next chunk. +/* + * Checksum Comparator for qsort */ -ssize_t -scan_rabin_chunks(rabin_context_t *ctx, void *buf, ssize_t size, ssize_t offset) +static int +cmpblks(const void *a, const void *b) { - size_t i, length, last_offset; + rabin_blockentry_t *a1 = (rabin_blockentry_t *)a; + rabin_blockentry_t *b1 = (rabin_blockentry_t *)b; - length = 0; + if (a1->checksum < b1->checksum) + return (-1); + else if (a1->checksum == b1->checksum) + return (0); + else if (a1->checksum > b1->checksum) + return (1); +} + +/** + * Perform Deduplication based on Rabin Fingerprinting. A 32-byte window is used for + * the rolling checksum and dedup blocks vary in size from 4K-128K. + */ +void +rabin_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size, ssize_t offset) +{ + ssize_t i, last_offset; + unsigned int blknum; + char *buf1 = (char *)buf; + unsigned int length; + ssize_t overhead; + + length = offset; last_offset = 0; + blknum = 0; + ctx->valid = 0; - for (i=offset; icurrent_window_data[ctx->window_pos]; ctx->current_window_data[ctx->window_pos] = cur_byte; /* * We want to do: * cur_roll_checksum = cur_roll_checksum * RAB_POLYNOMIAL_CONST + cur_byte; * cur_roll_checksum -= pushed_out * polynomial_pow; + * cur_checksum = cur_checksum * RAB_POLYNOMIAL_CONST + cur_byte; * * However since RAB_POLYNOMIAL_CONST == 2, we use shifts. */ ctx->cur_roll_checksum = (ctx->cur_roll_checksum << 1) + cur_byte; ctx->cur_roll_checksum -= (pushed_out << RAB_POLYNOMIAL_WIN_SIZE); + // CRC64 Calculation swiped from LZMA + ctx->cur_checksum = lzma_crc64_table[0][cur_byte ^ A1(ctx->cur_checksum)] ^ S8(ctx->cur_checksum); ctx->window_pos++; length++; if (ctx->window_pos == RAB_POLYNOMIAL_WIN_SIZE) // Loop back around ctx->window_pos=0; - + + if (length < rabin_polynomial_min_block_size) continue; + // If we hit our special value or reached the max block size update block offset - if ((ctx->cur_roll_checksum & RAB_POLYNOMIAL_AVG_BLOCK_MASK) == RAB_POLYNOMIAL_CONST || + if ((ctx->cur_roll_checksum & rabin_avg_block_mask) == RAB_POLYNOMIAL_CONST || length >= rabin_polynomial_max_block_size) { + ctx->blocks[blknum].offset = last_offset; + ctx->blocks[blknum].index = blknum; // Need to store for sorting + ctx->blocks[blknum].checksum = ctx->cur_checksum; + ctx->blocks[blknum].length = length; + + blknum++; + ctx->cur_checksum = 0; last_offset = i+1; length = 0; } } - if (last_offset == 0) last_offset = size; - return last_offset; + // If we found at least a few chunks, perform dedup. + if (blknum > 2) { + uint64_t prev_cksum; + unsigned int blk, prev_length; + ssize_t pos, matches; + int valid = 1; + char *tmp, *prev_offset; + unsigned int *blkarr, prev_blk; + + // Insert the last left-over trailing bytes, if any, into a block. + if (last_offset < *size) { + ctx->blocks[blknum].offset = last_offset; + ctx->blocks[blknum].index = blknum; + ctx->blocks[blknum].checksum = ctx->cur_checksum; + ctx->blocks[blknum].length = *size - last_offset; + blknum++; + ctx->cur_checksum = 0; + last_offset = *size; + } + + overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE; + prev_cksum = 0; + prev_length = 0; + prev_offset = 0; + pos = overhead; + + /* + * Now sort the block array based on checksums. This will bring virtually + * all similar block entries together. Effectiveness depends on how strong + * our checksum is. We are using CRC64 here so we should be pretty okay. + * TODO: Test with a heavily optimized MD5 (from OpenSSL?) later. + */ + qsort(ctx->blocks, blknum, sizeof (rabin_blockentry_t), cmpblks); + blkarr = (unsigned int *)(ctx->cbuf + RABIN_HDR_SIZE); + matches = 0; + + /* + * Now make a pass through the sorted block array making identical blocks + * point to the first identical block entry. A simple Run Length Encoding + * sort of. Checksums, length and contents (memcmp()) must match for blocks + * to be considered identical. + * The block index in the chunk is initialized with pointers into the + * sorted block array. + */ + for (blk = 0; blk < blknum; blk++) { + blkarr[ctx->blocks[blk].index] = blk; + + if (blk > 0 && ctx->blocks[blk].checksum == prev_cksum && + ctx->blocks[blk].length == prev_length && + memcmp(prev_offset, buf1 + ctx->blocks[blk].offset, prev_length) == 0) { + ctx->blocks[blk].length = 0; + ctx->blocks[blk].index = prev_blk; + matches += prev_length; + continue; + } + + prev_offset = buf1 + ctx->blocks[blk].offset; + prev_cksum = ctx->blocks[blk].checksum; + prev_length = ctx->blocks[blk].length; + prev_blk = ctx->blocks[blk].index; + } + + if (matches < overhead) { + ctx->valid = 0; + return; + } + /* + * Another pass, this time through the block index in the chunk. We insert + * block length into unique block entries. For block entries that are + * identical with another one we store the index number + max rabin block length. + * This way we can differentiate between a unique block length entry and a + * pointer to another block without needing a separate flag. + */ + for (blk = 0; blk < blknum; blk++) { + rabin_blockentry_t *be; + + /* + * If blocks are overflowing the allowed chunk size then dedup did not + * help at all. We invalidate the dedup operation. + */ + if (pos > last_offset) { + valid = 0; + break; + } + be = &(ctx->blocks[blkarr[blk]]); + if (be->length > 0) { + prev_offset = buf1 + be->offset; + memcpy(ctx->cbuf + pos, prev_offset, be->length); + pos += be->length; + blkarr[blk] = htonl(be->length); + } else { + blkarr[blk] = htonl(RAB_POLYNOMIAL_MAX_BLOCK_SIZE + be->index + 1); + } + } + +cont: + if (valid) { + *((unsigned int *)(ctx->cbuf)) = htonl(blknum); + *((ssize_t *)(ctx->cbuf + sizeof (unsigned int))) = htonll(*size); + *size = pos; + ctx->valid = 1; + } + } +} + +void +rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size) +{ + unsigned int blknum, blk, oblk, len; + unsigned int *blkarr; + ssize_t orig_size, sz; + ssize_t overhead, pos1, i; + uchar_t *pos2; + + blknum = ntohl(*((unsigned int *)(buf))); + orig_size = ntohll(*((ssize_t *)(buf + sizeof (unsigned int)))); + blkarr = (unsigned int *)(buf + RABIN_HDR_SIZE); + overhead = blknum * RABIN_ENTRY_SIZE + RABIN_HDR_SIZE; + pos1 = overhead; + pos2 = ctx->cbuf; + sz = 0; + ctx->valid = 1; + + for (blk = 0; blk < blknum; blk++) { + len = ntohl(blkarr[blk]); + if (len <= RAB_POLYNOMIAL_MAX_BLOCK_SIZE) { + ctx->blocks[blk].length = len; + ctx->blocks[blk].offset = pos1; + pos1 += len; + } else { + ctx->blocks[blk].length = 0; + ctx->blocks[blk].index = len - RAB_POLYNOMIAL_MAX_BLOCK_SIZE - 1; + } + } + for (blk = 0; blk < blknum; blk++) { + if (ctx->blocks[blk].length > 0) { + len = ctx->blocks[blk].length; + pos1 = ctx->blocks[blk].offset; + } else { + oblk = ctx->blocks[blk].index; + len = ctx->blocks[oblk].length; + pos1 = ctx->blocks[oblk].offset; + } + memcpy(pos2, buf + pos1, len); + pos2 += len; + sz += len; + if (sz > orig_size) { + ctx->valid = 0; + break; + } + } + if (ctx->valid && sz < orig_size) { + ctx->valid = 0; + } + *size = orig_size; } diff --git a/rabin/rabin_polynomial.h b/rabin/rabin_polynomial.h index c25582d..110999d 100644 --- a/rabin/rabin_polynomial.h +++ b/rabin/rabin_polynomial.h @@ -56,30 +56,68 @@ * */ +#ifndef _RABIN_POLY_H_ +#define _RABIN_POLY_H_ + +#include "utils.h" + //List of constants, mostly constraints and defaults for various parameters //to the Rabin Fingerprinting algorithm -#define RAB_POLYNOMIAL_CONST 2 +#define RAB_POLYNOMIAL_CONST 2 // 1 << RAB_POLYNOMIAL_AVG_BLOCK_SHIFT = Average Rabin Chunk Size // So we are always looking at power of 2 chunk sizes to avoid doing a modulus // -// A value of 11 below gives block size of 2048 bytes +// A value of 12 below gives avg block size of 4096 bytes // -#define RAB_POLYNOMIAL_AVG_BLOCK_SHIFT 11 -#define RAB_POLYNOMIAL_AVG_BLOCK_SIZE (1 << RAB_POLYNOMIAL_AVG_BLOCK_SHIFT) -#define RAB_POLYNOMIAL_AVG_BLOCK_MASK (RAB_POLYNOMIAL_AVG_BLOCK_SIZE - 1) -#define RAB_POLYNOMIAL_WIN_SIZE 32 -#define RAB_POLYNOMIAL_MIN_WIN_SIZE 17 -#define RAB_POLYNOMIAL_MAX_WIN_SIZE 63 +#define RAB_POLYNOMIAL_AVG_BLOCK_SHIFT 12 +#define RAB_POLYNOMIAL_AVG_BLOCK_SIZE (1 << RAB_POLYNOMIAL_AVG_BLOCK_SHIFT) +#define RAB_POLYNOMIAL_AVG_BLOCK_MASK (RAB_POLYNOMIAL_AVG_BLOCK_SIZE - 1) +#define RAB_POLYNOMIAL_MIN_BLOCK_SIZE (4096) +#define RAB_POLYNOMIAL_MAX_BLOCK_SIZE (128 * 1024) +#define RAB_POLYNOMIAL_WIN_SIZE 32 +#define RAB_POLYNOMIAL_MIN_WIN_SIZE 17 +#define RAB_POLYNOMIAL_MAX_WIN_SIZE 63 + +typedef struct { + ssize_t offset; + uint64_t checksum; + unsigned int index; + unsigned int length; +} rabin_blockentry_t; + +// An entry in the Rabin block array in the chunk. +// It is either a length value <= RAB_POLYNOMIAL_MAX_BLOCK_SIZE or +// if value > RAB_POLYNOMIAL_MAX_BLOCK_SIZE then +// value - RAB_POLYNOMIAL_MAX_BLOCK_SIZE is index of block with which +// this block is a duplicate. +// Offset can be dynamically calculated. +// +#define RABIN_ENTRY_SIZE (sizeof (unsigned int)) + +// Header for a chunk deduped using Rabin +// Number of rabin blocks, size of original chunk +// +#define RABIN_HDR_SIZE (sizeof (unsigned int) + sizeof (ssize_t)) typedef struct { unsigned char *current_window_data; + rabin_blockentry_t *blocks; + unsigned char *cbuf; + unsigned char *buf; int window_pos; uint64_t cur_roll_checksum; + uint64_t cur_checksum; + uint64_t block_checksum; + int dedup; + int valid; } rabin_context_t; -extern rabin_context_t *create_rabin_context(); +extern rabin_context_t *create_rabin_context(uint64_t chunksize); extern void destroy_rabin_context(rabin_context_t *ctx); -extern ssize_t scan_rabin_chunks(rabin_context_t *ctx, void *buf, - ssize_t size, ssize_t offset); +extern void rabin_dedup(rabin_context_t *ctx, unsigned char *buf, + ssize_t *size, ssize_t offset); +extern void rabin_inverse_dedup(rabin_context_t *ctx, uchar_t *buf, ssize_t *size); +extern void reset_rabin_context(rabin_context_t *ctx); +#endif /* _RABIN_POLY_H_ */ \ No newline at end of file diff --git a/utils.c b/utils.c index e80281c..00c0bc2 100644 --- a/utils.c +++ b/utils.c @@ -194,30 +194,6 @@ Read(int fd, void *buf, size_t count) return (count - rem); } -ssize_t -Read2(int fd, void *buf, size_t count, ssize_t *rabin_count, void *ctx) -{ - char *buf2; - ssize_t rcount; - rabin_context_t *rctx = (rabin_context_t *)ctx; - - if (!ctx) return (Read(fd, buf, count)); - buf2 = buf; - if (*rabin_count) { - buf2 = (char *)buf + *rabin_count; - count -= *rabin_count; - } - rcount = Read(fd, buf2, count); - if (rcount > 0) { - rcount += *rabin_count; - *rabin_count = scan_rabin_chunks(rctx, buf, rcount, *rabin_count); - } else { - if (rcount == 0) rcount = *rabin_count; - *rabin_count = 0; - } - return (rcount); -} - ssize_t Write(int fd, const void *buf, size_t count) { diff --git a/utils.h b/utils.h index 8142e98..ce1f9c3 100644 --- a/utils.h +++ b/utils.h @@ -98,7 +98,7 @@ extern int parse_numeric(ssize_t *val, const char *str); extern char *bytes_to_size(uint64_t bytes); extern ssize_t Read(int fd, void *buf, size_t count); extern ssize_t Write(int fd, const void *buf, size_t count); -extern ssize_t Read2(int fd, void *buf, size_t count, +extern ssize_t Dedup_Read(int fd, uchar_t **buf, size_t count, ssize_t *rabin_count, void *ctx); /*