From 63bef473cc0110ac90cace6430cd329c7c7c3df9 Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Sun, 4 May 2014 21:11:31 +0530 Subject: [PATCH] Working MAC OS X port. Compatibility layer for semaphore handling. --- archive/pc_archive.c | 38 +++++++++---------- pcompress.c | 88 +++++++++++++++++++++++++------------------- pcompress.h | 10 ++--- rabin/rabin_dedup.c | 20 +++++----- rabin/rabin_dedup.h | 4 +- utils/utils.c | 79 +++++++++++++++++++++++++++++++++++++++ utils/utils.h | 16 ++++++++ 7 files changed, 182 insertions(+), 73 deletions(-) diff --git a/archive/pc_archive.c b/archive/pc_archive.c index fbc5635..8ea1803 100644 --- a/archive/pc_archive.c +++ b/archive/pc_archive.c @@ -115,8 +115,8 @@ arc_open_callback(struct archive *arc, void *ctx) { pc_ctx_t *pctx = (pc_ctx_t *)ctx; - sem_init(&(pctx->read_sem), 0, 0); - sem_init(&(pctx->write_sem), 0, 0); + Sem_Init(&(pctx->read_sem), 0, 0); + Sem_Init(&(pctx->write_sem), 0, 0); pctx->arc_buf = NULL; pctx->arc_buf_pos = 0; pctx->arc_buf_size = 0; @@ -130,7 +130,7 @@ creat_close_callback(struct archive *arc, void *ctx) pctx->arc_closed = 1; if (pctx->arc_buf) { - sem_post(&(pctx->read_sem)); + Sem_Post(&(pctx->read_sem)); } else { pctx->arc_buf_pos = 0; } @@ -150,7 +150,7 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len } if (!pctx->arc_writing) { - sem_wait(&(pctx->write_sem)); + Sem_Wait(&(pctx->write_sem)); } if (pctx->arc_buf == NULL || pctx->arc_buf_size == 0) { @@ -182,8 +182,8 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len pctx->ctype = pctx->btype; } else { pctx->arc_writing = 0; - sem_post(&(pctx->read_sem)); - sem_wait(&(pctx->write_sem)); + Sem_Post(&(pctx->read_sem)); + Sem_Wait(&(pctx->write_sem)); tbuf = pctx->arc_buf + pctx->arc_buf_pos; pctx->arc_writing = 1; if (remaining > 0) @@ -199,8 +199,8 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len pctx->arc_buf_pos += nlen; buff += nlen; pctx->arc_writing = 0; - sem_post(&(pctx->read_sem)); - sem_wait(&(pctx->write_sem)); + Sem_Post(&(pctx->read_sem)); + Sem_Wait(&(pctx->write_sem)); pctx->arc_writing = 1; } else { memcpy(tbuf, buff, remaining); @@ -208,7 +208,7 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len remaining = 0; if (pctx->arc_buf_pos == pctx->arc_buf_size) { pctx->arc_writing = 0; - sem_post(&(pctx->read_sem)); + Sem_Post(&(pctx->read_sem)); } break; } @@ -234,8 +234,8 @@ archiver_read(void *ctx, void *buf, uint64_t count) pctx->arc_buf_size = count; pctx->arc_buf_pos = 0; pctx->btype = TYPE_UNKNOWN; - sem_post(&(pctx->write_sem)); - sem_wait(&(pctx->read_sem)); + Sem_Post(&(pctx->write_sem)); + Sem_Wait(&(pctx->read_sem)); pctx->arc_buf = NULL; return (pctx->arc_buf_pos); } @@ -248,8 +248,8 @@ archiver_close(void *ctx) pctx->arc_closed = 1; pctx->arc_buf = NULL; pctx->arc_buf_size = 0; - sem_post(&(pctx->write_sem)); - sem_post(&(pctx->read_sem)); + Sem_Post(&(pctx->write_sem)); + Sem_Post(&(pctx->read_sem)); return (0); } @@ -260,7 +260,7 @@ extract_close_callback(struct archive *arc, void *ctx) pctx->arc_closed = 1; if (pctx->arc_buf) { - sem_post(&(pctx->write_sem)); + Sem_Post(&(pctx->write_sem)); } else { pctx->arc_buf_size = 0; } @@ -280,10 +280,10 @@ extract_read_callback(struct archive *arc, void *ctx, const void **buf) } if (!pctx->arc_writing) { - sem_wait(&(pctx->read_sem)); + Sem_Wait(&(pctx->read_sem)); } else { - sem_post(&(pctx->write_sem)); - sem_wait(&(pctx->read_sem)); + Sem_Post(&(pctx->write_sem)); + Sem_Wait(&(pctx->read_sem)); } if (pctx->arc_buf == NULL || pctx->arc_buf_size == 0) { @@ -315,8 +315,8 @@ archiver_write(void *ctx, void *buf, uint64_t count) pctx->arc_buf = buf; pctx->arc_buf_size = count; - sem_post(&(pctx->read_sem)); - sem_wait(&(pctx->write_sem)); + Sem_Post(&(pctx->read_sem)); + Sem_Wait(&(pctx->write_sem)); pctx->arc_buf = NULL; return (pctx->arc_buf_size); } diff --git a/pcompress.c b/pcompress.c index 72272fa..8553608 100644 --- a/pcompress.c +++ b/pcompress.c @@ -403,10 +403,10 @@ perform_decompress(void *dat) pctx = tdat->pctx; redo: - sem_wait(&tdat->start_sem); + Sem_Wait(&tdat->start_sem); if (unlikely(tdat->cancel)) { tdat->len_cmp = 0; - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->cmp_done_sem); return (0); } @@ -461,7 +461,7 @@ redo: pctx->main_cancel = 1; tdat->len_cmp = 0; pctx->t_errored = 1; - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->cmp_done_sem); return (NULL); } DEBUG_STAT_EN(en = get_wtime_millis()); @@ -480,7 +480,7 @@ redo: */ pctx->main_cancel = 1; tdat->len_cmp = 0; - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->cmp_done_sem); return (NULL); } DEBUG_STAT_EN(en = get_wtime_millis()); @@ -511,7 +511,7 @@ redo: pctx->main_cancel = 1; tdat->len_cmp = 0; pctx->t_errored = 1; - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->cmp_done_sem); return (NULL); } @@ -638,7 +638,7 @@ redo: * to wait for the previous thread's dedupe recovery to complete. */ if (pctx->enable_rabin_global) { - sem_wait(tdat->rctx->index_sem); + Sem_Wait(tdat->rctx->index_sem); } } @@ -658,7 +658,7 @@ redo: } cont: - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->cmp_done_sem); goto redo; } @@ -1269,10 +1269,10 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) tdat->level = level; tdat->data = NULL; tdat->props = &props; - sem_init(&(tdat->start_sem), 0, 0); - sem_init(&(tdat->cmp_done_sem), 0, 0); - sem_init(&(tdat->write_done_sem), 0, 1); - sem_init(&(tdat->index_sem), 0, 0); + Sem_Init(&(tdat->start_sem), 0, 0); + Sem_Init(&(tdat->cmp_done_sem), 0, 0); + Sem_Init(&(tdat->write_done_sem), 0, 1); + Sem_Init(&(tdat->index_sem), 0, 0); if (pctx->_init_func) { if (pctx->_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, @@ -1329,7 +1329,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) } } // When doing global dedupe first thread does not wait to start dedupe recovery. - sem_post(&(dary[0]->index_sem)); + Sem_Post(&(dary[0]->index_sem)); if (pctx->encrypt_type) { /* Erase encryption key bytes stored as a plain array. No longer reqd. */ @@ -1362,7 +1362,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) for (p = 0; p < nprocs; p++) { np = p; tdat = dary[p]; - sem_wait(&tdat->write_done_sem); + Sem_Wait(&tdat->write_done_sem); if (pctx->main_cancel) break; tdat->id = pctx->chunk_num; if (tdat->rctx) tdat->rctx->id = tdat->id; @@ -1443,7 +1443,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) UNCOMP_BAIL; } } - sem_post(&tdat->start_sem); + Sem_Post(&tdat->start_sem); ++(pctx->chunk_num); } } @@ -1453,7 +1453,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) for (p = 0; p < nprocs; p++) { if (p == np) continue; tdat = dary[p]; - sem_wait(&tdat->write_done_sem); + Sem_Wait(&tdat->write_done_sem); } } uncomp_done: @@ -1463,8 +1463,8 @@ uncomp_done: tdat = dary[i]; tdat->cancel = 1; tdat->len_cmp = 0; - sem_post(&tdat->start_sem); - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->start_sem); + Sem_Post(&tdat->cmp_done_sem); pthread_join(tdat->thr, NULL); } pthread_join(writer_thr, NULL); @@ -1490,6 +1490,11 @@ uncomp_done: if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) { destroy_dedupe_context(dary[i]->rctx); } + Sem_Destroy(&(dary[i]->start_sem)); + Sem_Destroy(&(dary[i]->cmp_done_sem)); + Sem_Destroy(&(dary[i]->write_done_sem)); + Sem_Destroy(&(dary[i]->index_sem)); + slab_free(NULL, dary[i]); } slab_free(NULL, dary); @@ -1504,6 +1509,8 @@ uncomp_done: close(pctx->archive_temp_fd); unlink(pctx->archive_temp_file); } + Sem_Destroy(&(pctx->read_sem)); + Sem_Destroy(&(pctx->write_sem)); } if (!pctx->hide_cmp_stats) show_compression_stats(pctx); @@ -1522,10 +1529,10 @@ perform_compress(void *dat) { pctx = tdat->pctx; redo: - sem_wait(&tdat->start_sem); + Sem_Wait(&tdat->start_sem); if (unlikely(tdat->cancel)) { tdat->len_cmp = 0; - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->cmp_done_sem); return (0); } @@ -1699,7 +1706,7 @@ plain_index: pctx->main_cancel = 1; tdat->len_cmp = 0; pctx->t_errored = 1; - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->cmp_done_sem); return (0); } DEBUG_STAT_EN(en = get_wtime_millis()); @@ -1781,7 +1788,7 @@ plain_index: U32_P(mac_ptr) = htonl(crc); } - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->cmp_done_sem); goto redo; } @@ -1797,7 +1804,7 @@ writer_thread(void *dat) { repeat: for (p = 0; p < w->nprocs; p++) { tdat = w->dary[p]; - sem_wait(&tdat->cmp_done_sem); + Sem_Wait(&tdat->cmp_done_sem); if (tdat->len_cmp == 0) { goto do_cancel; } @@ -1824,16 +1831,16 @@ repeat: do_cancel: pctx->main_cancel = 1; tdat->cancel = 1; - sem_post(&tdat->start_sem); + Sem_Post(&tdat->start_sem); if (tdat->rctx && pctx->enable_rabin_global) - sem_post(tdat->rctx->index_sem_next); - sem_post(&tdat->write_done_sem); + Sem_Post(tdat->rctx->index_sem_next); + Sem_Post(&tdat->write_done_sem); return (0); } if (tdat->decompressing && tdat->rctx && pctx->enable_rabin_global) { - sem_post(tdat->rctx->index_sem_next); + Sem_Post(tdat->rctx->index_sem_next); } - sem_post(&tdat->write_done_sem); + Sem_Post(&tdat->write_done_sem); } goto repeat; } @@ -2198,10 +2205,10 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev tdat->data = NULL; tdat->rctx = NULL; tdat->props = &props; - sem_init(&(tdat->start_sem), 0, 0); - sem_init(&(tdat->cmp_done_sem), 0, 0); - sem_init(&(tdat->write_done_sem), 0, 1); - sem_init(&(tdat->index_sem), 0, 0); + Sem_Init(&(tdat->start_sem), 0, 0); + Sem_Init(&(tdat->cmp_done_sem), 0, 0); + Sem_Init(&(tdat->write_done_sem), 0, 1); + Sem_Init(&(tdat->index_sem), 0, 0); if (pctx->_init_func) { if (pctx->_init_func(&(tdat->data), &(tdat->level), props.nthreads, chunksize, @@ -2249,7 +2256,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev tdat->rctx->index_sem_next = &(dary[(i + 1) % nprocs]->index_sem); } // When doing global dedupe first thread does not wait to access the index. - sem_post(&(dary[0]->index_sem)); + Sem_Post(&(dary[0]->index_sem)); } w.dary = dary; @@ -2396,7 +2403,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev tdat = dary[p]; if (pctx->main_cancel) break; /* Wait for previous chunk compression to complete. */ - sem_wait(&tdat->write_done_sem); + Sem_Wait(&tdat->write_done_sem); if (pctx->main_cancel) break; if (rbytes == 0) { /* EOF */ @@ -2479,7 +2486,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev } /* Signal the compression thread to start */ - sem_post(&tdat->start_sem); + Sem_Post(&tdat->start_sem); ++(pctx->chunk_num); if (single_chunk) { @@ -2512,7 +2519,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev for (p = 0; p < nprocs; p++) { if (p == np) continue; tdat = dary[p]; - sem_wait(&tdat->write_done_sem); + Sem_Wait(&tdat->write_done_sem); } } else { err = 1; @@ -2535,8 +2542,8 @@ comp_done: tdat = dary[i]; tdat->cancel = 1; tdat->len_cmp = 0; - sem_post(&tdat->start_sem); - sem_post(&tdat->cmp_done_sem); + Sem_Post(&tdat->start_sem); + Sem_Post(&tdat->cmp_done_sem); pthread_join(tdat->thr, NULL); if (pctx->encrypt_type) hmac_cleanup(&tdat->chunk_hmac); @@ -2601,6 +2608,11 @@ comp_done: } if (pctx->_deinit_func) pctx->_deinit_func(&(dary[i]->data)); + Sem_Destroy(&(dary[i]->start_sem)); + Sem_Destroy(&(dary[i]->cmp_done_sem)); + Sem_Destroy(&(dary[i]->write_done_sem)); + Sem_Destroy(&(dary[i]->index_sem)); + slab_free(NULL, dary[i]); } slab_free(NULL, dary); @@ -2622,6 +2634,8 @@ comp_done: fn = fn->next; slab_free(NULL, fn1); } + Sem_Destroy(&(pctx->read_sem)); + Sem_Destroy(&(pctx->write_sem)); } if (!pctx->hide_cmp_stats) show_compression_stats(pctx); pctx->_stats_func(!pctx->hide_cmp_stats); diff --git a/pcompress.h b/pcompress.h index 27e5688..fcac6fc 100644 --- a/pcompress.h +++ b/pcompress.h @@ -233,7 +233,7 @@ typedef struct pc_ctx { uint64_t temp_mmap_pos, temp_file_pos; uint64_t temp_mmap_len; struct fn_list *fn; - sem_t read_sem, write_sem; + Sem_t read_sem, write_sem; uchar_t *arc_buf; uint64_t arc_buf_size, arc_buf_pos; int arc_closed, arc_writing; @@ -276,10 +276,10 @@ struct cmp_data { compress_func_ptr compress; compress_func_ptr decompress; int cancel; - sem_t start_sem; - sem_t cmp_done_sem; - sem_t write_done_sem; - sem_t index_sem; + Sem_t start_sem; + Sem_t cmp_done_sem; + Sem_t write_done_sem; + Sem_t index_sem; void *data; pthread_t thr; mac_ctx_t chunk_hmac; diff --git a/rabin/rabin_dedup.c b/rabin/rabin_dedup.c index fe8377b..04f95b5 100755 --- a/rabin/rabin_dedup.c +++ b/rabin/rabin_dedup.c @@ -506,8 +506,8 @@ dedupe_compress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size, uint64_t of * in order to maintain proper sequencing and avoid deadlocks. */ if (ctx->arc) { - sem_wait(ctx->index_sem); - sem_post(ctx->index_sem_next); + Sem_Wait(ctx->index_sem); + Sem_Post(ctx->index_sem_next); } return (0); } @@ -765,8 +765,8 @@ process_blocks: DEBUG_STAT_EN(fprintf(stderr, "Original size: %" PRId64 ", blknum: %u\n", *size, blknum)); DEBUG_STAT_EN(fprintf(stderr, "Number of maxlen blocks: %u\n", max_count)); if (blknum <=2 && ctx->arc) { - sem_wait(ctx->index_sem); - sem_post(ctx->index_sem_next); + Sem_Wait(ctx->index_sem); + Sem_Post(ctx->index_sem_next); } if (blknum > 2) { uint64_t pos, matchlen, pos1 = 0; @@ -828,7 +828,7 @@ process_blocks: */ length = 0; DEBUG_STAT_EN(w1 = get_wtime_millis()); - sem_wait(ctx->index_sem); + Sem_Wait(ctx->index_sem); DEBUG_STAT_EN(w2 = get_wtime_millis()); for (i=0; iindex_sem_next); + Sem_Post(ctx->index_sem_next); /* * Write final pending block length value (if any). @@ -1002,7 +1002,7 @@ process_blocks: */ if (i == 0) { DEBUG_STAT_EN(w1 = get_wtime_millis()); - sem_wait(ctx->index_sem); + Sem_Wait(ctx->index_sem); DEBUG_STAT_EN(w2 = get_wtime_millis()); } @@ -1010,7 +1010,7 @@ process_blocks: len = (blks-i) * sizeof (global_blockentry_t); if (db_segcache_write(cfg, ctx->id, (uchar_t *)&(ctx->g_blocks[i]), len, blks-i, ctx->file_offset) == -1) { - sem_post(ctx->index_sem_next); + Sem_Post(ctx->index_sem_next); ctx->valid = 0; return (0); } @@ -1067,7 +1067,7 @@ process_blocks: /* * Signal the next thread in sequence to access the index. */ - sem_post(ctx->index_sem_next); + Sem_Post(ctx->index_sem_next); /* * Now go through all the matching segments for all the current segments @@ -1583,7 +1583,7 @@ dedupe_decompress(dedupe_context_t *ctx, uchar_t *buf, uint64_t *size) blknum -= 2; src1 = buf + RABIN_HDR_SIZE + dedupe_index_sz; - sem_wait(ctx->index_sem); + Sem_Wait(ctx->index_sem); for (blk=0; blkname, "%u", semctr_next()); + sem->sem1 = sem_open(sem->name, O_CREAT|O_EXCL, S_IRUSR|S_IWUSR, value); + if (sem->sem1 == SEM_FAILED) + return (-1); + if (!pshared) + sem_unlink(sem->name); + return (0); +} + +int +Sem_Destroy(Sem_t *sem) +{ + if (sem_close(sem->sem1) == -1) + return (-1); + sem_unlink(sem->name); + return (0); +} + +int +Sem_Post(Sem_t *sem) +{ + return (sem_post(sem->sem1)); +} + +int +Sem_Wait(Sem_t *sem) +{ + return (sem_wait(sem->sem1)); +} + +#else + +int +Sem_Init(Sem_t *sem, int pshared, int value) +{ + return(sem_init(&sem->sem, pshared, value)); +} + +int +Sem_Destroy(Sem_t *sem) +{ + return (sem_destroy(&sem->sem)); +} + +int +Sem_Post(Sem_t *sem) +{ + return (sem_post(&sem->sem)); +} + +int +Sem_Wait(Sem_t *sem) +{ + return (sem_wait(&sem->sem)); +} +#endif + diff --git a/utils/utils.h b/utils/utils.h index 80156f8..f9e6135 100644 --- a/utils/utils.h +++ b/utils/utils.h @@ -40,6 +40,8 @@ #include #include #include +#include +#include #include #if defined(sun) || defined(__sun) #include @@ -407,6 +409,20 @@ int is_incompressible(int type); int clock_gettime(int clk_id, struct timespec *ts); #endif +/* + * Routines to handle compatibility. + */ +typedef struct _compat_sem { + char name[15]; + sem_t sem, *sem1; +} Sem_t; + +int Sem_Init(Sem_t *sem, int pshared, int value); +int Sem_Destroy(Sem_t *sem); +int Sem_Post(Sem_t *sem); +int Sem_Wait(Sem_t *sem); + + /* * Roundup v to the nearest power of 2. From Bit Twiddling Hacks: * http://graphics.stanford.edu/~seander/bithacks.html