Benchmark updates.

This commit is contained in:
Sears Russell 2005-03-20 05:17:25 +00:00
parent 26d50f19e3
commit ef3c30b190
11 changed files with 199 additions and 99 deletions

View file

@ -20,33 +20,17 @@
#define ENV_DIRECTORY "TXNAPP" #define ENV_DIRECTORY "TXNAPP"
//#define DEBUG_BDB 1
#include "genericBerkeleyDBCode.c" #include "genericBerkeleyDBCode.c"
int activeThreads = 0; int activeThreads = 0;
int max_active = 0; int max_active = 0;
pthread_mutex_t mutex; pthread_mutex_t mutex;
/*
void add_cat(DB_ENV *, DB *, char *, ...); //int alwaysCommit;
void run_xact(DB_ENV *, DB *, int, int);
//void add_color(DB_ENV *, DB *, char *, int);
// void add_fruit(DB_ENV *, DB *, char *, char *);
void *checkpoint_thread(void *);
void log_archlist(DB_ENV *);
void *logfile_thread(void *);
void db_open(DB_ENV *, DB **, char *, int);
void env_dir_create(void);
void env_open(DB_ENV **);
void usage(void);
DB_ENV *dbenv;
DB *db_cats; //, *db_color, *db_fruit;
*/
int alwaysCommit;
int num_xact; int num_xact;
int insert_per_xact; int insert_per_xact;
void * runThread(void * arg); void * runThread(void * arg);
@ -54,23 +38,35 @@ int
main(int argc, char *argv[]) main(int argc, char *argv[])
{ {
extern int optind; extern int optind;
pthread_t ptid;
int ch, ret; int ch, ret;
assert(argc == 3 || argc == 4); // assert(argc == 3 || argc == 4);
assert(argc == 5);
int alwaysCommit = atoi(argv[3]); // 1; // (argc >= 4);
int type = atoi(argv[4]) == 1 ? DB_HASH : DB_RECNO;
printf("type: %d always commit: %d\n", type, alwaysCommit);
/* threads have static thread sizes. Ughh. */ /* threads have static thread sizes. Ughh. */
alwaysCommit = (argc == 4);
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
pthread_mutex_init(&mutex, NULL);
pthread_attr_setstacksize (&attr, 4 * PTHREAD_STACK_MIN); pthread_attr_setstacksize (&attr, 4 * PTHREAD_STACK_MIN);
pthread_mutex_init(&mutex, NULL);
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
initDB(&attr); initDB(&attr, type);
int r; int r;
int num_threads = atoi(argv[1]); int num_threads = atoi(argv[1]);
@ -82,22 +78,22 @@ main(int argc, char *argv[])
num_xact = 1; num_xact = 1;
insert_per_xact = atoi(argv[2]); insert_per_xact = atoi(argv[2]);
} }
// @todo the test has been crashing for multi-threaded long transactions.
assert(num_threads == 1 || alwaysCommit == 1);
#ifdef DEBUG_BDB
printf("num_xact = %d\n insert_per_xact=%d\n", num_xact, insert_per_xact);
#endif
pthread_t * threads = malloc(num_threads * sizeof(pthread_t)); pthread_t * threads = malloc(num_threads * sizeof(pthread_t));
int i ; int i ;
for(i = 0; i < num_threads; i++) { for(i = 0; i < num_threads; i++) {
if ((ret = pthread_create(&(threads[i]), &attr, runThread, (void *)i)) != 0){ if ((ret = pthread_create(&(threads[i]), &attr, runThread, (void *)i)) != 0){
fprintf(stderr, fprintf(stderr,
"txnapp: failed spawning worker thread: %s\n", "txnapp: failed spawning worker thread: %s\n",
strerror(ret)); strerror(ret));
exit (1); exit (1);
} }
/*
for(r = 0; r < num_xact; r ++) {
run_xact(dbenv, db_cats, 1+r*insert_per_xact, insert_per_xact);
}
*/
} }
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
@ -108,6 +104,9 @@ main(int argc, char *argv[])
free(threads); free(threads);
db->close(db, 0);
dbenv->close(dbenv, 0);
return (0); return (0);
} }
@ -116,15 +115,12 @@ void * runThread(void * arg) {
int offset = (int) arg; int offset = (int) arg;
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
activeThreads++; activeThreads++;
if(activeThreads > max_active) { if(activeThreads > max_active) {
max_active = activeThreads; max_active = activeThreads;
} }
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
int r; int r;
for(r = 0; r < num_xact; r ++) { for(r = 0; r < num_xact; r ++) {

View file

@ -41,18 +41,21 @@ main(int argc, char *argv[])
int ch, ret; int ch, ret;
assert(argc == 3); assert(argc == 3 || argc == 4);
int alwaysCommit = (argc == 4);
/* threads have static thread sizes. Ughh. */ /* threads have static thread sizes. Ughh. */
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
pthread_attr_setstacksize (&attr, 4 * PTHREAD_STACK_MIN);
pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&never, NULL); pthread_cond_init(&never, NULL);
pthread_attr_setstacksize (&attr, 4 * PTHREAD_STACK_MIN);
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
initDB(&attr, DB_HASH);
int l; int l;
@ -63,20 +66,27 @@ main(int argc, char *argv[])
int r; int r;
int num_threads = atoi(argv[1]); int num_threads = atoi(argv[1]);
if(alwaysCommit) {
num_xact = atoi(argv[2]);
insert_per_xact = 1;
} else {
num_xact = 1; num_xact = 1;
insert_per_xact = atoi(argv[2]); insert_per_xact = atoi(argv[2]);
}
pthread_t * threads = malloc(num_threads * sizeof(pthread_t)); pthread_t * threads = malloc(num_threads * sizeof(pthread_t));
int i ; int i ;
for(i = 0; i < num_threads; i++) { for(i = 0; i < num_threads; i++) {
if ((ret = pthread_create(&(threads[i]), &attr, runThread, (void *)i)) != 0){ if ((ret = pthread_create(&(threads[i]), &attr, runThread, (void *)i)) != 0){
fprintf(stderr, fprintf(stderr,
"txnapp: failed spawning worker thread: %s\n", "txnapp: failed spawning worker thread: %s\n",
strerror(ret)); strerror(ret));
exit (1); exit (1);
} }
} }
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
@ -93,6 +103,10 @@ main(int argc, char *argv[])
printf("%3.4f\t%d\n", exp(((double)k)/log_multiplier)/1000000000.0, buckets[k]); printf("%3.4f\t%d\n", exp(((double)k)/log_multiplier)/1000000000.0, buckets[k]);
} }
db->close(db, 0);
dbenv->close(dbenv, 0);
return (0); return (0);
} }
@ -107,11 +121,10 @@ void * runThread(void * arg) {
} }
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
int r; int r;
double sum_x_squared = 0; // double sum_x_squared = 0;
double sum = 0; // double sum = 0;
double log_multiplier = COUNTER_RESOLUTION / log(MAX_SECONDS * 1000000000.0); double log_multiplier = COUNTER_RESOLUTION / log(MAX_SECONDS * 1000000000.0);
@ -122,9 +135,7 @@ void * runThread(void * arg) {
timeout.tv_sec = timeout_tv.tv_sec; timeout.tv_sec = timeout_tv.tv_sec;
timeout.tv_nsec = 1000 * timeout_tv.tv_usec; timeout.tv_nsec = 1000 * timeout_tv.tv_usec;
timeout.tv_nsec = (int)(1000000000.0 * ((double)random() / (double)RAND_MAX)); timeout.tv_nsec = (int)(1000000000.0 * ((double)random() / (double)RAND_MAX));
timeout.tv_sec++; timeout.tv_sec++;
// struct timeval start; // struct timeval start;
@ -148,14 +159,12 @@ void * runThread(void * arg) {
double microSecondsPassed = 1000000000.0 * (double)(endtime.tv_sec - timeout.tv_sec); double microSecondsPassed = 1000000000.0 * (double)(endtime.tv_sec - timeout.tv_sec);
microSecondsPassed = (microSecondsPassed + (double)endtime.tv_nsec) - (double)timeout.tv_nsec; microSecondsPassed = (microSecondsPassed + (double)endtime.tv_nsec) - (double)timeout.tv_nsec;
assert(microSecondsPassed > 0.0); assert(microSecondsPassed > 0.0);
// sum += microSecondsPassed;
sum += microSecondsPassed; // sum_x_squared += (microSecondsPassed * microSecondsPassed) ;
sum_x_squared += (microSecondsPassed * microSecondsPassed) ;
int bucket = (log_multiplier * log(microSecondsPassed)); int bucket = (log_multiplier * log(microSecondsPassed));

View file

@ -50,6 +50,12 @@ env_open(DB_ENV **dbenvp)
exit (1); exit (1);
}*/ }*/
dbenv->set_tx_max(dbenv, 32000);
int max;
dbenv->get_tx_max(dbenv, &max);
printf("Max xact count: %d\n", max);
/* /*
* Open a transactional environment: * Open a transactional environment:
* create if it doesn't exist * create if it doesn't exist
@ -58,7 +64,7 @@ env_open(DB_ENV **dbenvp)
* read/write owner only * read/write owner only
*/ */
if ((ret = dbenv->open(dbenv, ENV_DIRECTORY, if ((ret = dbenv->open(dbenv, ENV_DIRECTORY,
DB_CREATE |/* DB_INIT_LOCK |*/ DB_INIT_LOG | DB_CREATE |/* DB_INIT_LOCK |*/ DB_INIT_LOG | /*| DB_PRIVATE*/
DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD, DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD,
S_IRUSR | S_IWUSR)) != 0) { S_IRUSR | S_IWUSR)) != 0) {
dbenv->err(dbenv, ret, "dbenv->open: %s", ENV_DIRECTORY); dbenv->err(dbenv, ret, "dbenv->open: %s", ENV_DIRECTORY);
@ -151,11 +157,11 @@ log_archlist(DB_ENV *dbenv)
free (begin); free (begin);
} }
} }
DB *db;
void void
db_open(DB_ENV *dbenv, DB **dbp, char *name, int dups) db_open(DB_ENV *dbenv, DB **dbp, char *name, int type)
{ {
DB *db;
int ret; int ret;
/* Create the database handle. */ /* Create the database handle. */
@ -176,7 +182,7 @@ db_open(DB_ENV *dbenv, DB **dbp, char *name, int dups)
* free-threaded handle * free-threaded handle
* read/write owner only * read/write owner only
*/ */
if ((ret = db->open(db, NULL, name, NULL, /*DB_BTREE*//* DB_RECNO */DB_HASH, /*DB_DIRECT_LOG | DB_DIRECT_DB | */ if ((ret = db->open(db, NULL, name, NULL, type, /*DB_DIRECT_LOG | DB_DIRECT_DB | */
DB_AUTO_COMMIT | DB_DIRTY_READ | DB_TXN_SYNC | DB_CREATE | DB_THREAD, S_IRUSR | S_IWUSR)) != 0) { DB_AUTO_COMMIT | DB_DIRTY_READ | DB_TXN_SYNC | DB_CREATE | DB_THREAD, S_IRUSR | S_IWUSR)) != 0) {
(void)db->close(db, 0); (void)db->close(db, 0);
dbenv->err(dbenv, ret, "db->open: %s", name); dbenv->err(dbenv, ret, "db->open: %s", name);
@ -243,7 +249,7 @@ retry: /* Begin the transaction. */
// assert(count == 1); // assert(count == 1);
for(q = offset; q < offset + count; q++) { for(q = offset; q < offset + count; q++) {
keyPtr = q; keyPtr = q+1;
valPtr = q; valPtr = q;
/* switch (ret = db->del(db, tid, &key, 0)) { /* switch (ret = db->del(db, tid, &key, 0)) {
case 0: case 0:
@ -263,8 +269,7 @@ retry: /* Begin the transaction. */
exit (1); exit (1);
} */ } */
// switch (ret = dbc->c_put(dbc, &key, &data, DB_KEYLAST)) { // pthread_mutex_lock(&hack);
//switch (ret = dbc->c_put(dbc, &key, &data, DB_KEYLAST)) {
switch (ret = db->put(db, tid, &key, &data, 0)) { switch (ret = db->put(db, tid, &key, &data, 0)) {
case 0: case 0:
break; break;
@ -283,23 +288,22 @@ retry: /* Begin the transaction. */
} }
goto retry; goto retry;
default: default:
abort(); // Error invalidates benchmark!
/* Error: run recovery. */ /* Error: run recovery. */
dbenv->err(dbenv, ret, "dbc->put: %d/%d", q, q); dbenv->err(dbenv, ret, "dbc->put: %d/%d", q, q);
abort(); // Error invalidates benchmark!
exit (1); exit (1);
} }
} // pthread_mutex_unlock(&hack);
// DBT getkey, getdata; @todo, check insertions!
}
/* Success: commit the change. */
/* if ((ret = dbc->c_close(dbc)) != 0) {
dbenv->err(dbenv, ret, "dbc->c_close");
exit (1);
}*/
if ((ret = tid->commit(tid, 0)) != 0) { if ((ret = tid->commit(tid, 0)) != 0) {
dbenv->err(dbenv, ret, "DB_TXN->commit"); dbenv->err(dbenv, ret, "DB_TXN->commit");
exit (1); exit (1);
} }
#ifdef DEBUG_BDB
printf("Called commit\n");
#endif
} }
void void
@ -310,7 +314,7 @@ usage()
} }
void void
initDB(/*DB_ENV ** dbenv, */pthread_attr_t * attr) initDB(/*DB_ENV ** dbenv, */pthread_attr_t * attr, int type)
{ {
pthread_t ptid; pthread_t ptid;
@ -337,7 +341,7 @@ initDB(/*DB_ENV ** dbenv, */pthread_attr_t * attr)
exit (1); exit (1);
} }
*/ */
db_open(dbenv, &db_cats, "cats", 1); db_open(dbenv, &db_cats, "cats", type);

View file

@ -2,7 +2,10 @@ void run_xact(DB_ENV *, DB *, int, int);
void *checkpoint_thread(void *); void *checkpoint_thread(void *);
void log_archlist(DB_ENV *); void log_archlist(DB_ENV *);
void *logfile_thread(void *); void *logfile_thread(void *);
void db_open(DB_ENV *, DB **, char *, int); /**
@param type DB_RECNO, DB_HASH or DB_BTREE
*/
void db_open(DB_ENV *, DB **, char *, int type);
void env_dir_create(void); void env_dir_create(void);
void env_open(DB_ENV **); void env_open(DB_ENV **);
void usage(void); void usage(void);

View file

@ -21,8 +21,8 @@ int main(int argc, char** argv) {
Tinit(); Tinit();
int xid = Tbegin(); int xid = Tbegin();
recordid hash = ThashCreate(xid, sizeof(int), sizeof(int)); //recordid hash = ThashCreate(xid, sizeof(int), sizeof(int));
//recordid hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); recordid hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
Tcommit(xid); Tcommit(xid);

View file

@ -86,7 +86,8 @@ int main(int argc, char** argv) {
Tinit(); Tinit();
int xid = Tbegin(); int xid = Tbegin();
hash = ThashCreate(xid, sizeof(int), sizeof(int)); // hash = ThashCreate(xid, sizeof(int), sizeof(int));
hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
int k; int k;

View file

@ -129,8 +129,8 @@ int main(int argc, char** argv) {
Tinit(); Tinit();
int xid = Tbegin(); int xid = Tbegin();
hash = ThashCreate(xid, sizeof(int), sizeof(int)); // hash = ThashCreate(xid, sizeof(int), sizeof(int));
// hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH); hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
Tcommit(xid); Tcommit(xid);
@ -143,7 +143,7 @@ int main(int argc, char** argv) {
pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&mutex, NULL);
pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN); pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN);
pthread_attr_setschedpolicy(&attr, SCHED_FIFO); // pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
@ -160,11 +160,5 @@ int main(int argc, char** argv) {
pthread_join(workers[k],NULL); pthread_join(workers[k],NULL);
} }
/* printf("mean: (max, avg) %f, %f\n", max_mean, avg_mean / (double)thread_count);
printf("variance: (max, avg) %f, %f\n", max_var, avg_var / (double)thread_count);
*/
Tdeinit(); Tdeinit();
} }

View file

@ -12,6 +12,8 @@
pthread_cond_t never; pthread_cond_t never;
pthread_mutex_t mutex; pthread_mutex_t mutex;
int alwaysCommit;
#define MAX_SECONDS 100 #define MAX_SECONDS 100
#define COUNTER_RESOLUTION 240 #define COUNTER_RESOLUTION 240
@ -97,13 +99,13 @@ static void * go (void * arg_ptr) {
assert(timeout.tv_sec <= start.tv_sec); assert(timeout.tv_sec <= start.tv_sec);
assert(timeout.tv_nsec <= start.tv_nsec || timeout.tv_sec < start.tv_sec); assert(timeout.tv_nsec <= start.tv_nsec || timeout.tv_sec < start.tv_sec);
*/ */
if(alwaysCommit) {
xid = Tbegin(); xid = Tbegin();
}
ThashInsert(xid, hash, (byte*)&j, sizeof(int), (byte*)&j, sizeof(int)); ThashInsert(xid, hash, (byte*)&j, sizeof(int), (byte*)&j, sizeof(int));
if(alwaysCommit) {
Tcommit(xid); // used to be outside of loop! Tcommit(xid);
}
gettimeofday(&endtime_tv, NULL); gettimeofday(&endtime_tv, NULL);
@ -168,10 +170,12 @@ static void * go (void * arg_ptr) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
assert(argc == 3); assert(argc == 4);
int thread_count = atoi(argv[1]); int thread_count = atoi(argv[1]);
count = atoi(argv[2]); count = atoi(argv[2]);
alwaysCommit = atoi(argv[3]);
unlink("storefile.txt"); unlink("storefile.txt");
unlink("logfile.txt"); unlink("logfile.txt");

View file

@ -9,9 +9,9 @@
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
#include <lladd/operations/noop.h> #include <lladd/operations/noop.h>
#include <lladd/fifo.h>
#include <lladd/multiplexer.h>
#include "../logger/logMemory.h"
/** /**
re-entrant implementation of a linear hash hable, using nensted top actions. re-entrant implementation of a linear hash hable, using nensted top actions.
@ -536,3 +536,92 @@ int linearHashNTAIterator_value(int xid, void * impl, byte ** value) {
return (it->lastValue == NULL) ? 0 : it->lastValueSize; return (it->lastValue == NULL) ? 0 : it->lastValueSize;
} }
//--------------------------------- async hash operations happen below here
typedef struct {
// recordid hash;
int value_len;
int key_len;
} asyncHashInsert_t;
void ThashInsertAsync(int xid, lladdConsumer_t * cons, recordid hash, byte * value, int value_len, byte * key, int key_len) {
Tconsumer_push(xid, cons, key, key_len, value, value_len);
}
void ThashInsertConsume(int xid, recordid hash, lladdIterator_t * it) {
while(Titerator_next(xid, it)) {
byte * key;
byte * value;
int key_len = Titerator_key(xid, it, &key);
int value_len = Titerator_value(xid, it, &value);
ThashInsert(xid, hash, key, key_len, value, value_len);
Titerator_tupleDone(xid, it);
}
}
typedef struct {
lladdIterator_t * it;
recordid hash;
int xid;
} hashAsyncWorker_arg;
void * ThashAsyncWorker(void * argp) {
hashAsyncWorker_arg * arg = (hashAsyncWorker_arg*)argp;
// lladdIterator_t * it = (lladdIterator_t *) arg;
// recordid hash;
while(Titerator_next(arg->xid, arg->it)) {
lladdFifo_t * fifo;
int fifo_size = Titerator_value(arg->xid, arg->it, (byte**)&fifo);
assert(fifo_size == sizeof(lladdFifo_t));
ThashInsertConsume(arg->xid, arg->hash, fifo->iterator);
Titerator_tupleDone(arg->xid, arg->it);
}
return NULL;
}
/*lladdMultiplexer_t **/
lladdConsumer_t * TasyncHashInit(int xid, recordid rid, int numWorkerThreads,
int mainFifoLen, int numFifos,
int subFifoLen, int dirtyFifoLen,
lladdIterator_t ** dirtyIterator) {
lladdFifo_t * mainFifo = logMemoryFifo(mainFifoLen, 0);
lladdFifo_t * dirtyFifos = logMemoryFifo(dirtyFifoLen, 0);
lladdFifoPool_t * fifoPool = lladdFifoPool_ringBufferInit(numFifos, subFifoLen, NULL, dirtyFifos);
lladdMultiplexer_t * mux = lladdMultiplexer_alloc(xid, mainFifo->iterator, &multiplexHashLogByKey, fifoPool);
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN);
lladdMultiplexer_start(mux, &attr);
int i = 0;
for(i = 0; i < numWorkerThreads; i++) {
pthread_t thread;
pthread_create(&thread, &attr, ThashAsyncWorker, mux->fifoPool->dirtyPoolFifo->iterator);
pthread_detach(thread);
}
if(dirtyIterator) {
*dirtyIterator = mux->fifoPool->dirtyPoolFifo->iterator;
}
return mainFifo->consumer;
}

View file

@ -212,7 +212,7 @@ START_TEST(multiplexTest) {
pthread_cond_init(&never, NULL); pthread_cond_init(&never, NULL);
pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN); pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN);
pthread_attr_setschedpolicy(&attr, SCHED_FIFO); // pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
lladdMultiplexer_start(mux, &attr); lladdMultiplexer_start(mux, &attr);