Avoid using pipe during archive creation. Use semaphores and direct memory copy.

This commit is contained in:
Moinak Ghosh 2013-11-02 23:43:59 +05:30
parent a374ca5909
commit 7ed532133e
6 changed files with 176 additions and 36 deletions

View file

@ -91,6 +91,121 @@ static struct arc_list_state {
pthread_mutex_t nftw_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t nftw_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
* Archive writer callback routines for archive creation operation.
*/
static int
creat_open_callback(struct archive *arc, void *ctx)
{
pc_ctx_t *pctx = (pc_ctx_t *)ctx;
sem_init(&(pctx->read_sem), 0, 0);
sem_init(&(pctx->write_sem), 0, 0);
pctx->arc_buf = NULL;
pctx->arc_buf_pos = 0;
pctx->arc_buf_size = 0;
return (ARCHIVE_OK);
}
static int
creat_close_callback(struct archive *arc, void *ctx)
{
pc_ctx_t *pctx = (pc_ctx_t *)ctx;
pctx->arc_closed = 1;
if (pctx->arc_buf) {
sem_post(&(pctx->read_sem));
} else {
pctx->arc_buf_pos = 0;
}
return (ARCHIVE_OK);
}
static ssize_t
creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len)
{
uchar_t *buff = (uchar_t *)buf;
pc_ctx_t *pctx = (pc_ctx_t *)ctx;
size_t remaining;
if (pctx->arc_closed) {
archive_set_error(arc, ARCHIVE_EOF, "End of file when writing archive.");
return (-1);
}
if (!pctx->arc_writing) {
sem_wait(&(pctx->write_sem));
}
if (pctx->arc_buf == NULL || pctx->arc_buf_size == 0) {
archive_set_error(arc, ARCHIVE_EOF, "End of file when writing archive.");
return (-1);
}
pctx->arc_writing = 1;
remaining = len;
while (remaining && !pctx->arc_closed) {
uchar_t *tbuf;
tbuf = pctx->arc_buf + pctx->arc_buf_pos;
if (remaining > pctx->arc_buf_size - pctx->arc_buf_pos) {
size_t nlen = pctx->arc_buf_size - pctx->arc_buf_pos;
memcpy(tbuf, buff, nlen);
remaining -= nlen;
pctx->arc_buf_pos += nlen;
buff += nlen;
pctx->arc_writing = 0;
sem_post(&(pctx->read_sem));
sem_wait(&(pctx->write_sem));
pctx->arc_writing = 1;
} else {
memcpy(tbuf, buff, remaining);
pctx->arc_buf_pos += remaining;
remaining = 0;
if (pctx->arc_buf_pos == pctx->arc_buf_size) {
pctx->arc_writing = 0;
sem_post(&(pctx->read_sem));
}
break;
}
}
return (len - remaining);
}
int64_t
archiver_read(void *ctx, void *buf, uint64_t count)
{
pc_ctx_t *pctx = (pc_ctx_t *)ctx;
if (pctx->arc_closed)
return (0);
if (pctx->arc_buf != NULL) {
log_msg(LOG_ERR, 0, "Incorrect sequencing of archiver_read() call.");
return (-1);
}
pctx->arc_buf = buf;
pctx->arc_buf_size = count;
pctx->arc_buf_pos = 0;
sem_post(&(pctx->write_sem));
sem_wait(&(pctx->read_sem));
pctx->arc_buf = NULL;
return (pctx->arc_buf_pos);
}
int
archiver_close(void *ctx)
{
pc_ctx_t *pctx = (pc_ctx_t *)ctx;
pctx->arc_closed = 1;
sem_post(&(pctx->write_sem));
sem_post(&(pctx->read_sem));
return (0);
}
/* /*
* Comparison function for sorting pathname mambers. Sort by name/extension and then * Comparison function for sorting pathname mambers. Sort by name/extension and then
* by size. * by size.
@ -172,6 +287,10 @@ read_next_path(pc_ctx_t *pctx, char *fpath)
srt = srt->next; srt = srt->next;
} }
/*
* If we are not using mmap then seek to the position of the current entry, otherwise
* just note the entry position.
*/
if (pctx->temp_mmap_len == 0) { if (pctx->temp_mmap_len == 0) {
if (lseek(pctx->archive_members_fd, mem1->file_pos, SEEK_SET) == (off_t)-1) { if (lseek(pctx->archive_members_fd, mem1->file_pos, SEEK_SET) == (off_t)-1) {
log_msg(LOG_ERR, 1, "Error seeking in archive members file."); log_msg(LOG_ERR, 1, "Error seeking in archive members file.");
@ -209,10 +328,10 @@ do_mmap:
munmap(pctx->temp_mmap_buf, pctx->temp_mmap_len); munmap(pctx->temp_mmap_buf, pctx->temp_mmap_len);
adj = pctx->temp_file_pos % pctx->pagesize; adj = pctx->temp_file_pos % pctx->pagesize;
pctx->temp_mmap_pos = pctx->temp_file_pos - adj; pctx->temp_mmap_pos = pctx->temp_file_pos - adj;
pctx->temp_mmap_len = pctx->archive_temp_size - pctx->temp_file_pos; pctx->temp_mmap_len = pctx->archive_temp_size - pctx->temp_mmap_pos;
if (pctx->temp_mmap_len > TEMP_MMAP_SIZE) if (pctx->temp_mmap_len > TEMP_MMAP_SIZE)
pctx->temp_mmap_len = TEMP_MMAP_SIZE; pctx->temp_mmap_len = TEMP_MMAP_SIZE ;
pctx->temp_mmap_buf = mmap(NULL, pctx->temp_mmap_len, PROT_READ, pctx->temp_mmap_buf = mmap(NULL, pctx->temp_mmap_len, PROT_READ,
MAP_SHARED, pctx->archive_members_fd, pctx->temp_mmap_pos); MAP_SHARED, pctx->archive_members_fd, pctx->temp_mmap_pos);
if (pctx->temp_mmap_buf == NULL) { if (pctx->temp_mmap_buf == NULL) {
@ -225,6 +344,12 @@ do_mmap:
buf = pctx->temp_mmap_buf + (pctx->temp_file_pos - pctx->temp_mmap_pos); buf = pctx->temp_mmap_buf + (pctx->temp_file_pos - pctx->temp_mmap_pos);
namelen = U32_P(buf); namelen = U32_P(buf);
pctx->temp_file_pos += 2; pctx->temp_file_pos += 2;
/*
* If length of pathname entry exceeds current mmap region, repeat mmap
* at the entry offset. Only one repeat attempt is made. If there is a
* failure then we give up.
*/
if (pctx->temp_mmap_len - (pctx->temp_file_pos - pctx->temp_mmap_pos) < namelen) { if (pctx->temp_mmap_len - (pctx->temp_file_pos - pctx->temp_mmap_pos) < namelen) {
if (!retried) { if (!retried) {
pctx->temp_file_pos -= 2; pctx->temp_file_pos -= 2;
@ -243,6 +368,9 @@ do_mmap:
return (namelen); return (namelen);
} }
/*
* This code is used if mmap is not being used for the pathlist file.
*/
if ((rbytes = Read(pctx->archive_members_fd, &namelen, sizeof(namelen))) != 0) { if ((rbytes = Read(pctx->archive_members_fd, &namelen, sizeof(namelen))) != 0) {
if (rbytes < 2) { if (rbytes < 2) {
log_msg(LOG_ERR, 1, "Error reading archive members file."); log_msg(LOG_ERR, 1, "Error reading archive members file.");
@ -386,7 +514,7 @@ int
setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) setup_archiver(pc_ctx_t *pctx, struct stat *sbuf)
{ {
char *tmpfile, *tmp; char *tmpfile, *tmp;
int err, fd, pipefd[2]; int err, fd;
uchar_t *pbuf; uchar_t *pbuf;
struct archive *arc; struct archive *arc;
struct fn_list *fn; struct fn_list *fn;
@ -510,28 +638,16 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf)
sbuf->st_gid = getegid(); sbuf->st_gid = getegid();
sbuf->st_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; sbuf->st_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
/*
* Generate a pipe. The archiver writes to one end of the pipe and the
* compression stages read from the other end. This allowed plugging in
* archiving with the minimum of changes to the rest of Pcompress.
*/
if (pipe(pipefd) == -1) {
log_msg(LOG_ERR, 1, "Unable to create archiver pipe.\n");
close(fd); unlink(tmpfile);
return (-1);
}
pctx->uncompfd = pipefd[0]; // Read side
pctx->archive_data_fd = pipefd[1]; // Write side
arc = archive_write_new(); arc = archive_write_new();
if (!arc) { if (!arc) {
log_msg(LOG_ERR, 1, "Unable to create libarchive context.\n"); log_msg(LOG_ERR, 1, "Unable to create libarchive context.\n");
close(fd); close(pipefd[0]); close(pipefd[1]); close(fd);
unlink(tmpfile); unlink(tmpfile);
return (-1); return (-1);
} }
archive_write_set_format_pax_restricted(arc); archive_write_set_format_pax_restricted(arc);
archive_write_open_fd(arc, pctx->archive_data_fd); archive_write_open(arc, pctx, creat_open_callback,
creat_write_callback, creat_close_callback);
pctx->archive_ctx = arc; pctx->archive_ctx = arc;
pctx->archive_members_fd = fd; pctx->archive_members_fd = fd;
pctx->temp_mmap_len = TEMP_MMAP_SIZE; pctx->temp_mmap_len = TEMP_MMAP_SIZE;
@ -542,6 +658,7 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf)
pctx->temp_mmap_len = 0; pctx->temp_mmap_len = 0;
} }
pctx->temp_mmap_pos = 0; pctx->temp_mmap_pos = 0;
pctx->arc_writing = 0;
return (0); return (0);
} }

View file

@ -45,6 +45,8 @@ int setup_archiver(pc_ctx_t *pctx, struct stat *sbuf);
int start_archiver(pc_ctx_t *pctx); int start_archiver(pc_ctx_t *pctx);
int setup_extractor(pc_ctx_t *pctx); int setup_extractor(pc_ctx_t *pctx);
int start_extractor(pc_ctx_t *pctx); int start_extractor(pc_ctx_t *pctx);
int64_t archiver_read(void *ctx, void *buf, uint64_t count);
int archiver_close(void *ctx);
#ifdef __cplusplus #ifdef __cplusplus
} }

View file

@ -1901,12 +1901,6 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
log_msg(LOG_ERR, 0, "Setup archiver failed."); log_msg(LOG_ERR, 0, "Setup archiver failed.");
return (1); return (1);
} }
/*
* This is a pipe between the libarchive based archiving process and
* the rest of the compression stuff.
*/
uncompfd = pctx->uncompfd;
} }
/* /*
@ -2292,9 +2286,12 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
rctx = create_dedupe_context(chunksize, 0, pctx->rab_blk_size, pctx->algo, &props, rctx = create_dedupe_context(chunksize, 0, pctx->rab_blk_size, pctx->algo, &props,
pctx->enable_delta_encode, pctx->enable_fixed_scan, VERSION, COMPRESS, 0, NULL, pctx->enable_delta_encode, pctx->enable_fixed_scan, VERSION, COMPRESS, 0, NULL,
pctx->pipe_mode, nprocs); pctx->pipe_mode, nprocs);
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx); rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx, pctx);
} else { } else {
rbytes = Read(uncompfd, cread_buf, chunksize); if (pctx->archive_mode)
rbytes = archiver_read(pctx, cread_buf, chunksize);
else
rbytes = Read(uncompfd, cread_buf, chunksize);
} }
while (!bail) { while (!bail) {
@ -2386,6 +2383,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
COMP_BAIL; COMP_BAIL;
} }
} }
/* Signal the compression thread to start */ /* Signal the compression thread to start */
sem_post(&tdat->start_sem); sem_post(&tdat->start_sem);
++(pctx->chunk_num); ++(pctx->chunk_num);
@ -2400,9 +2398,12 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
* buffer is in progress. * buffer is in progress.
*/ */
if (pctx->enable_rabin_split) { if (pctx->enable_rabin_split) {
rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx); rbytes = Read_Adjusted(uncompfd, cread_buf, chunksize, &rabin_count, rctx, pctx);
} else { } else {
rbytes = Read(uncompfd, cread_buf, chunksize); if (pctx->archive_mode)
rbytes = archiver_read(pctx, cread_buf, chunksize);
else
rbytes = Read(uncompfd, cread_buf, chunksize);
} }
} }
} }
@ -2425,6 +2426,8 @@ comp_done:
*/ */
if (!pctx->pipe_mode) { if (!pctx->pipe_mode) {
if (uncompfd != -1) close(uncompfd); if (uncompfd != -1) close(uncompfd);
if (pctx->archive_mode)
archiver_close(pctx);
} }
if (pctx->t_errored) err = pctx->t_errored; if (pctx->t_errored) err = pctx->t_errored;

View file

@ -202,24 +202,33 @@ typedef struct pc_ctx {
int verbose; int verbose;
int enable_archive_sort; int enable_archive_sort;
int pagesize; int pagesize;
int uncompfd, compfd;
/*
* Archiving related context data.
*/
char archive_members_file[MAXPATHLEN]; char archive_members_file[MAXPATHLEN];
int archive_members_fd, archive_data_fd; int archive_members_fd, archive_data_fd;
uint32_t archive_members_count; uint32_t archive_members_count;
void *archive_ctx, *archive_sort_buf; void *archive_ctx, *archive_sort_buf;
pthread_t archive_thread; pthread_t archive_thread;
int uncompfd, compfd;
char archive_temp_file[MAXPATHLEN]; char archive_temp_file[MAXPATHLEN];
int archive_temp_fd; int archive_temp_fd;
uint64_t archive_temp_size; uint64_t archive_temp_size, archive_size;
uchar_t *temp_mmap_buf; uchar_t *temp_mmap_buf;
uint64_t temp_mmap_pos, temp_file_pos; uint64_t temp_mmap_pos, temp_file_pos;
uint64_t temp_mmap_len; uint64_t temp_mmap_len;
struct fn_list *fn;
sem_t read_sem, write_sem;
uchar_t *arc_buf;
uint64_t arc_buf_size, arc_buf_pos;
int arc_closed, arc_writing;
unsigned int chunk_num; unsigned int chunk_num;
uint64_t largest_chunk, smallest_chunk, avg_chunk; uint64_t largest_chunk, smallest_chunk, avg_chunk;
uint64_t chunksize, archive_size; uint64_t chunksize;
const char *algo, *filename; const char *algo, *filename;
char *to_filename; char *to_filename;
struct fn_list *fn;
char *exec_name; char *exec_name;
int do_compress, level; int do_compress, level;
int do_uncompress; int do_uncompress;

View file

@ -42,6 +42,7 @@
#include <rabin_dedup.h> #include <rabin_dedup.h>
#include <cpuid.h> #include <cpuid.h>
#include <xxhash.h> #include <xxhash.h>
#include <pc_archive.h>
#include <sys/sysinfo.h> #include <sys/sysinfo.h>
@ -211,19 +212,27 @@ Read(int fd, void *buf, uint64_t count)
* after the previous rabin boundary. * after the previous rabin boundary.
*/ */
int64_t int64_t
Read_Adjusted(int fd, uchar_t *buf, uint64_t count, int64_t *rabin_count, void *ctx) Read_Adjusted(int fd, uchar_t *buf, uint64_t count, int64_t *rabin_count, void *ctx, void *pctx)
{ {
uchar_t *buf2; uchar_t *buf2;
int64_t rcount; int64_t rcount;
dedupe_context_t *rctx = (dedupe_context_t *)ctx; dedupe_context_t *rctx = (dedupe_context_t *)ctx;
if (!ctx) return (Read(fd, buf, count)); if (!ctx) {
if (pctx)
return (archiver_read(pctx, buf, count));
else
return (Read(fd, buf, count));
}
buf2 = buf; buf2 = buf;
if (*rabin_count) { if (*rabin_count) {
buf2 = buf + *rabin_count; buf2 = buf + *rabin_count;
count -= *rabin_count; count -= *rabin_count;
} }
rcount = Read(fd, buf2, count); if (pctx)
rcount = archiver_read(pctx, buf2, count);
else
rcount = Read(fd, buf2, count);
if (rcount > 0) { if (rcount > 0) {
rcount += *rabin_count; rcount += *rabin_count;
if (rcount == count) { if (rcount == count) {

View file

@ -238,7 +238,7 @@ extern int parse_numeric(int64_t *val, const char *str);
extern char *bytes_to_size(uint64_t bytes); extern char *bytes_to_size(uint64_t bytes);
extern int64_t Read(int fd, void *buf, uint64_t count); extern int64_t Read(int fd, void *buf, uint64_t count);
extern int64_t Read_Adjusted(int fd, uchar_t *buf, uint64_t count, extern int64_t Read_Adjusted(int fd, uchar_t *buf, uint64_t count,
int64_t *rabin_count, void *ctx); int64_t *rabin_count, void *ctx, void *pctx);
extern int64_t Write(int fd, const void *buf, uint64_t count); extern int64_t Write(int fd, const void *buf, uint64_t count);
extern void set_threadcounts(algo_props_t *props, int *nthreads, int nprocs, extern void set_threadcounts(algo_props_t *props, int *nthreads, int nprocs,
algo_threads_type_t typ); algo_threads_type_t typ);