diff --git a/archive/pc_archive.c b/archive/pc_archive.c index d3f89fa..c6be1f7 100644 --- a/archive/pc_archive.c +++ b/archive/pc_archive.c @@ -91,6 +91,121 @@ static struct arc_list_state { pthread_mutex_t nftw_mutex = PTHREAD_MUTEX_INITIALIZER; +/* + * Archive writer callback routines for archive creation operation. + */ +static int +creat_open_callback(struct archive *arc, void *ctx) +{ + pc_ctx_t *pctx = (pc_ctx_t *)ctx; + + sem_init(&(pctx->read_sem), 0, 0); + sem_init(&(pctx->write_sem), 0, 0); + pctx->arc_buf = NULL; + pctx->arc_buf_pos = 0; + pctx->arc_buf_size = 0; + return (ARCHIVE_OK); +} + +static int +creat_close_callback(struct archive *arc, void *ctx) +{ + pc_ctx_t *pctx = (pc_ctx_t *)ctx; + + pctx->arc_closed = 1; + if (pctx->arc_buf) { + sem_post(&(pctx->read_sem)); + } else { + pctx->arc_buf_pos = 0; + } + return (ARCHIVE_OK); +} + +static ssize_t +creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len) +{ + uchar_t *buff = (uchar_t *)buf; + pc_ctx_t *pctx = (pc_ctx_t *)ctx; + size_t remaining; + + if (pctx->arc_closed) { + archive_set_error(arc, ARCHIVE_EOF, "End of file when writing archive."); + return (-1); + } + + if (!pctx->arc_writing) { + sem_wait(&(pctx->write_sem)); + } + + if (pctx->arc_buf == NULL || pctx->arc_buf_size == 0) { + archive_set_error(arc, ARCHIVE_EOF, "End of file when writing archive."); + return (-1); + } + pctx->arc_writing = 1; + + remaining = len; + while (remaining && !pctx->arc_closed) { + uchar_t *tbuf; + + tbuf = pctx->arc_buf + pctx->arc_buf_pos; + if (remaining > pctx->arc_buf_size - pctx->arc_buf_pos) { + size_t nlen = pctx->arc_buf_size - pctx->arc_buf_pos; + memcpy(tbuf, buff, nlen); + remaining -= nlen; + pctx->arc_buf_pos += nlen; + buff += nlen; + pctx->arc_writing = 0; + sem_post(&(pctx->read_sem)); + sem_wait(&(pctx->write_sem)); + pctx->arc_writing = 1; + } else { + memcpy(tbuf, buff, remaining); + pctx->arc_buf_pos += remaining; + remaining = 0; + if (pctx->arc_buf_pos == pctx->arc_buf_size) { + pctx->arc_writing = 0; + sem_post(&(pctx->read_sem)); + } + break; + } + } + + return (len - remaining); +} + +int64_t +archiver_read(void *ctx, void *buf, uint64_t count) +{ + pc_ctx_t *pctx = (pc_ctx_t *)ctx; + + if (pctx->arc_closed) + return (0); + + if (pctx->arc_buf != NULL) { + log_msg(LOG_ERR, 0, "Incorrect sequencing of archiver_read() call."); + return (-1); + } + + pctx->arc_buf = buf; + pctx->arc_buf_size = count; + pctx->arc_buf_pos = 0; + sem_post(&(pctx->write_sem)); + sem_wait(&(pctx->read_sem)); + pctx->arc_buf = NULL; + return (pctx->arc_buf_pos); +} + +int +archiver_close(void *ctx) +{ + pc_ctx_t *pctx = (pc_ctx_t *)ctx; + + pctx->arc_closed = 1; + sem_post(&(pctx->write_sem)); + sem_post(&(pctx->read_sem)); + return (0); +} + /* * Comparison function for sorting pathname mambers. Sort by name/extension and then * by size. @@ -172,6 +287,10 @@ read_next_path(pc_ctx_t *pctx, char *fpath) srt = srt->next; } + /* + * If we are not using mmap then seek to the position of the current entry, otherwise + * just note the entry position. + */ if (pctx->temp_mmap_len == 0) { if (lseek(pctx->archive_members_fd, mem1->file_pos, SEEK_SET) == (off_t)-1) { log_msg(LOG_ERR, 1, "Error seeking in archive members file."); @@ -209,10 +328,10 @@ do_mmap: munmap(pctx->temp_mmap_buf, pctx->temp_mmap_len); adj = pctx->temp_file_pos % pctx->pagesize; pctx->temp_mmap_pos = pctx->temp_file_pos - adj; - pctx->temp_mmap_len = pctx->archive_temp_size - pctx->temp_file_pos; + pctx->temp_mmap_len = pctx->archive_temp_size - pctx->temp_mmap_pos; if (pctx->temp_mmap_len > TEMP_MMAP_SIZE) - pctx->temp_mmap_len = TEMP_MMAP_SIZE; + pctx->temp_mmap_len = TEMP_MMAP_SIZE ; pctx->temp_mmap_buf = mmap(NULL, pctx->temp_mmap_len, PROT_READ, MAP_SHARED, pctx->archive_members_fd, pctx->temp_mmap_pos); if (pctx->temp_mmap_buf == NULL) { @@ -225,6 +344,12 @@ do_mmap: buf = pctx->temp_mmap_buf + (pctx->temp_file_pos - pctx->temp_mmap_pos); namelen = U32_P(buf); pctx->temp_file_pos += 2; + + /* + * If length of pathname entry exceeds current mmap region, repeat mmap + * at the entry offset. Only one repeat attempt is made. If there is a + * failure then we give up. + */ if (pctx->temp_mmap_len - (pctx->temp_file_pos - pctx->temp_mmap_pos) < namelen) { if (!retried) { pctx->temp_file_pos -= 2; @@ -243,6 +368,9 @@ do_mmap: return (namelen); } + /* + * This code is used if mmap is not being used for the pathlist file. + */ if ((rbytes = Read(pctx->archive_members_fd, &namelen, sizeof(namelen))) != 0) { if (rbytes < 2) { log_msg(LOG_ERR, 1, "Error reading archive members file."); @@ -386,7 +514,7 @@ int setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) { char *tmpfile, *tmp; - int err, fd, pipefd[2]; + int err, fd; uchar_t *pbuf; struct archive *arc; struct fn_list *fn; @@ -510,28 +638,16 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) sbuf->st_gid = getegid(); sbuf->st_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; - /* - * Generate a pipe. The archiver writes to one end of the pipe and the - * compression stages read from the other end. This allowed plugging in - * archiving with the minimum of changes to the rest of Pcompress. - */ - if (pipe(pipefd) == -1) { - log_msg(LOG_ERR, 1, "Unable to create archiver pipe.\n"); - close(fd); unlink(tmpfile); - return (-1); - } - - pctx->uncompfd = pipefd[0]; // Read side - pctx->archive_data_fd = pipefd[1]; // Write side arc = archive_write_new(); if (!arc) { log_msg(LOG_ERR, 1, "Unable to create libarchive context.\n"); - close(fd); close(pipefd[0]); close(pipefd[1]); + close(fd); unlink(tmpfile); return (-1); } archive_write_set_format_pax_restricted(arc); - archive_write_open_fd(arc, pctx->archive_data_fd); + archive_write_open(arc, pctx, creat_open_callback, + creat_write_callback, creat_close_callback); pctx->archive_ctx = arc; pctx->archive_members_fd = fd; pctx->temp_mmap_len = TEMP_MMAP_SIZE; @@ -542,6 +658,7 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) pctx->temp_mmap_len = 0; } pctx->temp_mmap_pos = 0; + pctx->arc_writing = 0; return (0); } diff --git a/archive/pc_archive.h b/archive/pc_archive.h index c0c83a2..5fe657e 100644 --- a/archive/pc_archive.h +++ b/archive/pc_archive.h @@ -45,6 +45,8 @@ int setup_archiver(pc_ctx_t *pctx, struct stat *sbuf); int start_archiver(pc_ctx_t *pctx); int setup_extractor(pc_ctx_t *pctx); int start_extractor(pc_ctx_t *pctx); +int64_t archiver_read(void *ctx, void *buf, uint64_t count); +int archiver_close(void *ctx); #ifdef __cplusplus } diff --git a/pcompress.c b/pcompress.c index 4dfa971..c2fb648 100644 --- a/pcompress.c +++ b/pcompress.c @@ -1901,12 +1901,6 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev log_msg(LOG_ERR, 0, "Setup archiver failed."); return (1); } - - /* - * This is a pipe between the libarchive based archiving process and - * the rest of the compression stuff. - */ - uncompfd = pctx->uncompfd; } /* @@ -2292,9 +2286,12 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev rctx = create_dedupe_context(chunksize, 0, pctx->rab_blk_size, pctx->algo, &props, pctx->enable_delta_encode, pctx->enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pctx->pipe_mode, nprocs); - rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx); + rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx, pctx); } else { - rbytes = Read(uncompfd, cread_buf, chunksize); + if (pctx->archive_mode) + rbytes = archiver_read(pctx, cread_buf, chunksize); + else + rbytes = Read(uncompfd, cread_buf, chunksize); } while (!bail) { @@ -2386,6 +2383,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev COMP_BAIL; } } + /* Signal the compression thread to start */ sem_post(&tdat->start_sem); ++(pctx->chunk_num); @@ -2400,9 +2398,12 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev * buffer is in progress. */ if (pctx->enable_rabin_split) { - rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx); + rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx, pctx); } else { - rbytes = Read(uncompfd, cread_buf, chunksize); + if (pctx->archive_mode) + rbytes = archiver_read(pctx, cread_buf, chunksize); + else + rbytes = Read(uncompfd, cread_buf, chunksize); } } } @@ -2425,6 +2426,8 @@ comp_done: */ if (!pctx->pipe_mode) { if (uncompfd != -1) close(uncompfd); + if (pctx->archive_mode) + archiver_close(pctx); } if (pctx->t_errored) err = pctx->t_errored; diff --git a/pcompress.h b/pcompress.h index e2d9d54..dcd965a 100644 --- a/pcompress.h +++ b/pcompress.h @@ -202,24 +202,33 @@ typedef struct pc_ctx { int verbose; int enable_archive_sort; int pagesize; + int uncompfd, compfd; + + /* + * Archiving related context data. + */ char archive_members_file[MAXPATHLEN]; int archive_members_fd, archive_data_fd; uint32_t archive_members_count; void *archive_ctx, *archive_sort_buf; pthread_t archive_thread; - int uncompfd, compfd; char archive_temp_file[MAXPATHLEN]; int archive_temp_fd; - uint64_t archive_temp_size; + uint64_t archive_temp_size, archive_size; uchar_t *temp_mmap_buf; uint64_t temp_mmap_pos, temp_file_pos; uint64_t temp_mmap_len; + struct fn_list *fn; + sem_t read_sem, write_sem; + uchar_t *arc_buf; + uint64_t arc_buf_size, arc_buf_pos; + int arc_closed, arc_writing; + unsigned int chunk_num; uint64_t largest_chunk, smallest_chunk, avg_chunk; - uint64_t chunksize, archive_size; + uint64_t chunksize; const char *algo, *filename; char *to_filename; - struct fn_list *fn; char *exec_name; int do_compress, level; int do_uncompress; diff --git a/utils/utils.c b/utils/utils.c index 0f76107..a73de5c 100644 --- a/utils/utils.c +++ b/utils/utils.c @@ -42,6 +42,7 @@ #include #include #include +#include #include @@ -211,19 +212,27 @@ Read(int fd, void *buf, uint64_t count) * after the previous rabin boundary. */ int64_t -Read_Adjusted(int fd, uchar_t *buf, uint64_t count, int64_t *rabin_count, void *ctx) +Read_Adjusted(int fd, uchar_t *buf, uint64_t count, int64_t *rabin_count, void *ctx, void *pctx) { uchar_t *buf2; int64_t rcount; dedupe_context_t *rctx = (dedupe_context_t *)ctx; - if (!ctx) return (Read(fd, buf, count)); + if (!ctx) { + if (pctx) + return (archiver_read(pctx, buf, count)); + else + return (Read(fd, buf, count)); + } buf2 = buf; if (*rabin_count) { buf2 = buf + *rabin_count; count -= *rabin_count; } - rcount = Read(fd, buf2, count); + if (pctx) + rcount = archiver_read(pctx, buf2, count); + else + rcount = Read(fd, buf2, count); if (rcount > 0) { rcount += *rabin_count; if (rcount == count) { diff --git a/utils/utils.h b/utils/utils.h index fdfa3a7..a506f83 100644 --- a/utils/utils.h +++ b/utils/utils.h @@ -238,7 +238,7 @@ extern int parse_numeric(int64_t *val, const char *str); extern char *bytes_to_size(uint64_t bytes); extern int64_t Read(int fd, void *buf, uint64_t count); extern int64_t Read_Adjusted(int fd, uchar_t *buf, uint64_t count, - int64_t *rabin_count, void *ctx); + int64_t *rabin_count, void *ctx, void *pctx); extern int64_t Write(int fd, const void *buf, uint64_t count); extern void set_threadcounts(algo_props_t *props, int *nthreads, int nprocs, algo_threads_type_t typ);