diff --git a/archive/pc_archive.c b/archive/pc_archive.c index 6e2a977..2e2bd12 100644 --- a/archive/pc_archive.c +++ b/archive/pc_archive.c @@ -23,6 +23,13 @@ * */ +/* + * This file includes all the archiving related functions. Pathnames are sorted + * based on extension (or first 4 chars of name if no extension) and size. A simple + * external merge sort is used. This sorting yields better compression ratio. + * + * Sorting is enabled for compression levels greater than 2. + */ #include #include #include @@ -57,14 +64,142 @@ AE_IFIFO Named pipe (fifo) #define ARC_ENTRY_OVRHEAD 500 #define ARC_SCRATCH_BUFF_SIZE (64 *1024) #define MMAP_SIZE (1024 * 1024) +#define SORT_BUF_SIZE (65536) +#define NAMELEN 4 + +typedef struct member_entry { + char name[NAMELEN]; + uint32_t file_pos; // 32-bit file position to limit memory usage. + uint64_t size; +} member_entry_t; + +struct sort_buf { + member_entry_t members[SORT_BUF_SIZE]; // Use 1MB per sorted buffer + int pos, max; + struct sort_buf *next; +}; static struct arc_list_state { uchar_t *pbuf; - uint64_t bufsiz, bufpos, arc_size; + uint64_t bufsiz, bufpos, arc_size, pathlist_size; + uint32_t fcount; int fd; + struct sort_buf *srt, *head; + int srt_pos; } a_state; + pthread_mutex_t nftw_mutex = PTHREAD_MUTEX_INITIALIZER; +/* + * Comparison function for sorting pathname mambers. Sort by name/extension and then + * by size. + */ +static int +compare_members(const void *a, const void *b) { + int rv, i; + member_entry_t *mem1 = (member_entry_t *)a; + member_entry_t *mem2 = (member_entry_t *)b; + + rv = 0; + for (i = 0; i < NAMELEN; i++) { + rv = mem1->name[i] - mem2->name[i]; + if (rv != 0) + return (rv); + } + if (mem1->size > mem2->size) + return (1); + else if (mem1->size < mem2->size) + return (-1); + return (0); +} + +/* + * Tell if path entry mem1 is "less than" path entry mem2. This function + * is used during the merge phase. + */ +static int +compare_members_lt(member_entry_t *mem1, member_entry_t *mem2) { + int rv, i; + + rv = 0; + for (i = 0; i < NAMELEN; i++) { + rv = mem1->name[i] - mem2->name[i]; + if (rv < 0) + return (1); + else if (rv > 0) + return (0); + } + if (mem1->size < mem2->size) + return (1); + return (0); +} + +/* + * Fetch the next entry from the pathlist file. If we are doing sorting then this + * fetches the next entry in ascending order of the predetermined sort keys. + */ +int +read_next(pc_ctx_t *pctx, char *fpath) +{ + short namelen; + ssize_t rbytes; + + if (pctx->enable_archive_sort) { + member_entry_t *mem1, *mem2; + struct sort_buf *srt, *srt1, *psrt, *psrt1; + + /* + * Here we have a set of sorted buffers and we do the external merge phase where + * we pop the buffer entry that is smallest. + */ + srt = (struct sort_buf *)pctx->archive_sort_buf; + if (!srt) return (0); + srt1 = srt; + psrt = srt; + psrt1 = psrt; + mem1 = &(srt->members[srt->pos]); + srt = srt->next; + while (srt) { + mem2 = &(srt->members[srt->pos]); + if (compare_members_lt(mem2, mem1)) { + mem1 = mem2; + srt1 = srt; + psrt1 = psrt; + } + psrt = srt; + srt = srt->next; + } + + if (lseek(pctx->archive_members_fd, mem1->file_pos, SEEK_SET) == (off_t)-1) { + log_msg(LOG_ERR, 1, "Error seeking in archive members file."); + return (-1); + } + srt1->pos++; + if (srt1->pos > srt1->max) { + if (srt1 == pctx->archive_sort_buf) { + pctx->archive_sort_buf = srt1->next; + free(srt1); + } else { + psrt1->next = srt1->next; + free(srt1); + } + } + } + if ((rbytes = Read(pctx->archive_members_fd, &namelen, sizeof(namelen))) != 0) { + if (rbytes < 2) { + log_msg(LOG_ERR, 1, "Error reading archive members file."); + return (-1); + } + rbytes = Read(pctx->archive_members_fd, fpath, namelen); + if (rbytes < namelen) { + log_msg(LOG_ERR, 1, "Error reading archive members file."); + return (-1); + } + fpath[namelen] = '\0'; + } + return (rbytes); +} + /* * Build list of pathnames in a temp file. */ @@ -74,12 +209,18 @@ add_pathname(const char *fpath, const struct stat *sb, { short len; uchar_t *buf; + const char *basename; if (tflag == FTW_DP) return (0); if (tflag == FTW_DNR || tflag == FTW_NS) { log_msg(LOG_WARN, 0, "Cannot access %s\n", fpath); return (0); } + + /* + * Pathname entries are pushed into a memory buffer till buffer is full. The + * buffer is then flushed to disk. This is for decent performance. + */ a_state.arc_size += (sb->st_size + ARC_ENTRY_OVRHEAD); len = strlen(fpath); if (a_state.bufpos + len + 14 > a_state.bufsiz) { @@ -89,12 +230,92 @@ add_pathname(const char *fpath, const struct stat *sb, return (-1); } a_state.bufpos = 0; + a_state.pathlist_size += wrtn; } + + /* + * If we are sorting path entries then sort per buffer and then merge when iterating + * through all the path entries. + */ + if (a_state.srt) { + member_entry_t *member; + int i; + char *dot; + + basename = &fpath[ftwbuf->base]; + if (a_state.srt_pos == SORT_BUF_SIZE) { + struct sort_buf *srt; + + /* + * Sort Buffer is full so sort it. Sorting is done by file extension and size. + * If file has no extension then first 4 chars of the filename are used. + */ + srt = (struct sort_buf *)malloc(sizeof (struct sort_buf)); + if (srt == NULL) { + log_msg(LOG_WARN, 0, "Out of memory for sort buffer. Continuing without sorting."); + a_state.srt = a_state.head; + while (a_state.srt) { + struct sort_buf *srt; + srt = a_state.srt->next; + free(a_state.srt); + a_state.srt = srt; + goto cont; + } + } else { + log_msg(LOG_INFO, 0, "Sorting ..."); + a_state.srt->max = a_state.srt_pos - 1; + qsort(a_state.srt->members, SORT_BUF_SIZE, sizeof (member_entry_t), compare_members); + srt->next = NULL; + srt->pos = 0; + a_state.srt->next = srt; + a_state.srt = srt; + a_state.srt_pos = 0; + } + } + + /* + * The total size of path list file that can be handled when sorting is 4GB to + * limit memory usage. If total accumulated path entries exceed 4GB in bytes, + * we abort sorting. This is large enough to handle all practical scenarios + * except in the case of millions of pathname entries each having PATH_MAX length! + */ + if (a_state.pathlist_size + a_state.bufpos >= UINT_MAX) { + log_msg(LOG_WARN, 0, "Too many pathnames. Continuing without sorting."); + a_state.srt = a_state.head; + while (a_state.srt) { + struct sort_buf *srt; + srt = a_state.srt->next; + free(a_state.srt); + a_state.srt = srt; + goto cont; + } + } + member = &(a_state.srt->members[a_state.srt_pos++]); + member->size = sb->st_size; + member->file_pos = a_state.pathlist_size + a_state.bufpos; + dot = strrchr(basename, '.'); + + // Small NAMELEN so these loops will be unrolled by compiler. + for (i = 0; i < NAMELEN; i++) member->name[i] = 0; + i = 0; + if (!dot) { + while (basename[i] != '\0' && i < NAMELEN) { + member->name[i] = basename[i]; i++; + } + } else { + dot++; + while (dot[i] != '\0' && i < NAMELEN) { + member->name[i] = dot[i]; i++; + } + } + } +cont: buf = a_state.pbuf + a_state.bufpos; *((short *)buf) = len; buf += 2; memcpy(buf, fpath, len); a_state.bufpos += (len + 2); + a_state.fcount++; return (0); } @@ -112,6 +333,25 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) struct archive *arc; struct fn_list *fn; + /* + * If sorting is enabled create the initial sort buffer. + */ + if (pctx->enable_archive_sort) { + struct sort_buf *srt; + srt = (struct sort_buf *)malloc(sizeof (struct sort_buf)); + if (srt == NULL) { + log_msg(LOG_ERR, 0, "Out of memory."); + return (-1); + } + srt->next = NULL; + srt->pos = 0; + pctx->archive_sort_buf = srt; + } + + /* + * Create a temporary file to hold the generated list of pathnames to be archived. + * Storing in a file saves memory usage and allows scalability. + */ tmpfile = pctx->archive_members_file; tmp = get_temp_dir(); strcpy(tmpfile, tmp); @@ -132,19 +372,30 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) } /* - * nftw requires using global state variable. So we lock to be mt-safe. - * This means only one directory tree scan can happen at a time. + * Use nftw() to scan all the directory hierarchies provided on the command + * line and generate a consolidated list of pathnames to be archived. By + * doing this we can sort the pathnames and estimate the total archive size. + * Total archive size is needed by the subsequent compression stages. */ log_msg(LOG_INFO, 0, "Scanning files."); sbuf->st_size = 0; pctx->archive_size = 0; + pctx->archive_members_count = 0; + + /* + * nftw requires using global state variable. So we lock to be mt-safe. + * This means only one directory tree scan can happen at a time. + */ pthread_mutex_lock(&nftw_mutex); fn = pctx->fn; a_state.pbuf = pbuf; a_state.bufsiz = pctx->chunksize; a_state.bufpos = 0; - a_state.arc_size = 0; a_state.fd = fd; + a_state.srt = pctx->archive_sort_buf; + a_state.srt_pos = 0; + a_state.head = a_state.srt; + a_state.pathlist_size = 0; while (fn) { struct stat sb; @@ -155,6 +406,8 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) continue; } + a_state.arc_size = 0; + a_state.fcount = 0; if (S_ISDIR(sb.st_mode)) { err = nftw(fn->filename, add_pathname, 1024, FTW_PHYS); } else { @@ -165,6 +418,7 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) else tflag = FTW_F; add_pathname(fn->filename, &sb, tflag, NULL); + a_state.arc_size = sb.st_size; } if (a_state.bufpos > 0) { ssize_t wrtn = Write(a_state.fd, a_state.pbuf, a_state.bufpos); @@ -176,9 +430,19 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) a_state.bufpos = 0; } pctx->archive_size += a_state.arc_size; + pctx->archive_members_count += a_state.fcount; fn = fn->next; } + + if (a_state.srt == NULL) { + pctx->enable_archive_sort = 0; + } else { + log_msg(LOG_INFO, 0, "Sorting ..."); + a_state.srt->max = a_state.srt_pos - 1; + qsort(a_state.srt->members, a_state.srt_pos, sizeof (member_entry_t), compare_members); + } pthread_mutex_unlock(&nftw_mutex); + sbuf->st_size = pctx->archive_size; lseek(fd, 0, SEEK_SET); free(pbuf); @@ -186,6 +450,11 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) sbuf->st_gid = getegid(); sbuf->st_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + /* + * Generate a pipe. The archiver writes to one end of the pipe and the + * compression stages read from the other end. This allowed plugging in + * archiving with the minimum of changes to the rest of Pcompress. + */ if (pipe(pipefd) == -1) { log_msg(LOG_ERR, 1, "Unable to create archiver pipe.\n"); close(fd); unlink(tmpfile); @@ -209,6 +478,9 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) return (0); } +/* + * This creates a libarchive context for extracting members to disk. + */ int setup_extractor(pc_ctx_t *pctx) { @@ -326,9 +598,8 @@ static void * archiver_thread_func(void *dat) { pc_ctx_t *pctx = (pc_ctx_t *)dat; char fpath[PATH_MAX], *name; - ssize_t rbytes; - short namelen; - int warn; + int warn, rbytes; + uint32_t ctr; struct archive_entry *entry, *spare_entry, *ent; struct archive *arc, *ard; struct archive_entry_linkresolver *resolver; @@ -343,24 +614,16 @@ archiver_thread_func(void *dat) { log_msg(LOG_WARN, 0, "Cannot create link resolver, hardlinks will be duplicated."); } + ctr = 1; ard = archive_read_disk_new(); archive_read_disk_set_standard_lookup(ard); archive_read_disk_set_symlink_physical(ard); /* - * Read next path entry from list file. + * Read next path entry from list file. read_next() also handles sorted reading. */ - while ((rbytes = Read(pctx->archive_members_fd, &namelen, sizeof(namelen))) != 0) { - if (rbytes < 2) { - log_msg(LOG_ERR, 1, "Error reading archive members file."); - break; - } - rbytes = Read(pctx->archive_members_fd, fpath, namelen); - if (rbytes < namelen) { - log_msg(LOG_ERR, 1, "Error reading archive members file."); - break; - } - fpath[namelen] = '\0'; + while ((rbytes = read_next(pctx, fpath)) != 0) { + if (rbytes == -1) break; archive_entry_copy_sourcepath(entry, fpath); if (archive_read_disk_entry_from_file(ard, entry, -1, NULL) != ARCHIVE_OK) { log_msg(LOG_WARN, 1, "archive_read_disk_entry_from_file:\n %s", archive_error_string(ard)); @@ -390,7 +653,8 @@ archiver_thread_func(void *dat) { archive_entry_set_size(entry, 0); } if (pctx->verbose) - log_msg(LOG_INFO, 0, "%10d %s", archive_entry_size(entry), name); + log_msg(LOG_INFO, 0, "%5d/%5d %8d %s", ctr, pctx->archive_members_count, + archive_entry_size(entry), name); archive_entry_linkify(resolver, &entry, &spare_entry); ent = entry; @@ -402,6 +666,7 @@ archiver_thread_func(void *dat) { spare_entry = NULL; } archive_entry_clear(entry); + ctr++; } done: @@ -429,6 +694,7 @@ extractor_thread_func(void *dat) { pc_ctx_t *pctx = (pc_ctx_t *)dat; char cwd[PATH_MAX], got_cwd; int flags, rv; + uint32_t ctr; struct archive_entry *entry; struct archive *awd, *arc; @@ -448,6 +714,7 @@ extractor_thread_func(void *dat) { goto done; } + ctr = 1; awd = archive_write_disk_new(); archive_write_disk_set_options(awd, flags); archive_write_disk_set_standard_lookup(awd); @@ -477,7 +744,7 @@ extractor_thread_func(void *dat) { archive_error_string(arc)); } else if (pctx->verbose) { - log_msg(LOG_INFO, 0, "%10d %s", archive_entry_size(entry), + log_msg(LOG_INFO, 0, "%5d %8d %s", ctr, archive_entry_size(entry), archive_entry_pathname(entry)); } @@ -485,6 +752,7 @@ extractor_thread_func(void *dat) { log_msg(LOG_ERR, 0, "Fatal error aborting extraction."); break; } + ctr++; } if (got_cwd) { diff --git a/pcompress.c b/pcompress.c index 45769d7..c16185f 100644 --- a/pcompress.c +++ b/pcompress.c @@ -2716,7 +2716,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) strcpy(pctx->exec_name, pos); pthread_mutex_lock(&opt_parse); - while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:rLPS:B:Fk:av")) != -1) { + while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:rLPS:B:Fk:avn")) != -1) { int ovr; int64_t chunksize; @@ -2857,6 +2857,10 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) pctx->verbose = 1; break; + case 'n': + pctx->enable_archive_sort = -1; + break; + case '?': default: return (2); @@ -2879,6 +2883,15 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) } } + /* + * Sorting of members when archiving is enabled for compression levels >2, unless + * it is explicitly disabled via '-n'. + */ + if (pctx->level > 2 && pctx->enable_archive_sort != -1) { + pctx->enable_archive_sort = 1; + } + if (pctx->enable_archive_sort == -1) pctx->enable_archive_sort = 0; + if (pctx->rab_blk_size == -1) { if (!pctx->enable_rabin_global) pctx->rab_blk_size = 0; diff --git a/pcompress.h b/pcompress.h index 29b46e6..ed137af 100644 --- a/pcompress.h +++ b/pcompress.h @@ -200,9 +200,11 @@ typedef struct pc_ctx { int encrypt_type; int archive_mode; int verbose; + int enable_archive_sort; char archive_members_file[MAXPATHLEN]; int archive_members_fd, archive_data_fd; - void *archive_ctx; + uint32_t archive_members_count; + void *archive_ctx, *archive_sort_buf; pthread_t archive_thread; int uncompfd, compfd; char archive_temp_file[MAXPATHLEN];