More changes for archiving.

Allow multiple filenames on command line when archiving.
Remove unneeded small block writes with libarchive.
This commit is contained in:
Moinak Ghosh 2013-10-27 20:36:48 +05:30
parent 46b11def08
commit 8e4b774c8c
5 changed files with 148 additions and 44 deletions

View file

@ -56,7 +56,6 @@ AE_IFIFO Named pipe (fifo)
#define ARC_ENTRY_OVRHEAD 500 #define ARC_ENTRY_OVRHEAD 500
#define ARC_SCRATCH_BUFF_SIZE (64 *1024) #define ARC_SCRATCH_BUFF_SIZE (64 *1024)
#define DATA_BLOCK_SIZE (8 * 1024)
#define MMAP_SIZE (1024 * 1024) #define MMAP_SIZE (1024 * 1024)
static struct arc_list_state { static struct arc_list_state {
@ -111,6 +110,7 @@ setup_archive(pc_ctx_t *pctx, struct stat *sbuf)
int err, fd, pipefd[2]; int err, fd, pipefd[2];
uchar_t *pbuf; uchar_t *pbuf;
struct archive *arc; struct archive *arc;
struct fn_list *fn;
tmpfile = pctx->archive_members_file; tmpfile = pctx->archive_members_file;
tmp = get_temp_dir(); tmp = get_temp_dir();
@ -135,13 +135,37 @@ setup_archive(pc_ctx_t *pctx, struct stat *sbuf)
* nftw requires using global state variable. So we lock to be mt-safe. * nftw requires using global state variable. So we lock to be mt-safe.
* This means only one directory tree scan can happen at a time. * This means only one directory tree scan can happen at a time.
*/ */
log_msg(LOG_INFO, 0, "Scanning files.");
sbuf->st_size = 0;
pctx->archive_size = 0;
pthread_mutex_lock(&nftw_mutex); pthread_mutex_lock(&nftw_mutex);
fn = pctx->fn;
a_state.pbuf = pbuf; a_state.pbuf = pbuf;
a_state.bufsiz = pctx->chunksize; a_state.bufsiz = pctx->chunksize;
a_state.bufpos = 0; a_state.bufpos = 0;
a_state.arc_size = 0; a_state.arc_size = 0;
a_state.fd = fd; a_state.fd = fd;
err = nftw(pctx->filename, add_pathname, 1024, FTW_PHYS); // 'pctx->filename' has dir name here
while (fn) {
struct stat sb;
if (lstat(fn->filename, &sb) == -1) {
log_msg(LOG_ERR, 1, "Ignoring %s.", fn->filename);
fn = fn->next;
continue;
}
if (S_ISDIR(sb.st_mode)) {
err = nftw(fn->filename, add_pathname, 1024, FTW_PHYS);
} else {
int tflag;
if (S_ISLNK(sb.st_mode))
tflag = FTW_SL;
else
tflag = FTW_F;
add_pathname(fn->filename, &sb, tflag, NULL);
}
if (a_state.bufpos > 0) { if (a_state.bufpos > 0) {
ssize_t wrtn = Write(a_state.fd, a_state.pbuf, a_state.bufpos); ssize_t wrtn = Write(a_state.fd, a_state.pbuf, a_state.bufpos);
if (wrtn < a_state.bufpos) { if (wrtn < a_state.bufpos) {
@ -151,8 +175,10 @@ setup_archive(pc_ctx_t *pctx, struct stat *sbuf)
} }
a_state.bufpos = 0; a_state.bufpos = 0;
} }
pctx->archive_size = a_state.arc_size; pctx->archive_size += a_state.arc_size;
sbuf->st_size = a_state.arc_size; sbuf->st_size += a_state.arc_size;
fn = fn->next;
}
pthread_mutex_unlock(&nftw_mutex); pthread_mutex_unlock(&nftw_mutex);
lseek(fd, 0, SEEK_SET); lseek(fd, 0, SEEK_SET);
free(pbuf); free(pbuf);
@ -209,7 +235,7 @@ copy_file_data(pc_ctx_t *pctx, struct archive *arc,
while (bytes_to_write > 0) { while (bytes_to_write > 0) {
uchar_t *src; uchar_t *src;
size_t wlen, w; size_t wlen;
ssize_t wrtn; ssize_t wrtn;
if (bytes_to_write < MMAP_SIZE) if (bytes_to_write < MMAP_SIZE)
@ -222,23 +248,16 @@ copy_file_data(pc_ctx_t *pctx, struct archive *arc,
wlen = len; wlen = len;
/* /*
* Write data in blocks. * Write the entire mmap-ed buffer. Since we are writing to the compressor
* stage pipe there is no need for blocking.
*/ */
while (wlen > 0) { wrtn = archive_write_data(arc, src, wlen);
if (wlen < DATA_BLOCK_SIZE) if (wrtn < wlen) {
w = wlen;
else
w = DATA_BLOCK_SIZE;
wrtn = archive_write_data(arc, src, w);
if (wrtn < w) {
/* Write failed; this is bad */ /* Write failed; this is bad */
log_msg(LOG_ERR, 0, "Data write error: %s", archive_error_string(arc)); log_msg(LOG_ERR, 0, "Data write error: %s", archive_error_string(arc));
rv = -1; rv = -1;
break;
} }
bytes_to_write -= wrtn; bytes_to_write -= wrtn;
wlen -= wrtn;
}
if (rv == -1) break; if (rv == -1) break;
munmap(mapbuf, len); munmap(mapbuf, len);
} }
@ -343,6 +362,9 @@ archive_thread_func(void *dat) {
if (archive_entry_filetype(entry) != AE_IFREG) { if (archive_entry_filetype(entry) != AE_IFREG) {
archive_entry_set_size(entry, 0); archive_entry_set_size(entry, 0);
} }
if (pctx->verbose)
log_msg(LOG_INFO, 0, "%10d %s", archive_entry_size(entry), name);
archive_entry_linkify(resolver, &entry, &spare_entry); archive_entry_linkify(resolver, &entry, &spare_entry);
ent = entry; ent = entry;
while (ent != NULL) { while (ent != NULL) {

View file

@ -1071,8 +1071,10 @@ start_decompress(pc_ctx_t *pctx, const char *filename, const char *to_filename)
set_threadcounts(&props, &(pctx->nthreads), nprocs, DECOMPRESS_THREADS); set_threadcounts(&props, &(pctx->nthreads), nprocs, DECOMPRESS_THREADS);
if (props.is_single_chunk) if (props.is_single_chunk)
pctx->nthreads = 1; pctx->nthreads = 1;
log_msg(LOG_INFO, 0, "Scaling to %d thread", pctx->nthreads * props.nthreads); if (pctx->nthreads * props.nthreads > 1)
if (pctx->nthreads * props.nthreads > 1) log_msg(LOG_INFO, 0, "s"); log_msg(LOG_INFO, 0, "Scaling to %d threads", pctx->nthreads * props.nthreads);
else
log_msg(LOG_INFO, 0, "Scaling to 1 thread");
nprocs = pctx->nthreads; nprocs = pctx->nthreads;
slab_cache_add(compressed_chunksize); slab_cache_add(compressed_chunksize);
slab_cache_add(chunksize); slab_cache_add(chunksize);
@ -1832,8 +1834,19 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
* the end. The target file name is same as original file with the '.pz' * the end. The target file name is same as original file with the '.pz'
* extension appended unless '-' was specified to output to stdout. * extension appended unless '-' was specified to output to stdout.
*/ */
if (filename) {
strcpy(tmpfile1, filename); strcpy(tmpfile1, filename);
strcpy(tmpfile1, dirname(tmpfile1)); strcpy(tmpfile1, dirname(tmpfile1));
} else {
char *tmp1;
if (!(pctx->archive_mode)) {
log_msg(LOG_ERR, 0, "Inconsistent NULL Filename when Not archiving.");
COMP_BAIL;
}
tmp1 = get_temp_dir();
strcpy(tmpfile1, tmp1);
free(tmp1);
}
tmp = getenv("PCOMPRESS_CACHE_DIR"); tmp = getenv("PCOMPRESS_CACHE_DIR");
if (tmp == NULL || !chk_dir(tmp)) { if (tmp == NULL || !chk_dir(tmp)) {
@ -1953,8 +1966,10 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
flags |= pctx->encrypt_type; flags |= pctx->encrypt_type;
set_threadcounts(&props, &(pctx->nthreads), nprocs, COMPRESS_THREADS); set_threadcounts(&props, &(pctx->nthreads), nprocs, COMPRESS_THREADS);
log_msg(LOG_INFO, 0, "Scaling to %d thread", pctx->nthreads * props.nthreads); if (pctx->nthreads * props.nthreads > 1)
if (pctx->nthreads * props.nthreads > 1) log_msg(LOG_INFO, 0, "s"); log_msg(LOG_INFO, 0, "Scaling to %d threads", pctx->nthreads * props.nthreads);
else
log_msg(LOG_INFO, 0, "Scaling to 1 thread");
nprocs = pctx->nthreads; nprocs = pctx->nthreads;
dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *)); dary = (struct cmp_data **)slab_calloc(NULL, nprocs, sizeof (struct cmp_data *));
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan))
@ -2383,7 +2398,17 @@ comp_done:
if (compfd != -1) close(compfd); if (compfd != -1) close(compfd);
} }
if (pctx->archive_mode) pthread_join(pctx->archive_thread, NULL); if (pctx->archive_mode) {
struct fn_list *fn, *fn1;
pthread_join(pctx->archive_thread, NULL);
fn = pctx->fn;
while (fn) {
fn1 = fn;
fn = fn->next;
slab_free(NULL, fn1);
}
}
if (!pctx->hide_cmp_stats) show_compression_stats(pctx); if (!pctx->hide_cmp_stats) show_compression_stats(pctx);
pctx->_stats_func(!pctx->hide_cmp_stats); pctx->_stats_func(!pctx->hide_cmp_stats);
@ -2578,7 +2603,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
strcpy(pctx->exec_name, pos); strcpy(pctx->exec_name, pos);
pthread_mutex_lock(&opt_parse); pthread_mutex_lock(&opt_parse);
while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:rLPS:B:Fk:")) != -1) { while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:rLPS:B:Fk:av")) != -1) {
int ovr; int ovr;
int64_t chunksize; int64_t chunksize;
@ -2711,6 +2736,14 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
} }
break; break;
case 'a':
pctx->archive_mode = 1;
break;
case 'v':
pctx->verbose = 1;
break;
case '?': case '?':
default: default:
return (2); return (2);
@ -2790,17 +2823,60 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
log_msg(LOG_ERR, 0, "Expected at least one filename."); log_msg(LOG_ERR, 0, "Expected at least one filename.");
return (1); return (1);
} else if (num_rem == 1 || num_rem == 2) { } else if (num_rem == 1 || num_rem == 2 || (num_rem > 0 && pctx->archive_mode)) {
if (pctx->do_compress) { if (pctx->do_compress) {
char apath[MAXPATHLEN]; char apath[MAXPATHLEN];
/*
* If archiving, resolve the list of pathnames on the cmdline.
*/
if (pctx->archive_mode) {
struct fn_list **fn;
int valid_paths;
slab_cache_add(sizeof (struct fn_list));
pctx->filename = NULL;
fn = &(pctx->fn);
valid_paths = 0;
while (num_rem > 0) {
char *filename;
if ((filename = realpath(argv[my_optind], NULL)) != NULL) {
*fn = slab_alloc(NULL, sizeof (struct fn_list));
(*fn)->filename = filename;
(*fn)->next = NULL;
fn = &((*fn)->next);
valid_paths++;
} else {
log_msg(LOG_WARN, 1, "%s", argv[my_optind]);
}
num_rem--;
my_optind++;
/*
* If multiple pathnames are provided, last one must be the archive name.
* This check here handles that case. If only one pathname is provided
* then archive name can be derived and num_rem here will be 0 so it
* exits normally in the loop check above.
*/
if (num_rem == 1) break;
}
if (valid_paths == 0) {
log_msg(LOG_ERR, 0, "No usable paths found to archive.");
return (1);
}
if (valid_paths == 1)
pctx->filename = pctx->fn->filename;
} else {
if ((pctx->filename = realpath(argv[my_optind], NULL)) == NULL) { if ((pctx->filename = realpath(argv[my_optind], NULL)) == NULL) {
log_msg(LOG_ERR, 1, "%s", argv[my_optind]); log_msg(LOG_ERR, 1, "%s", argv[my_optind]);
return (1); return (1);
} }
num_rem--;
if (num_rem == 2) {
my_optind++; my_optind++;
}
if (num_rem > 0) {
if (*(argv[my_optind]) == '-') { if (*(argv[my_optind]) == '-') {
pctx->to_filename = "-"; pctx->to_filename = "-";
pctx->pipe_out = 1; pctx->pipe_out = 1;
@ -2882,11 +2958,10 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[])
if (pctx->do_compress) { if (pctx->do_compress) {
struct stat sbuf; struct stat sbuf;
if (stat(pctx->filename, &sbuf) == -1) { if (pctx->filename && stat(pctx->filename, &sbuf) == -1) {
log_msg(LOG_ERR, 1, "Cannot stat: %s", pctx->filename); log_msg(LOG_ERR, 1, "Cannot stat: %s", pctx->filename);
return (1); return (1);
} }
if (S_ISDIR(sbuf.st_mode)) pctx->archive_mode = 1;
} }
pctx->inited = 1; pctx->inited = 1;

View file

@ -198,6 +198,7 @@ typedef struct pc_ctx {
int lzp_preprocess; int lzp_preprocess;
int encrypt_type; int encrypt_type;
int archive_mode; int archive_mode;
int verbose;
char archive_members_file[MAXPATHLEN]; char archive_members_file[MAXPATHLEN];
int archive_members_fd, archive_data_fd; int archive_members_fd, archive_data_fd;
void *archive_ctx; void *archive_ctx;
@ -207,6 +208,7 @@ typedef struct pc_ctx {
uint64_t largest_chunk, smallest_chunk, avg_chunk; uint64_t largest_chunk, smallest_chunk, avg_chunk;
uint64_t chunksize, archive_size; uint64_t chunksize, archive_size;
const char *algo, *filename, *to_filename; const char *algo, *filename, *to_filename;
struct fn_list *fn;
char *exec_name; char *exec_name;
int do_compress, level; int do_compress, level;
int do_uncompress; int do_uncompress;

View file

@ -50,7 +50,7 @@
processor_info_t proc_info; processor_info_t proc_info;
pthread_mutex_t f_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t f_mutex = PTHREAD_MUTEX_INITIALIZER;
static int cur_log_level = 1; static int cur_log_level = 2;
static log_dest_t ldest = {LOG_OUTPUT, LOG_INFO, NULL}; static log_dest_t ldest = {LOG_OUTPUT, LOG_INFO, NULL};
static char *f_name_list[512]; static char *f_name_list[512];
static int f_count = 512, f_inited = 0; static int f_count = 512, f_inited = 0;

View file

@ -222,6 +222,11 @@ typedef struct{
int64_t sharedram; int64_t sharedram;
} my_sysinfo; } my_sysinfo;
/*
 * Node of a singly linked list of pathnames. When archiving, one node is
 * created for each command-line path that resolves successfully, and the
 * list is later walked to scan each entry.
 */
struct fn_list {
/* Pathname for this entry; set from the result of realpath(path, NULL). */
char *filename;
/* Next node in the list, or NULL for the last node. */
struct fn_list *next;
};
#ifndef _IN_UTILS_ #ifndef _IN_UTILS_
extern processor_info_t proc_info; extern processor_info_t proc_info;
#endif #endif