iImplement fast TOC listing for metadata streams.

Fix help text.
Removed redundant allocator code.
Actually free memory on exit.
This commit is contained in:
Moinak Ghosh 2014-11-03 20:20:05 +05:30
parent 3259c7ced0
commit b2ad225fbb
4 changed files with 64 additions and 38 deletions

View file

@ -469,21 +469,19 @@ slab_free_real(void *p, void *address, int do_free)
else else
htable[hindx] = buf->next; htable[hindx] = buf->next;
pthread_mutex_unlock(&hbucket_locks[hindx]); pthread_mutex_unlock(&hbucket_locks[hindx]);
found = 1;
ATOMIC_SUB(hash_entries, 1); ATOMIC_SUB(hash_entries, 1);
if (buf->slab == NULL || do_free) { if (buf->slab == NULL || do_free) {
free(buf->ptr); free(buf->ptr);
free(buf); free(buf);
found = 1;
break;
} else { } else {
pthread_mutex_lock(&(buf->slab->slab_lock)); pthread_mutex_lock(&(buf->slab->slab_lock));
buf->next = buf->slab->avail; buf->next = buf->slab->avail;
buf->slab->avail = buf; buf->slab->avail = buf;
pthread_mutex_unlock(&(buf->slab->slab_lock)); pthread_mutex_unlock(&(buf->slab->slab_lock));
found = 1;
break;
} }
break;
} }
pbuf = buf; pbuf = buf;
buf = buf->next; buf = buf->next;

View file

@ -321,6 +321,14 @@ extract_read_callback(struct archive *arc, void *ctx, const void **buf)
return (len); return (len);
} }
/*
* When listing TOC we just return dummy data to be thrown away.
*/
if (pctx->list_mode && pctx->meta_stream) {
*buf = pctx->temp_mmap_buf;
return (pctx->temp_mmap_len);
}
if (!pctx->arc_writing) { if (!pctx->arc_writing) {
Sem_Wait(&(pctx->read_sem)); Sem_Wait(&(pctx->read_sem));
} else { } else {

View file

@ -134,13 +134,14 @@ usage(pc_ctx_t *pctx)
" -p Make Pcompress work in streaming mode. Input is stdin, output is stdout.\n\n" " -p Make Pcompress work in streaming mode. Input is stdin, output is stdout.\n\n"
" <target file>\n" " <target file>\n"
" Pathname of the compressed file to be created or '-' for stdout.\n\n" " Pathname of the compressed file to be created or '-' for stdout.\n\n"
" Decompression and Archive extraction\n" " Decompression, Listing and Archive extraction\n"
" ------------------------------------\n" " ---------------------------------------------\n"
" %s -d <compressed file or '-'> [-m] [-K] [<target file or directory>]\n\n" " %s <-d|-i> [-m] [-K] <compressed file or '-'> [<target file or directory>]\n\n"
" -d Extract archive to target dir or current dir.\n"
" -i Only list contents of the archive, do not extract.\n\n"
" -m Enable restoring *all* permissions, ACLs, Extended Attributes etc.\n" " -m Enable restoring *all* permissions, ACLs, Extended Attributes etc.\n"
" Equivalent to the '-p' option in tar.\n" " Equivalent to the '-p' option in tar.\n"
" -K Do not overwrite newer files.\n" " -K Do not overwrite newer files.\n"
" -i Only list contents of the archive, do not extract.\n\n"
" -m and -K are only meaningful if the compressed file is an archive. For single file\n" " -m and -K are only meaningful if the compressed file is an archive. For single file\n"
" compressed mode these options are ignored.\n\n" " compressed mode these options are ignored.\n\n"
" <compressed file>\n" " <compressed file>\n"
@ -384,7 +385,6 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64
#ifndef _MPLV2_LICENSE_ #ifndef _MPLV2_LICENSE_
int hashsize; int hashsize;
int64_t result; int64_t result;
hashsize = lzp_hash_size(level); hashsize = lzp_hash_size(level);
result = lzp_decompress((const uchar_t *)src, (uchar_t *)dst, srclen, result = lzp_decompress((const uchar_t *)src, (uchar_t *)dst, srclen,
hashsize, LZP_DEFAULT_LZPMINLEN, 0); hashsize, LZP_DEFAULT_LZPMINLEN, 0);
@ -392,7 +392,6 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64
memcpy(src, dst, result); memcpy(src, dst, result);
srclen = result; srclen = result;
*dstlen = result; *dstlen = result;
_dstlen = result;
} else { } else {
log_msg(LOG_ERR, 0, "LZP decompression failed."); log_msg(LOG_ERR, 0, "LZP decompression failed.");
return ((int)result); return ((int)result);
@ -1357,6 +1356,17 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
set_threadcounts(&props, &(pctx->nthreads), nprocs, DECOMPRESS_THREADS); set_threadcounts(&props, &(pctx->nthreads), nprocs, DECOMPRESS_THREADS);
if (props.is_single_chunk) if (props.is_single_chunk)
pctx->nthreads = 1; pctx->nthreads = 1;
/*
* If we are trying to list the archive contents, and the archive has a
* metadata stream, then we do not do any data decompression. Only
* metadata is decompressed.
*/
if (pctx->list_mode && pctx->meta_stream) {
pctx->nthreads = 0;
pctx->temp_mmap_buf = (uchar_t *)slab_alloc(NULL, chunksize);
pctx->temp_mmap_len = chunksize;
}
if (pctx->nthreads * props.nthreads > 1) if (pctx->nthreads * props.nthreads > 1)
log_msg(LOG_INFO, 0, "Scaling to %d threads", pctx->nthreads * props.nthreads); log_msg(LOG_INFO, 0, "Scaling to %d threads", pctx->nthreads * props.nthreads);
else else
@ -1458,6 +1468,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
} }
} }
// When doing global dedupe first thread does not wait to start dedupe recovery. // When doing global dedupe first thread does not wait to start dedupe recovery.
if (nprocs > 0)
Sem_Post(&(dary[0]->index_sem)); Sem_Post(&(dary[0]->index_sem));
if (pctx->encrypt_type) { if (pctx->encrypt_type) {
@ -1465,6 +1476,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
crypto_clean_pkey(&(pctx->crypto_ctx)); crypto_clean_pkey(&(pctx->crypto_ctx));
} }
if (!(pctx->list_mode && pctx->meta_stream)) {
w.dary = dary; w.dary = dary;
w.wfd = uncompfd; w.wfd = uncompfd;
w.nprocs = nprocs; w.nprocs = nprocs;
@ -1475,6 +1487,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
UNCOMP_BAIL; UNCOMP_BAIL;
} }
thread = 2; thread = 2;
}
/* /*
* Now read from the compressed file in variable compressed chunk size. * Now read from the compressed file in variable compressed chunk size.
@ -1485,6 +1498,8 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
pctx->chunk_num = 0; pctx->chunk_num = 0;
np = 0; np = 0;
bail = 0; bail = 0;
if (nprocs == 0)
bail = 1;
while (!bail) { while (!bail) {
int64_t rb; int64_t rb;
@ -1584,13 +1599,15 @@ redo:
pctx->avg_chunk += tdat->len_cmp; pctx->avg_chunk += tdat->len_cmp;
/* /*
* Now read compressed chunk including the checksum. * Now read compressed chunk including the checksum. This is
* a seek if it is just a listing operation. No data is
* processed in that case.
*/ */
rb = tdat->len_cmp + pctx->cksum_bytes + pctx->mac_bytes + rb = tdat->len_cmp + pctx->cksum_bytes + pctx->mac_bytes +
CHUNK_FLAG_SZ; CHUNK_FLAG_SZ;
tdat->rbytes = Read(compfd, tdat->compressed_chunk, rb); tdat->rbytes = Read(compfd, tdat->compressed_chunk, rb);
} else { } else {
int64_t cpos = lseek(compfd, 0, SEEK_CUR); off_t cpos = lseek(compfd, 0, SEEK_CUR);
/* Two values already read */ /* Two values already read */
rb = tdat->len_cmp_be + METADATA_HDR_SZ - 16; rb = tdat->len_cmp_be + METADATA_HDR_SZ - 16;
@ -1621,7 +1638,6 @@ redo:
} }
} }
if (!pctx->main_cancel) { if (!pctx->main_cancel) {
for (p = 0; p < nprocs; p++) { for (p = 0; p < nprocs; p++) {
if (p == np) continue; if (p == np) continue;
@ -1656,9 +1672,9 @@ uncomp_done:
for (i = 0; i < nprocs; i++) { for (i = 0; i < nprocs; i++) {
if (!dary[i]) continue; if (!dary[i]) continue;
if (dary[i]->uncompressed_chunk) if (dary[i]->uncompressed_chunk)
slab_free(NULL, dary[i]->uncompressed_chunk); slab_release(NULL, dary[i]->uncompressed_chunk);
if (dary[i]->compressed_chunk) if (dary[i]->compressed_chunk)
slab_free(NULL, dary[i]->compressed_chunk); slab_release(NULL, dary[i]->compressed_chunk);
if (pctx->_deinit_func) if (pctx->_deinit_func)
pctx->_deinit_func(&(dary[i]->data)); pctx->_deinit_func(&(dary[i]->data));
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) { if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) {
@ -1669,18 +1685,22 @@ uncomp_done:
Sem_Destroy(&(dary[i]->write_done_sem)); Sem_Destroy(&(dary[i]->write_done_sem));
Sem_Destroy(&(dary[i]->index_sem)); Sem_Destroy(&(dary[i]->index_sem));
slab_free(NULL, dary[i]); slab_release(NULL, dary[i]);
} }
slab_free(NULL, dary); slab_release(NULL, dary);
} }
if (!pctx->pipe_mode) { if (!pctx->pipe_mode) {
if (filename && compfd != -1) close(compfd); if (filename && compfd != -1) close(compfd);
if (uncompfd != -1) close(uncompfd); if (uncompfd != -1) close(uncompfd);
} }
if (pctx->archive_mode) { if (pctx->archive_mode) {
if (pctx->meta_stream)
meta_ctx_done(pctx->meta_ctx);
pthread_join(pctx->archive_thread, NULL); pthread_join(pctx->archive_thread, NULL);
if (pctx->meta_stream) {
meta_ctx_done(pctx->meta_ctx);
if (pctx->list_mode) {
slab_release(NULL, pctx->temp_mmap_buf);
}
}
if (pctx->enable_rabin_global) { if (pctx->enable_rabin_global) {
close(pctx->archive_temp_fd); close(pctx->archive_temp_fd);
unlink(pctx->archive_temp_file); unlink(pctx->archive_temp_file);
@ -2825,9 +2845,9 @@ comp_done:
for (i = 0; i < nprocs; i++) { for (i = 0; i < nprocs; i++) {
if (!dary[i]) continue; if (!dary[i]) continue;
if (dary[i]->uncompressed_chunk != (uchar_t *)1) if (dary[i]->uncompressed_chunk != (uchar_t *)1)
slab_free(NULL, dary[i]->uncompressed_chunk); slab_release(NULL, dary[i]->uncompressed_chunk);
if (dary[i]->cmp_seg != (uchar_t *)1) if (dary[i]->cmp_seg != (uchar_t *)1)
slab_free(NULL, dary[i]->cmp_seg); slab_release(NULL, dary[i]->cmp_seg);
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) { if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) {
destroy_dedupe_context(dary[i]->rctx); destroy_dedupe_context(dary[i]->rctx);
} }
@ -2838,13 +2858,13 @@ comp_done:
Sem_Destroy(&(dary[i]->write_done_sem)); Sem_Destroy(&(dary[i]->write_done_sem));
Sem_Destroy(&(dary[i]->index_sem)); Sem_Destroy(&(dary[i]->index_sem));
slab_free(NULL, dary[i]); slab_release(NULL, dary[i]);
} }
slab_free(NULL, dary); slab_release(NULL, dary);
} }
if (pctx->enable_rabin_split) destroy_dedupe_context(rctx); if (pctx->enable_rabin_split) destroy_dedupe_context(rctx);
if (cread_buf != (uchar_t *)1) if (cread_buf != (uchar_t *)1)
slab_free(NULL, cread_buf); slab_release(NULL, cread_buf);
if (!pctx->pipe_mode) { if (!pctx->pipe_mode) {
if (compfd != -1) close(compfd); if (compfd != -1) close(compfd);
} }
@ -2857,7 +2877,7 @@ comp_done:
while (fn) { while (fn) {
fn1 = fn; fn1 = fn;
fn = fn->next; fn = fn->next;
slab_free(NULL, fn1); slab_release(NULL, fn1);
} }
Sem_Destroy(&(pctx->read_sem)); Sem_Destroy(&(pctx->read_sem));
Sem_Destroy(&(pctx->write_sem)); Sem_Destroy(&(pctx->write_sem));