From ef3c30b190cb323f76f03e6399e2a63be4ce5563 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Sun, 20 Mar 2005 05:17:25 +0000 Subject: [PATCH] Benchmark updates. --- benchmarks/Makefile.am | 2 +- benchmarks/berkeleyDB/bdbHashThreaded.c | 68 +++++++------ benchmarks/berkeleyDB/bdbHashWriteRequests.c | 49 ++++++---- benchmarks/berkeleyDB/genericBerkeleyDBCode.c | 40 ++++---- benchmarks/berkeleyDB/genericBerkeleyDBCode.h | 5 +- benchmarks/linearHashNTA.c | 4 +- benchmarks/linearHashNTAMultiReader.c | 3 +- benchmarks/linearHashNTAThreaded.c | 12 +-- benchmarks/linearHashNTAWriteRequests.c | 18 ++-- src/lladd/operations/linearHashNTA.c | 95 ++++++++++++++++++- test/lladd/check_multiplexer.c | 2 +- 11 files changed, 199 insertions(+), 99 deletions(-) diff --git a/benchmarks/Makefile.am b/benchmarks/Makefile.am index 2b1a208..7f141c2 100644 --- a/benchmarks/Makefile.am +++ b/benchmarks/Makefile.am @@ -3,7 +3,7 @@ LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \ $(top_builddir)/src/libdfa/librw.a bin_PROGRAMS=naiveHash logicalHash readLogicalHash naiveMultiThreaded logicalMultThreaded rawSet \ arrayListSet logicalMultiReaders linearHashNTA linkedListNTA pageOrientedListNTA \ - linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests + linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests AM_CFLAGS= -g -Wall -pedantic -std=gnu99 SUBDIRS=berkeleyDB diff --git a/benchmarks/berkeleyDB/bdbHashThreaded.c b/benchmarks/berkeleyDB/bdbHashThreaded.c index 3735d92..72fb543 100644 --- a/benchmarks/berkeleyDB/bdbHashThreaded.c +++ b/benchmarks/berkeleyDB/bdbHashThreaded.c @@ -20,33 +20,17 @@ #define ENV_DIRECTORY "TXNAPP" +//#define DEBUG_BDB 1 + #include "genericBerkeleyDBCode.c" + int activeThreads = 0; int max_active = 0; pthread_mutex_t mutex; -/* -void add_cat(DB_ENV *, DB *, char *, ...); -void run_xact(DB_ENV *, DB *, int, int); -//void add_color(DB_ENV *, DB *, char *, int); -// void add_fruit(DB_ENV *, DB *, char *, char *); -void *checkpoint_thread(void *); -void log_archlist(DB_ENV *); -void *logfile_thread(void *); -void db_open(DB_ENV *, DB **, char *, int); -void env_dir_create(void); -void env_open(DB_ENV **); -void usage(void); - - - -DB_ENV *dbenv; -DB *db_cats; //, *db_color, *db_fruit; -*/ - -int alwaysCommit; +//int alwaysCommit; int num_xact; int insert_per_xact; void * runThread(void * arg); @@ -54,23 +38,35 @@ int main(int argc, char *argv[]) { extern int optind; - pthread_t ptid; + int ch, ret; - assert(argc == 3 || argc == 4); + // assert(argc == 3 || argc == 4); + assert(argc == 5); + + int alwaysCommit = atoi(argv[3]); // 1; // (argc >= 4); + + int type = atoi(argv[4]) == 1 ? DB_HASH : DB_RECNO; + + printf("type: %d always commit: %d\n", type, alwaysCommit); /* threads have static thread sizes. Ughh. */ - - alwaysCommit = (argc == 4); - pthread_attr_t attr; pthread_attr_init(&attr); - pthread_mutex_init(&mutex, NULL); pthread_attr_setstacksize (&attr, 4 * PTHREAD_STACK_MIN); + + pthread_mutex_init(&mutex, NULL); + pthread_mutex_lock(&mutex); - initDB(&attr); + initDB(&attr, type); + + + + + + int r; int num_threads = atoi(argv[1]); @@ -82,22 +78,22 @@ main(int argc, char *argv[]) num_xact = 1; insert_per_xact = atoi(argv[2]); } + // @todo the test has been crashing for multi-threaded long transactions. + assert(num_threads == 1 || alwaysCommit == 1); + +#ifdef DEBUG_BDB + printf("num_xact = %d\n insert_per_xact=%d\n", num_xact, insert_per_xact); +#endif pthread_t * threads = malloc(num_threads * sizeof(pthread_t)); int i ; for(i = 0; i < num_threads; i++) { - if ((ret = pthread_create(&(threads[i]), &attr, runThread, (void *)i)) != 0){ fprintf(stderr, "txnapp: failed spawning worker thread: %s\n", strerror(ret)); exit (1); } - /* - for(r = 0; r < num_xact; r ++) { - run_xact(dbenv, db_cats, 1+r*insert_per_xact, insert_per_xact); - } - */ } pthread_mutex_unlock(&mutex); @@ -108,6 +104,9 @@ main(int argc, char *argv[]) free(threads); + db->close(db, 0); + dbenv->close(dbenv, 0); + return (0); } @@ -116,15 +115,12 @@ void * runThread(void * arg) { int offset = (int) arg; pthread_mutex_lock(&mutex); - activeThreads++; if(activeThreads > max_active) { max_active = activeThreads; } - pthread_mutex_unlock(&mutex); - int r; for(r = 0; r < num_xact; r ++) { diff --git a/benchmarks/berkeleyDB/bdbHashWriteRequests.c b/benchmarks/berkeleyDB/bdbHashWriteRequests.c index ccb61ac..81728e4 100644 --- a/benchmarks/berkeleyDB/bdbHashWriteRequests.c +++ b/benchmarks/berkeleyDB/bdbHashWriteRequests.c @@ -41,42 +41,52 @@ main(int argc, char *argv[]) int ch, ret; - assert(argc == 3); + assert(argc == 3 || argc == 4); + + int alwaysCommit = (argc == 4); + /* threads have static thread sizes. Ughh. */ pthread_attr_t attr; pthread_attr_init(&attr); - + pthread_attr_setstacksize (&attr, 4 * PTHREAD_STACK_MIN); + pthread_mutex_init(&mutex, NULL); pthread_cond_init(&never, NULL); - - pthread_attr_setstacksize (&attr, 4 * PTHREAD_STACK_MIN); - + pthread_mutex_lock(&mutex); + initDB(&attr, DB_HASH); int l; for(l = 0; l < COUNTER_RESOLUTION; l++) { buckets[l] = 0; } - + int r; int num_threads = atoi(argv[1]); - num_xact = 1; - insert_per_xact = atoi(argv[2]); + if(alwaysCommit) { + num_xact = atoi(argv[2]); + insert_per_xact = 1; + } else { + num_xact = 1; + insert_per_xact = atoi(argv[2]); + } + + + + pthread_t * threads = malloc(num_threads * sizeof(pthread_t)); int i ; for(i = 0; i < num_threads; i++) { - if ((ret = pthread_create(&(threads[i]), &attr, runThread, (void *)i)) != 0){ fprintf(stderr, "txnapp: failed spawning worker thread: %s\n", strerror(ret)); exit (1); } - } pthread_mutex_unlock(&mutex); @@ -93,6 +103,10 @@ main(int argc, char *argv[]) printf("%3.4f\t%d\n", exp(((double)k)/log_multiplier)/1000000000.0, buckets[k]); } + + db->close(db, 0); + dbenv->close(dbenv, 0); + return (0); } @@ -107,11 +121,10 @@ void * runThread(void * arg) { } pthread_mutex_unlock(&mutex); - int r; - double sum_x_squared = 0; - double sum = 0; + // double sum_x_squared = 0; + // double sum = 0; double log_multiplier = COUNTER_RESOLUTION / log(MAX_SECONDS * 1000000000.0); @@ -122,9 +135,7 @@ void * runThread(void * arg) { timeout.tv_sec = timeout_tv.tv_sec; timeout.tv_nsec = 1000 * timeout_tv.tv_usec; - timeout.tv_nsec = (int)(1000000000.0 * ((double)random() / (double)RAND_MAX)); - timeout.tv_sec++; // struct timeval start; @@ -141,21 +152,19 @@ void * runThread(void * arg) { run_xact(dbenv, db_cats, offset*(1+r)*insert_per_xact, insert_per_xact); - gettimeofday(&endtime_tv, NULL); + gettimeofday(&endtime_tv, NULL); endtime.tv_sec = endtime_tv.tv_sec; endtime.tv_nsec = 1000 * endtime_tv.tv_usec; double microSecondsPassed = 1000000000.0 * (double)(endtime.tv_sec - timeout.tv_sec); - microSecondsPassed = (microSecondsPassed + (double)endtime.tv_nsec) - (double)timeout.tv_nsec; assert(microSecondsPassed > 0.0); - - sum += microSecondsPassed; - sum_x_squared += (microSecondsPassed * microSecondsPassed) ; + // sum += microSecondsPassed; + // sum_x_squared += (microSecondsPassed * microSecondsPassed) ; int bucket = (log_multiplier * log(microSecondsPassed)); diff --git a/benchmarks/berkeleyDB/genericBerkeleyDBCode.c b/benchmarks/berkeleyDB/genericBerkeleyDBCode.c index 8e9cb8a..28df1d3 100644 --- a/benchmarks/berkeleyDB/genericBerkeleyDBCode.c +++ b/benchmarks/berkeleyDB/genericBerkeleyDBCode.c @@ -50,6 +50,12 @@ env_open(DB_ENV **dbenvp) exit (1); }*/ + dbenv->set_tx_max(dbenv, 32000); + int max; + dbenv->get_tx_max(dbenv, &max); + printf("Max xact count: %d\n", max); + + /* * Open a transactional environment: * create if it doesn't exist @@ -58,7 +64,7 @@ env_open(DB_ENV **dbenvp) * read/write owner only */ if ((ret = dbenv->open(dbenv, ENV_DIRECTORY, - DB_CREATE |/* DB_INIT_LOCK |*/ DB_INIT_LOG | + DB_CREATE |/* DB_INIT_LOCK |*/ DB_INIT_LOG | /*| DB_PRIVATE*/ DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD, S_IRUSR | S_IWUSR)) != 0) { dbenv->err(dbenv, ret, "dbenv->open: %s", ENV_DIRECTORY); @@ -151,11 +157,11 @@ log_archlist(DB_ENV *dbenv) free (begin); } } +DB *db; void -db_open(DB_ENV *dbenv, DB **dbp, char *name, int dups) +db_open(DB_ENV *dbenv, DB **dbp, char *name, int type) { - DB *db; int ret; /* Create the database handle. */ @@ -176,7 +182,7 @@ db_open(DB_ENV *dbenv, DB **dbp, char *name, int dups) * free-threaded handle * read/write owner only */ - if ((ret = db->open(db, NULL, name, NULL, /*DB_BTREE*//* DB_RECNO */DB_HASH, /*DB_DIRECT_LOG | DB_DIRECT_DB | */ + if ((ret = db->open(db, NULL, name, NULL, type, /*DB_DIRECT_LOG | DB_DIRECT_DB | */ DB_AUTO_COMMIT | DB_DIRTY_READ | DB_TXN_SYNC | DB_CREATE | DB_THREAD, S_IRUSR | S_IWUSR)) != 0) { (void)db->close(db, 0); dbenv->err(dbenv, ret, "db->open: %s", name); @@ -243,7 +249,7 @@ retry: /* Begin the transaction. */ // assert(count == 1); for(q = offset; q < offset + count; q++) { - keyPtr = q; + keyPtr = q+1; valPtr = q; /* switch (ret = db->del(db, tid, &key, 0)) { case 0: @@ -262,9 +268,8 @@ retry: /* Begin the transaction. */ abort(); exit (1); } */ - - // switch (ret = dbc->c_put(dbc, &key, &data, DB_KEYLAST)) { - //switch (ret = dbc->c_put(dbc, &key, &data, DB_KEYLAST)) { + + // pthread_mutex_lock(&hack); switch (ret = db->put(db, tid, &key, &data, 0)) { case 0: break; @@ -283,23 +288,22 @@ retry: /* Begin the transaction. */ } goto retry; default: - abort(); // Error invalidates benchmark! /* Error: run recovery. */ dbenv->err(dbenv, ret, "dbc->put: %d/%d", q, q); + abort(); // Error invalidates benchmark! exit (1); } - } - // DBT getkey, getdata; @todo, check insertions! + // pthread_mutex_unlock(&hack); + + } - /* Success: commit the change. */ - /* if ((ret = dbc->c_close(dbc)) != 0) { - dbenv->err(dbenv, ret, "dbc->c_close"); - exit (1); - }*/ if ((ret = tid->commit(tid, 0)) != 0) { dbenv->err(dbenv, ret, "DB_TXN->commit"); exit (1); } +#ifdef DEBUG_BDB + printf("Called commit\n"); +#endif } void @@ -310,7 +314,7 @@ usage() } void -initDB(/*DB_ENV ** dbenv, */pthread_attr_t * attr) +initDB(/*DB_ENV ** dbenv, */pthread_attr_t * attr, int type) { pthread_t ptid; @@ -337,7 +341,7 @@ initDB(/*DB_ENV ** dbenv, */pthread_attr_t * attr) exit (1); } */ - db_open(dbenv, &db_cats, "cats", 1); + db_open(dbenv, &db_cats, "cats", type); diff --git a/benchmarks/berkeleyDB/genericBerkeleyDBCode.h b/benchmarks/berkeleyDB/genericBerkeleyDBCode.h index 8aa9856..5ecd7ce 100644 --- a/benchmarks/berkeleyDB/genericBerkeleyDBCode.h +++ b/benchmarks/berkeleyDB/genericBerkeleyDBCode.h @@ -2,7 +2,10 @@ void run_xact(DB_ENV *, DB *, int, int); void *checkpoint_thread(void *); void log_archlist(DB_ENV *); void *logfile_thread(void *); -void db_open(DB_ENV *, DB **, char *, int); +/** + @param type DB_RECNO, DB_HASH or DB_BTREE +*/ +void db_open(DB_ENV *, DB **, char *, int type); void env_dir_create(void); void env_open(DB_ENV **); void usage(void); diff --git a/benchmarks/linearHashNTA.c b/benchmarks/linearHashNTA.c index 4bc18f2..03d7f30 100644 --- a/benchmarks/linearHashNTA.c +++ b/benchmarks/linearHashNTA.c @@ -21,8 +21,8 @@ int main(int argc, char** argv) { Tinit(); int xid = Tbegin(); - recordid hash = ThashCreate(xid, sizeof(int), sizeof(int)); - //recordid hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); + //recordid hash = ThashCreate(xid, sizeof(int), sizeof(int)); + recordid hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); Tcommit(xid); diff --git a/benchmarks/linearHashNTAMultiReader.c b/benchmarks/linearHashNTAMultiReader.c index e282f2b..162abe6 100644 --- a/benchmarks/linearHashNTAMultiReader.c +++ b/benchmarks/linearHashNTAMultiReader.c @@ -86,7 +86,8 @@ int main(int argc, char** argv) { Tinit(); int xid = Tbegin(); - hash = ThashCreate(xid, sizeof(int), sizeof(int)); + // hash = ThashCreate(xid, sizeof(int), sizeof(int)); + hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); int k; diff --git a/benchmarks/linearHashNTAThreaded.c b/benchmarks/linearHashNTAThreaded.c index c7dfddf..273ecec 100644 --- a/benchmarks/linearHashNTAThreaded.c +++ b/benchmarks/linearHashNTAThreaded.c @@ -129,8 +129,8 @@ int main(int argc, char** argv) { Tinit(); int xid = Tbegin(); - hash = ThashCreate(xid, sizeof(int), sizeof(int)); - // hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); + // hash = ThashCreate(xid, sizeof(int), sizeof(int)); + hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); Tcommit(xid); @@ -143,7 +143,7 @@ int main(int argc, char** argv) { pthread_mutex_init(&mutex, NULL); pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN); - pthread_attr_setschedpolicy(&attr, SCHED_FIFO); + // pthread_attr_setschedpolicy(&attr, SCHED_FIFO); pthread_mutex_lock(&mutex); @@ -160,11 +160,5 @@ int main(int argc, char** argv) { pthread_join(workers[k],NULL); } - - /* printf("mean: (max, avg) %f, %f\n", max_mean, avg_mean / (double)thread_count); - - printf("variance: (max, avg) %f, %f\n", max_var, avg_var / (double)thread_count); - */ - Tdeinit(); } diff --git a/benchmarks/linearHashNTAWriteRequests.c b/benchmarks/linearHashNTAWriteRequests.c index 1ff7567..7838f8a 100644 --- a/benchmarks/linearHashNTAWriteRequests.c +++ b/benchmarks/linearHashNTAWriteRequests.c @@ -12,6 +12,8 @@ pthread_cond_t never; pthread_mutex_t mutex; +int alwaysCommit; + #define MAX_SECONDS 100 #define COUNTER_RESOLUTION 240 @@ -97,13 +99,13 @@ static void * go (void * arg_ptr) { assert(timeout.tv_sec <= start.tv_sec); assert(timeout.tv_nsec <= start.tv_nsec || timeout.tv_sec < start.tv_sec); */ - - xid = Tbegin(); - + if(alwaysCommit) { + xid = Tbegin(); + } ThashInsert(xid, hash, (byte*)&j, sizeof(int), (byte*)&j, sizeof(int)); - - Tcommit(xid); // used to be outside of loop! - + if(alwaysCommit) { + Tcommit(xid); + } gettimeofday(&endtime_tv, NULL); @@ -168,10 +170,12 @@ static void * go (void * arg_ptr) { int main(int argc, char** argv) { - assert(argc == 3); + assert(argc == 4); int thread_count = atoi(argv[1]); count = atoi(argv[2]); + alwaysCommit = atoi(argv[3]); + unlink("storefile.txt"); unlink("logfile.txt"); diff --git a/src/lladd/operations/linearHashNTA.c b/src/lladd/operations/linearHashNTA.c index 6096945..0822757 100644 --- a/src/lladd/operations/linearHashNTA.c +++ b/src/lladd/operations/linearHashNTA.c @@ -9,9 +9,9 @@ #include #include #include - - - +#include +#include +#include "../logger/logMemory.h" /** re-entrant implementation of a linear hash hable, using nensted top actions. @@ -536,3 +536,92 @@ int linearHashNTAIterator_value(int xid, void * impl, byte ** value) { return (it->lastValue == NULL) ? 0 : it->lastValueSize; } + + +//--------------------------------- async hash operations happen below here + +typedef struct { + // recordid hash; + int value_len; + int key_len; +} asyncHashInsert_t; + +void ThashInsertAsync(int xid, lladdConsumer_t * cons, recordid hash, byte * value, int value_len, byte * key, int key_len) { + + Tconsumer_push(xid, cons, key, key_len, value, value_len); + +} + +void ThashInsertConsume(int xid, recordid hash, lladdIterator_t * it) { + + while(Titerator_next(xid, it)) { + + byte * key; + byte * value; + + int key_len = Titerator_key(xid, it, &key); + int value_len = Titerator_value(xid, it, &value); + + ThashInsert(xid, hash, key, key_len, value, value_len); + + Titerator_tupleDone(xid, it); + } +} + +typedef struct { + lladdIterator_t * it; + recordid hash; + int xid; +} hashAsyncWorker_arg; + +void * ThashAsyncWorker(void * argp) { + hashAsyncWorker_arg * arg = (hashAsyncWorker_arg*)argp; + + + // lladdIterator_t * it = (lladdIterator_t *) arg; + // recordid hash; + while(Titerator_next(arg->xid, arg->it)) { + lladdFifo_t * fifo; + int fifo_size = Titerator_value(arg->xid, arg->it, (byte**)&fifo); + assert(fifo_size == sizeof(lladdFifo_t)); + + ThashInsertConsume(arg->xid, arg->hash, fifo->iterator); + + Titerator_tupleDone(arg->xid, arg->it); + } + return NULL; +} + +/*lladdMultiplexer_t **/ +lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads, + int mainFifoLen, int numFifos, + int subFifoLen, int dirtyFifoLen, + lladdIterator_t ** dirtyIterator) { + + lladdFifo_t * mainFifo = logMemoryFifo(mainFifoLen, 0); + lladdFifo_t * dirtyFifos = logMemoryFifo(dirtyFifoLen, 0); + lladdFifoPool_t * fifoPool = lladdFifoPool_ringBufferInit(numFifos, subFifoLen, NULL, dirtyFifos); + lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, mainFifo->iterator, &multiplexHashLogByKey, fifoPool); + + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN); + + lladdMultiplexer_start(mux, &attr); + + int i = 0; + + + for(i = 0; i < numWorkerThreads; i++) { + pthread_t thread; + pthread_create(&thread, &attr, ThashAsyncWorker, mux->fifoPool->dirtyPoolFifo->iterator); + pthread_detach(thread); + } + + if(dirtyIterator) { + *dirtyIterator = mux->fifoPool->dirtyPoolFifo->iterator; + } + + return mainFifo->consumer; +} diff --git a/test/lladd/check_multiplexer.c b/test/lladd/check_multiplexer.c index 078f3a2..983aa3d 100644 --- a/test/lladd/check_multiplexer.c +++ b/test/lladd/check_multiplexer.c @@ -212,7 +212,7 @@ START_TEST(multiplexTest) { pthread_cond_init(&never, NULL); pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN); - pthread_attr_setschedpolicy(&attr, SCHED_FIFO); + // pthread_attr_setschedpolicy(&attr, SCHED_FIFO); pthread_mutex_lock(&mutex); lladdMultiplexer_start(mux, &attr);