Working MAC OS X port.

Compatibility layer for semaphore handling.
This commit is contained in:
Moinak Ghosh 2014-05-04 21:11:31 +05:30
parent 7830473c65
commit 63bef473cc
7 changed files with 182 additions and 73 deletions

View file

@ -115,8 +115,8 @@ arc_open_callback(struct archive *arc, void *ctx)
{
pc_ctx_t *pctx = (pc_ctx_t *)ctx;
sem_init(&(pctx->read_sem), 0, 0);
sem_init(&(pctx->write_sem), 0, 0);
Sem_Init(&(pctx->read_sem), 0, 0);
Sem_Init(&(pctx->write_sem), 0, 0);
pctx->arc_buf = NULL;
pctx->arc_buf_pos = 0;
pctx->arc_buf_size = 0;
@ -130,7 +130,7 @@ creat_close_callback(struct archive *arc, void *ctx)
pctx->arc_closed = 1;
if (pctx->arc_buf) {
sem_post(&(pctx->read_sem));
Sem_Post(&(pctx->read_sem));
} else {
pctx->arc_buf_pos = 0;
}
@ -150,7 +150,7 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len
}
if (!pctx->arc_writing) {
sem_wait(&(pctx->write_sem));
Sem_Wait(&(pctx->write_sem));
}
if (pctx->arc_buf == NULL || pctx->arc_buf_size == 0) {
@ -182,8 +182,8 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len
pctx->ctype = pctx->btype;
} else {
pctx->arc_writing = 0;
sem_post(&(pctx->read_sem));
sem_wait(&(pctx->write_sem));
Sem_Post(&(pctx->read_sem));
Sem_Wait(&(pctx->write_sem));
tbuf = pctx->arc_buf + pctx->arc_buf_pos;
pctx->arc_writing = 1;
if (remaining > 0)
@ -199,8 +199,8 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len
pctx->arc_buf_pos += nlen;
buff += nlen;
pctx->arc_writing = 0;
sem_post(&(pctx->read_sem));
sem_wait(&(pctx->write_sem));
Sem_Post(&(pctx->read_sem));
Sem_Wait(&(pctx->write_sem));
pctx->arc_writing = 1;
} else {
memcpy(tbuf, buff, remaining);
@ -208,7 +208,7 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len
remaining = 0;
if (pctx->arc_buf_pos == pctx->arc_buf_size) {
pctx->arc_writing = 0;
sem_post(&(pctx->read_sem));
Sem_Post(&(pctx->read_sem));
}
break;
}
@ -234,8 +234,8 @@ archiver_read(void *ctx, void *buf, uint64_t count)
pctx->arc_buf_size = count;
pctx->arc_buf_pos = 0;
pctx->btype = TYPE_UNKNOWN;
sem_post(&(pctx->write_sem));
sem_wait(&(pctx->read_sem));
Sem_Post(&(pctx->write_sem));
Sem_Wait(&(pctx->read_sem));
pctx->arc_buf = NULL;
return (pctx->arc_buf_pos);
}
@ -248,8 +248,8 @@ archiver_close(void *ctx)
pctx->arc_closed = 1;
pctx->arc_buf = NULL;
pctx->arc_buf_size = 0;
sem_post(&(pctx->write_sem));
sem_post(&(pctx->read_sem));
Sem_Post(&(pctx->write_sem));
Sem_Post(&(pctx->read_sem));
return (0);
}
@ -260,7 +260,7 @@ extract_close_callback(struct archive *arc, void *ctx)
pctx->arc_closed = 1;
if (pctx->arc_buf) {
sem_post(&(pctx->write_sem));
Sem_Post(&(pctx->write_sem));
} else {
pctx->arc_buf_size = 0;
}
@ -280,10 +280,10 @@ extract_read_callback(struct archive *arc, void *ctx, const void **buf)
}
if (!pctx->arc_writing) {
sem_wait(&(pctx->read_sem));
Sem_Wait(&(pctx->read_sem));
} else {
sem_post(&(pctx->write_sem));
sem_wait(&(pctx->read_sem));
Sem_Post(&(pctx->write_sem));
Sem_Wait(&(pctx->read_sem));
}
if (pctx->arc_buf == NULL || pctx->arc_buf_size == 0) {
@ -315,8 +315,8 @@ archiver_write(void *ctx, void *buf, uint64_t count)
pctx->arc_buf = buf;
pctx->arc_buf_size = count;
sem_post(&(pctx->read_sem));
sem_wait(&(pctx->write_sem));
Sem_Post(&(pctx->read_sem));
Sem_Wait(&(pctx->write_sem));
pctx->arc_buf = NULL;
return (pctx->arc_buf_size);
}

View file

@ -403,10 +403,10 @@ perform_decompress(void *dat)
pctx = tdat->pctx;
redo:
sem_wait(&tdat->start_sem);
Sem_Wait(&tdat->start_sem);
if (unlikely(tdat->cancel)) {
tdat->len_cmp = 0;
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->cmp_done_sem);
return (0);
}
@ -461,7 +461,7 @@ redo:
pctx->main_cancel = 1;
tdat->len_cmp = 0;
pctx->t_errored = 1;
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->cmp_done_sem);
return (NULL);
}
DEBUG_STAT_EN(en = get_wtime_millis());
@ -480,7 +480,7 @@ redo:
*/
pctx->main_cancel = 1;
tdat->len_cmp = 0;
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->cmp_done_sem);
return (NULL);
}
DEBUG_STAT_EN(en = get_wtime_millis());
@ -511,7 +511,7 @@ redo:
pctx->main_cancel = 1;
tdat->len_cmp = 0;
pctx->t_errored = 1;
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->cmp_done_sem);
return (NULL);
}
@ -638,7 +638,7 @@ redo:
* to wait for the previous thread's dedupe recovery to complete.
*/
if (pctx->enable_rabin_global) {
sem_wait(tdat->rctx->index_sem);
Sem_Wait(tdat->rctx->index_sem);
}
}
@ -658,7 +658,7 @@ redo:
}
cont:
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->cmp_done_sem);
goto redo;
}
@ -1269,10 +1269,10 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
tdat->level = level;
tdat->data = NULL;
tdat->props = &props;
sem_init(&(tdat->start_sem), 0, 0);
sem_init(&(tdat->cmp_done_sem), 0, 0);
sem_init(&(tdat->write_done_sem), 0, 1);
sem_init(&(tdat->index_sem), 0, 0);
Sem_Init(&(tdat->start_sem), 0, 0);
Sem_Init(&(tdat->cmp_done_sem), 0, 0);
Sem_Init(&(tdat->write_done_sem), 0, 1);
Sem_Init(&(tdat->index_sem), 0, 0);
if (pctx->_init_func) {
if (pctx->_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize,
@ -1329,7 +1329,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
}
}
// When doing global dedupe first thread does not wait to start dedupe recovery.
sem_post(&(dary[0]->index_sem));
Sem_Post(&(dary[0]->index_sem));
if (pctx->encrypt_type) {
/* Erase encryption key bytes stored as a plain array. No longer reqd. */
@ -1362,7 +1362,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
for (p = 0; p < nprocs; p++) {
np = p;
tdat = dary[p];
sem_wait(&tdat->write_done_sem);
Sem_Wait(&tdat->write_done_sem);
if (pctx->main_cancel) break;
tdat->id = pctx->chunk_num;
if (tdat->rctx) tdat->rctx->id = tdat->id;
@ -1443,7 +1443,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
UNCOMP_BAIL;
}
}
sem_post(&tdat->start_sem);
Sem_Post(&tdat->start_sem);
++(pctx->chunk_num);
}
}
@ -1453,7 +1453,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename)
for (p = 0; p < nprocs; p++) {
if (p == np) continue;
tdat = dary[p];
sem_wait(&tdat->write_done_sem);
Sem_Wait(&tdat->write_done_sem);
}
}
uncomp_done:
@ -1463,8 +1463,8 @@ uncomp_done:
tdat = dary[i];
tdat->cancel = 1;
tdat->len_cmp = 0;
sem_post(&tdat->start_sem);
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->start_sem);
Sem_Post(&tdat->cmp_done_sem);
pthread_join(tdat->thr, NULL);
}
pthread_join(writer_thr, NULL);
@ -1490,6 +1490,11 @@ uncomp_done:
if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) {
destroy_dedupe_context(dary[i]->rctx);
}
Sem_Destroy(&(dary[i]->start_sem));
Sem_Destroy(&(dary[i]->cmp_done_sem));
Sem_Destroy(&(dary[i]->write_done_sem));
Sem_Destroy(&(dary[i]->index_sem));
slab_free(NULL, dary[i]);
}
slab_free(NULL, dary);
@ -1504,6 +1509,8 @@ uncomp_done:
close(pctx->archive_temp_fd);
unlink(pctx->archive_temp_file);
}
Sem_Destroy(&(pctx->read_sem));
Sem_Destroy(&(pctx->write_sem));
}
if (!pctx->hide_cmp_stats) show_compression_stats(pctx);
@ -1522,10 +1529,10 @@ perform_compress(void *dat) {
pctx = tdat->pctx;
redo:
sem_wait(&tdat->start_sem);
Sem_Wait(&tdat->start_sem);
if (unlikely(tdat->cancel)) {
tdat->len_cmp = 0;
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->cmp_done_sem);
return (0);
}
@ -1699,7 +1706,7 @@ plain_index:
pctx->main_cancel = 1;
tdat->len_cmp = 0;
pctx->t_errored = 1;
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->cmp_done_sem);
return (0);
}
DEBUG_STAT_EN(en = get_wtime_millis());
@ -1781,7 +1788,7 @@ plain_index:
U32_P(mac_ptr) = htonl(crc);
}
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->cmp_done_sem);
goto redo;
}
@ -1797,7 +1804,7 @@ writer_thread(void *dat) {
repeat:
for (p = 0; p < w->nprocs; p++) {
tdat = w->dary[p];
sem_wait(&tdat->cmp_done_sem);
Sem_Wait(&tdat->cmp_done_sem);
if (tdat->len_cmp == 0) {
goto do_cancel;
}
@ -1824,16 +1831,16 @@ repeat:
do_cancel:
pctx->main_cancel = 1;
tdat->cancel = 1;
sem_post(&tdat->start_sem);
Sem_Post(&tdat->start_sem);
if (tdat->rctx && pctx->enable_rabin_global)
sem_post(tdat->rctx->index_sem_next);
sem_post(&tdat->write_done_sem);
Sem_Post(tdat->rctx->index_sem_next);
Sem_Post(&tdat->write_done_sem);
return (0);
}
if (tdat->decompressing && tdat->rctx && pctx->enable_rabin_global) {
sem_post(tdat->rctx->index_sem_next);
Sem_Post(tdat->rctx->index_sem_next);
}
sem_post(&tdat->write_done_sem);
Sem_Post(&tdat->write_done_sem);
}
goto repeat;
}
@ -2198,10 +2205,10 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
tdat->data = NULL;
tdat->rctx = NULL;
tdat->props = &props;
sem_init(&(tdat->start_sem), 0, 0);
sem_init(&(tdat->cmp_done_sem), 0, 0);
sem_init(&(tdat->write_done_sem), 0, 1);
sem_init(&(tdat->index_sem), 0, 0);
Sem_Init(&(tdat->start_sem), 0, 0);
Sem_Init(&(tdat->cmp_done_sem), 0, 0);
Sem_Init(&(tdat->write_done_sem), 0, 1);
Sem_Init(&(tdat->index_sem), 0, 0);
if (pctx->_init_func) {
if (pctx->_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize,
@ -2249,7 +2256,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
tdat->rctx->index_sem_next = &(dary[(i + 1) % nprocs]->index_sem);
}
// When doing global dedupe first thread does not wait to access the index.
sem_post(&(dary[0]->index_sem));
Sem_Post(&(dary[0]->index_sem));
}
w.dary = dary;
@ -2396,7 +2403,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
tdat = dary[p];
if (pctx->main_cancel) break;
/* Wait for previous chunk compression to complete. */
sem_wait(&tdat->write_done_sem);
Sem_Wait(&tdat->write_done_sem);
if (pctx->main_cancel) break;
if (rbytes == 0) { /* EOF */
@ -2479,7 +2486,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
}
/* Signal the compression thread to start */
sem_post(&tdat->start_sem);
Sem_Post(&tdat->start_sem);
++(pctx->chunk_num);
if (single_chunk) {
@ -2512,7 +2519,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev
for (p = 0; p < nprocs; p++) {
if (p == np) continue;
tdat = dary[p];
sem_wait(&tdat->write_done_sem);
Sem_Wait(&tdat->write_done_sem);
}
} else {
err = 1;
@ -2535,8 +2542,8 @@ comp_done:
tdat = dary[i];
tdat->cancel = 1;
tdat->len_cmp = 0;
sem_post(&tdat->start_sem);
sem_post(&tdat->cmp_done_sem);
Sem_Post(&tdat->start_sem);
Sem_Post(&tdat->cmp_done_sem);
pthread_join(tdat->thr, NULL);
if (pctx->encrypt_type)
hmac_cleanup(&tdat->chunk_hmac);
@ -2601,6 +2608,11 @@ comp_done:
}
if (pctx->_deinit_func)
pctx->_deinit_func(&(dary[i]->data));
Sem_Destroy(&(dary[i]->start_sem));
Sem_Destroy(&(dary[i]->cmp_done_sem));
Sem_Destroy(&(dary[i]->write_done_sem));
Sem_Destroy(&(dary[i]->index_sem));
slab_free(NULL, dary[i]);
}
slab_free(NULL, dary);
@ -2622,6 +2634,8 @@ comp_done:
fn = fn->next;
slab_free(NULL, fn1);
}
Sem_Destroy(&(pctx->read_sem));
Sem_Destroy(&(pctx->write_sem));
}
if (!pctx->hide_cmp_stats) show_compression_stats(pctx);
pctx->_stats_func(!pctx->hide_cmp_stats);

View file

@ -233,7 +233,7 @@ typedef struct pc_ctx {
uint64_t temp_mmap_pos, temp_file_pos;
uint64_t temp_mmap_len;
struct fn_list *fn;
sem_t read_sem, write_sem;
Sem_t read_sem, write_sem;
uchar_t *arc_buf;
uint64_t arc_buf_size, arc_buf_pos;
int arc_closed, arc_writing;
@ -276,10 +276,10 @@ struct cmp_data {
compress_func_ptr compress;
compress_func_ptr decompress;
int cancel;
sem_t start_sem;
sem_t cmp_done_sem;
sem_t write_done_sem;
sem_t index_sem;
Sem_t start_sem;
Sem_t cmp_done_sem;
Sem_t write_done_sem;
Sem_t index_sem;
void *data;
pthread_t thr;
mac_ctx_t chunk_hmac;

View file

@ -506,8 +506,8 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of
* in order to maintain proper sequencing and avoid deadlocks.
*/
if (ctx->arc) {
sem_wait(ctx->index_sem);
sem_post(ctx->index_sem_next);
Sem_Wait(ctx->index_sem);
Sem_Post(ctx->index_sem_next);
}
return (0);
}
@ -765,8 +765,8 @@ process_blocks:
DEBUG_STAT_EN(fprintf(stderr, "Original size: %" PRId64 ", blknum: %u\n", *size, blknum));
DEBUG_STAT_EN(fprintf(stderr, "Number of maxlen blocks: %u\n", max_count));
if (blknum <=2 && ctx->arc) {
sem_wait(ctx->index_sem);
sem_post(ctx->index_sem_next);
Sem_Wait(ctx->index_sem);
Sem_Post(ctx->index_sem_next);
}
if (blknum > 2) {
uint64_t pos, matchlen, pos1 = 0;
@ -828,7 +828,7 @@ process_blocks:
*/
length = 0;
DEBUG_STAT_EN(w1 = get_wtime_millis());
sem_wait(ctx->index_sem);
Sem_Wait(ctx->index_sem);
DEBUG_STAT_EN(w2 = get_wtime_millis());
for (i=0; i<blknum; i++) {
hash_entry_t *he;
@ -878,7 +878,7 @@ process_blocks:
/*
* Signal the next thread in sequence to access the index.
*/
sem_post(ctx->index_sem_next);
Sem_Post(ctx->index_sem_next);
/*
* Write final pending block length value (if any).
@ -1002,7 +1002,7 @@ process_blocks:
*/
if (i == 0) {
DEBUG_STAT_EN(w1 = get_wtime_millis());
sem_wait(ctx->index_sem);
Sem_Wait(ctx->index_sem);
DEBUG_STAT_EN(w2 = get_wtime_millis());
}
@ -1010,7 +1010,7 @@ process_blocks:
len = (blks-i) * sizeof (global_blockentry_t);
if (db_segcache_write(cfg, ctx->id, (uchar_t *)&(ctx->g_blocks[i]),
len, blks-i, ctx->file_offset) == -1) {
sem_post(ctx->index_sem_next);
Sem_Post(ctx->index_sem_next);
ctx->valid = 0;
return (0);
}
@ -1067,7 +1067,7 @@ process_blocks:
/*
* Signal the next thread in sequence to access the index.
*/
sem_post(ctx->index_sem_next);
Sem_Post(ctx->index_sem_next);
/*
* Now go through all the matching segments for all the current segments
@ -1583,7 +1583,7 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size)
blknum -= 2;
src1 = buf + RABIN_HDR_SIZE + dedupe_index_sz;
sem_wait(ctx->index_sem);
Sem_Wait(ctx->index_sem);
for (blk=0; blk<blknum;) {
len = LE32(U32_P(g_dedupe_idx));
g_dedupe_idx += RABIN_ENTRY_SIZE;

View file

@ -182,8 +182,8 @@ typedef struct {
int level, delta_flag, dedupe_flag, deltac_min_distance;
uint64_t file_offset; // For global dedupe
archive_config_t *arc;
sem_t *index_sem;
sem_t *index_sem_next;
Sem_t *index_sem;
Sem_t *index_sem_next;
uchar_t *similarity_cksums;
uint32_t pagesize;
int out_fd;

View file

@ -636,3 +636,82 @@ is_incompressible(int type)
ic = (st == TYPE_JPEG) | (st == TYPE_PACKJPG) | (st == TYPE_AUDIO_COMPRESSED);
return (ic);
}
/************************************************************
* Portability wrappers for synchronization primitives.
***********************************************************/
#ifdef __APPLE__
pthread_mutex_t semctr_mutex = PTHREAD_MUTEX_INITIALIZER;
unsigned int __sem__ctr = 0;
static unsigned int
semctr_next()
{
unsigned int val;
pthread_mutex_lock(&semctr_mutex);
val = __sem__ctr;
__sem__ctr++;
pthread_mutex_unlock(&semctr_mutex);
return val;
}
int
Sem_Init(Sem_t *sem, int pshared, int value)
{
sprintf(sem->name, "%u", semctr_next());
sem->sem1 = sem_open(sem->name, O_CREAT|O_EXCL, S_IRUSR|S_IWUSR, value);
if (sem->sem1 == SEM_FAILED)
return (-1);
if (!pshared)
sem_unlink(sem->name);
return (0);
}
int
Sem_Destroy(Sem_t *sem)
{
if (sem_close(sem->sem1) == -1)
return (-1);
sem_unlink(sem->name);
return (0);
}
int
Sem_Post(Sem_t *sem)
{
return (sem_post(sem->sem1));
}
int
Sem_Wait(Sem_t *sem)
{
return (sem_wait(sem->sem1));
}
#else
int
Sem_Init(Sem_t *sem, int pshared, int value)
{
return(sem_init(&sem->sem, pshared, value));
}
int
Sem_Destroy(Sem_t *sem)
{
return (sem_destroy(&sem->sem));
}
int
Sem_Post(Sem_t *sem)
{
return (sem_post(&sem->sem));
}
int
Sem_Wait(Sem_t *sem)
{
return (sem_wait(&sem->sem));
}
#endif

View file

@ -40,6 +40,8 @@
#include <assert.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>
#include <semaphore.h>
#include <cpuid.h>
#if defined(sun) || defined(__sun)
#include <sys/byteorder.h>
@ -407,6 +409,20 @@ int is_incompressible(int type);
int clock_gettime(int clk_id, struct timespec *ts);
#endif
/*
* Routines to handle compatibility.
*/
typedef struct _compat_sem {
char name[15];
sem_t sem, *sem1;
} Sem_t;
int Sem_Init(Sem_t *sem, int pshared, int value);
int Sem_Destroy(Sem_t *sem);
int Sem_Post(Sem_t *sem);
int Sem_Wait(Sem_t *sem);
/*
* Roundup v to the nearest power of 2. From Bit Twiddling Hacks:
* http://graphics.stanford.edu/~seander/bithacks.html