diff --git a/benchmarks/berkeleyDB/bdbHashThreaded.c b/benchmarks/berkeleyDB/bdbHashThreaded.c index affe039..3735d92 100644 --- a/benchmarks/berkeleyDB/bdbHashThreaded.c +++ b/benchmarks/berkeleyDB/bdbHashThreaded.c @@ -54,94 +54,34 @@ int main(int argc, char *argv[]) { extern int optind; - pthread_t ptid; int ch, ret; - /* while ((ch = getopt(argc, argv, "")) != EOF) - switch (ch) { - case '?': - default: - usage(); - } - argc -= optind; - argv += optind; */ - assert(argc == 3 || argc == 4); + /* threads have static thread sizes. Ughh. */ alwaysCommit = (argc == 4); + pthread_attr_t attr; pthread_attr_init(&attr); - pthread_mutex_init(&mutex, NULL); - pthread_attr_setstacksize (&attr, 4 * PTHREAD_STACK_MIN); pthread_mutex_lock(&mutex); - + initDB(&attr); - env_dir_create(); - env_open(&dbenv); - - /* Start a checkpoint thread. */ - if ((ret = pthread_create( - &ptid, &attr, checkpoint_thread, (void *)dbenv)) != 0) { - fprintf(stderr, - "txnapp: failed spawning checkpoint thread: %s\n", - strerror(ret)); - exit (1); - } - - /* Start a logfile removal thread. */ - if ((ret = pthread_create( - &ptid, &attr, logfile_thread, (void *)dbenv)) != 0) { - fprintf(stderr, - "txnapp: failed spawning log file removal thread: %s\n", - strerror(ret)); - exit (1); - } - - /* Open database: Key is fruit class; Data is specific type. */ - /* db_open(dbenv, &db_fruit, "fruit", 0); */ - - /* Open database: Key is a color; Data is an integer. */ - /*db_open(dbenv, &db_color, "color", 0); */ - - /* - * Open database: - * Key is a name; Data is: company name, address, cat breeds. - */ - db_open(dbenv, &db_cats, "cats", 1); - - /* add_fruit(dbenv, db_fruit, "apple", "yellow delicious"); - - add_color(dbenv, db_color, "blue", 0); - add_color(dbenv, db_color, "blue", 3); */ - - /* add_cat(dbenv, db_cats, - "Amy Adams", - "Sleepycat Software", - "394 E. Riding Dr., Carlisle, MA 01741, USA", - "abyssinian", - "bengal", - "chartreaux", - NULL);*/ int r; - int num_threads = atoi(argv[1]); + int num_threads = atoi(argv[1]); + if(alwaysCommit) { - num_xact = atoi(argv[2]); //100; + num_xact = atoi(argv[2]); insert_per_xact = 1; } else { num_xact = 1; insert_per_xact = atoi(argv[2]); } - /*int *///insert_per_xact = //atoi(argv[2]); //1000; - // insert_per_xact = 1; - // int num_threads = 100; - - pthread_t * threads = malloc(num_threads * sizeof(pthread_t)); int i ; @@ -153,14 +93,11 @@ main(int argc, char *argv[]) strerror(ret)); exit (1); } - - - /* - for(r = 0; r < num_xact; r ++) { - run_xact(dbenv, db_cats, 1+r*insert_per_xact, insert_per_xact); - } - */ - + /* + for(r = 0; r < num_xact; r ++) { + run_xact(dbenv, db_cats, 1+r*insert_per_xact, insert_per_xact); + } + */ } pthread_mutex_unlock(&mutex); @@ -179,10 +116,12 @@ void * runThread(void * arg) { int offset = (int) arg; pthread_mutex_lock(&mutex); + activeThreads++; if(activeThreads > max_active) { max_active = activeThreads; } + pthread_mutex_unlock(&mutex); @@ -196,534 +135,5 @@ void * runThread(void * arg) { activeThreads--; pthread_mutex_unlock(&mutex); - - // printf("%d done\n", offset); } -#if 0 -void -env_dir_create() -{ - struct stat sb; - - /* - * If the directory exists, we're done. We do not further check - * the type of the file, DB will fail appropriately if it's the - * wrong type. - */ - if (stat(ENV_DIRECTORY, &sb) == 0) - return; - - /* Create the directory, read/write/access owner only. */ - if (mkdir(ENV_DIRECTORY, S_IRWXU) != 0) { - fprintf(stderr, - "txnapp: mkdir: %s: %s\n", ENV_DIRECTORY, strerror(errno)); - exit (1); - } -} - -void -env_open(DB_ENV **dbenvp) -{ - DB_ENV *dbenv; - int ret; - - /* Create the environment handle. */ - if ((ret = db_env_create(&dbenv, 0)) != 0) { - fprintf(stderr, - "txnapp: db_env_create: %s\n", db_strerror(ret)); - exit (1); - } - - /* Set up error handling. */ - dbenv->set_errpfx(dbenv, "txnapp"); - dbenv->set_errfile(dbenv, stderr); - - /* Do deadlock detection internally. */ - /* if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) { - dbenv->err(dbenv, ret, "set_lk_detect: DB_LOCK_DEFAULT"); - exit (1); - } */ - - /* - * Open a transactional environment: - * create if it doesn't exist - * free-threaded handle - * run recovery - * read/write owner only - */ - if ((ret = dbenv->open(dbenv, ENV_DIRECTORY, - DB_CREATE /*| DB_INIT_LOCK */| DB_INIT_LOG | - DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD, - S_IRUSR | S_IWUSR)) != 0) { - dbenv->err(dbenv, ret, "dbenv->open: %s", ENV_DIRECTORY); - exit (1); - } - - *dbenvp = dbenv; -} - -void * -checkpoint_thread(void *arg) -{ - DB_ENV *dbenv; - int ret; - - dbenv = arg; - dbenv->errx(dbenv, "Checkpoint thread: %lu", (u_long)pthread_self()); - - /* Checkpoint once a minute. */ - for (;; sleep(60)) - if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0)) != 0) { - dbenv->err(dbenv, ret, "checkpoint thread"); - exit (1); - } - - /* NOTREACHED */ -} - -void * -logfile_thread(void *arg) -{ - DB_ENV *dbenv; - int ret; - char **begin, **list; - - dbenv = arg; - dbenv->errx(dbenv, - "Log file removal thread: %lu", (u_long)pthread_self()); - - /* Check once every 5 minutes. */ - for (;; sleep(300)) { - /* Get the list of log files. */ - if ((ret = - dbenv->log_archive(dbenv, &list, DB_ARCH_ABS)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->log_archive"); - exit (1); - } - - /* Remove the log files. */ - if (list != NULL) { - for (begin = list; *list != NULL; ++list) - if ((ret = remove(*list)) != 0) { - dbenv->err(dbenv, - ret, "remove %s", *list); - exit (1); - } - free (begin); - } - } - /* NOTREACHED */ -} - -void -log_archlist(DB_ENV *dbenv) -{ - int ret; - char **begin, **list; - - /* Get the list of database files. */ - if ((ret = dbenv->log_archive(dbenv, - &list, DB_ARCH_ABS | DB_ARCH_DATA)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->log_archive: DB_ARCH_DATA"); - exit (1); - } - if (list != NULL) { - for (begin = list; *list != NULL; ++list) - printf("database file: %s\n", *list); - free (begin); - } - - /* Get the list of log files. */ - if ((ret = dbenv->log_archive(dbenv, - &list, DB_ARCH_ABS | DB_ARCH_LOG)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->log_archive: DB_ARCH_LOG"); - exit (1); - } - if (list != NULL) { - for (begin = list; *list != NULL; ++list) - printf("log file: %s\n", *list); - free (begin); - } -} - -void -db_open(DB_ENV *dbenv, DB **dbp, char *name, int dups) -{ - DB *db; - int ret; - - /* Create the database handle. */ - if ((ret = db_create(&db, dbenv, 0)) != 0) { - dbenv->err(dbenv, ret, "db_create"); - exit (1); - } - - /* Optionally, turn on duplicate data items. */ - /* if (dups && (ret = db->set_flags(db, DB_DUP)) != 0) { - dbenv->err(dbenv, ret, "db->set_flags: DB_DUP"); - exit (1); - } */ - - /* - * Open a database in the environment: - * create if it doesn't exist - * free-threaded handle - * read/write owner only - */ - if ((ret = db->open(db, NULL, name, NULL, /*DB_BTREE*//* DB_RECNO */DB_HASH, - DB_AUTO_COMMIT | DB_DIRTY_READ | DB_TXN_SYNC | DB_CREATE | DB_THREAD, S_IRUSR | S_IWUSR)) != 0) { - (void)db->close(db, 0); - dbenv->err(dbenv, ret, "db->open: %s", name); - exit (1); - } - - *dbp = db; -} - -void -add_fruit(DB_ENV *dbenv, DB *db, char *fruit, char *name) -{ - DBT key, data; - DB_TXN *tid; - int ret; - - /* Initialization. */ - memset(&key, 0, sizeof(key)); - memset(&data, 0, sizeof(data)); - key.data = fruit; - key.size = strlen(fruit); - data.data = name; - data.size = strlen(name); - - for (;;) { - /* Begin the transaction. */ - if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->txn_begin"); - exit (1); - } - - /* Store the value. */ - switch (ret = db->put(db, tid, &key, &data, 0)) { - case 0: - /* Success: commit the change. */ - if ((ret = tid->commit(tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->commit"); - exit (1); - } - return; - case DB_LOCK_DEADLOCK: - /* Deadlock: retry the operation. */ - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); - } - break; - default: - /* Error: run recovery. */ - dbenv->err(dbenv, ret, "dbc->put: %s/%s", fruit, name); - exit (1); - } - } -} - -void -add_color(DB_ENV *dbenv, DB *dbp, char *color, int increment) -{ - DBT key, data; - DB_TXN *tid; - int original, ret; - char buf[64]; - - /* Initialization. */ - memset(&key, 0, sizeof(key)); - key.data = color; - key.size = strlen(color); - memset(&data, 0, sizeof(data)); - data.flags = DB_DBT_MALLOC; - - for (;;) { - /* Begin the transaction. */ - if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->txn_begin"); - exit (1); - } - - /* - * Get the key. If it exists, we increment the value. If it - * doesn't exist, we create it. - */ - switch (ret = dbp->get(dbp, tid, &key, &data, 0)) { - case 0: - original = atoi(data.data); - break; - case DB_LOCK_DEADLOCK: - /* Deadlock: retry the operation. */ - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); - } - continue; - case DB_NOTFOUND: - original = 0; - break; - default: - /* Error: run recovery. */ - dbenv->err( - dbenv, ret, "dbc->get: %s/%d", color, increment); - exit (1); - } - if (data.data != NULL) - free(data.data); - - /* Create the new data item. */ - (void)snprintf(buf, sizeof(buf), "%d", original + increment); - data.data = buf; - data.size = strlen(buf) + 1; - - /* Store the new value. */ - switch (ret = dbp->put(dbp, tid, &key, &data, 0)) { - case 0: - /* Success: commit the change. */ - if ((ret = tid->commit(tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->commit"); - exit (1); - } - return; - case DB_LOCK_DEADLOCK: - /* Deadlock: retry the operation. */ - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); - } - break; - default: - /* Error: run recovery. */ - dbenv->err( - dbenv, ret, "dbc->put: %s/%d", color, increment); - exit (1); - } - } -} - -void -add_cat(DB_ENV *dbenv, DB *db, char *name, ...) -{ - va_list ap; - DBC *dbc; - DBT key, data; - DB_TXN *tid; - int ret; - char *s; - - /* Initialization. */ - memset(&key, 0, sizeof(key)); - memset(&data, 0, sizeof(data)); - key.data = name; - key.size = strlen(name); - -retry: /* Begin the transaction. */ - if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->txn_begin"); - exit (1); - } - - /* Delete any previously existing item -- LLADD always does this during insert.*/ - /* switch (ret = db->del(db, tid, &key, 0)) { - case 0: - case DB_NOTFOUND: - break; - case DB_LOCK_DEADLOCK: - // Deadlock: retry the operation. - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); - } - goto retry; - default: - dbenv->err(dbenv, ret, "db->del: %s", name); - exit (1); - }*/ - - /* Create a cursor. */ - if ((ret = db->cursor(db, tid, &dbc, 0)) != 0) { - dbenv->err(dbenv, ret, "db->cursor"); - exit (1); - } - - /* Append the items, in order. */ - va_start(ap, name); - while ((s = va_arg(ap, char *)) != NULL) { - data.data = s; - data.size = strlen(s); - switch (ret = dbc->c_put(dbc, &key, &data, DB_KEYLAST)) { - case 0: - break; - case DB_LOCK_DEADLOCK: - va_end(ap); - abort(); - /* Deadlock: retry the operation. */ - if ((ret = dbc->c_close(dbc)) != 0) { - dbenv->err( - dbenv, ret, "dbc->c_close"); - exit (1); - } - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); - } - goto retry; - default: - /* Error: run recovery. */ - dbenv->err(dbenv, ret, "dbc->put: %s/%s", name, s); - exit (1); - } - } - va_end(ap); - - /* Success: commit the change. */ - if ((ret = dbc->c_close(dbc)) != 0) { - dbenv->err(dbenv, ret, "dbc->c_close"); - exit (1); - } - if ((ret = tid->commit(tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->commit"); - exit (1); - } -} - -void -run_xact(DB_ENV *dbenv, DB *db, int offset, int count) -{ - // va_list ap; - DBC *dbc; - DBT key, data; - DB_TXN *tid; - int ret; - char *s; - - /* Initialization. */ - memset(&key, 0, sizeof(key)); - memset(&data, 0, sizeof(data)); - int keyPtr; - int valPtr; - key.data = &keyPtr; - key.size = sizeof(int);/*strlen(name);*/ - data.data = &valPtr; - data.size = sizeof(int); - -retry: /* Begin the transaction. */ - if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->txn_begin"); - // exit (1); - abort(); - } - - /* Delete any previously existing item. */ - switch (ret = db->del(db, tid, &key, 0)) { - case 0: - case DB_NOTFOUND: - break; - case DB_LOCK_DEADLOCK: - abort(); - /* Deadlock: retry the operation. */ - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); - } - goto retry; - default: - // dbenv->err(dbenv, ret, "db->del: %s", name); - abort(); - exit (1); - } - - /* Create a cursor. */ - if ((ret = db->cursor(db, tid, &dbc, 0)) != 0) { - dbenv->err(dbenv, ret, "db->cursor"); - abort(); - // exit (1); - } - - /* Append the items, in order. */ - // va_start(ap, name); - // while ((s = va_arg(ap, char *)) != NULL) { - int q; - for(q = offset; q < offset + count; q++) { - keyPtr = q; - valPtr = q; - - switch (ret = db->del(db, tid, &key, 0)) { - case 0: - // abort(); // we dont insert dups in this test! - case DB_NOTFOUND: - break; - case DB_LOCK_DEADLOCK: - /// Deadlock: retry the operation. - abort(); // the lock manager should be disabled for this test... - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - // exit (1); - abort(); - } - goto retry; - default: - dbenv->err(dbenv, ret, "db->del: %d", q); - // exit (1); - abort(); - } - - /* data.data = s; - data.size = strlen(s); */ - // printf("A"); fflush(NULL); - - switch (ret = dbc->c_put(dbc, &key, &data, DB_KEYLAST)) { - case 0: - // printf("B"); fflush(NULL); - break; - case DB_LOCK_DEADLOCK: - // va_end(ap); - abort(); // no locking! - /* Deadlock: retry the operation. */ - if ((ret = dbc->c_close(dbc)) != 0) { - dbenv->err( - dbenv, ret, "dbc->c_close"); - // exit (1); - abort(); - } - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - // exit (1); - abort(); - } - goto retry; - default: - /* Error: run recovery. */ - dbenv->err(dbenv, ret, "dbc->put: %d/%d", q, q); - // exit (1); - abort(); - } - } - // va_end(ap); - - /* Success: commit the change. */ - if ((ret = dbc->c_close(dbc)) != 0) { - dbenv->err(dbenv, ret, "dbc->c_close"); - // exit (1); - abort(); - } - if ((ret = tid->commit(tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->commit"); - // exit (1); - abort(); - } - // printf("commit"); -} - -void -usage() -{ - (void)fprintf(stderr, "usage: txnapp\n"); - exit(1); -} -#endif diff --git a/benchmarks/berkeleyDB/bdbHashWriteRequests.c b/benchmarks/berkeleyDB/bdbHashWriteRequests.c index 6bbe01a..ccb61ac 100644 --- a/benchmarks/berkeleyDB/bdbHashWriteRequests.c +++ b/benchmarks/berkeleyDB/bdbHashWriteRequests.c @@ -31,20 +31,6 @@ int max_active = 0; pthread_cond_t never; pthread_mutex_t mutex; -#if 0 -void run_xact(DB_ENV *, DB *, int, int); -void *checkpoint_thread(void *); -void log_archlist(DB_ENV *); -void *logfile_thread(void *); -void db_open(DB_ENV *, DB **, char *, int); -void env_dir_create(void); -void env_open(DB_ENV **); -void usage(void); - -DB_ENV *dbenv; -DB *db_cats; /*, *db_color, *db_fruit; */ -#endif - int num_xact; int insert_per_xact; void * runThread(void * arg); @@ -53,7 +39,6 @@ main(int argc, char *argv[]) { extern int optind; - pthread_t ptid; int ch, ret; assert(argc == 3); @@ -75,40 +60,6 @@ main(int argc, char *argv[]) buckets[l] = 0; } - - env_dir_create(); - env_open(&dbenv); - - /* Start a checkpoint thread. */ - if ((ret = pthread_create( - &ptid, &attr, checkpoint_thread, (void *)dbenv)) != 0) { - fprintf(stderr, - "txnapp: failed spawning checkpoint thread: %s\n", - strerror(ret)); - exit (1); - } - - /* Start a logfile removal thread. */ - if ((ret = pthread_create( - &ptid, &attr, logfile_thread, (void *)dbenv)) != 0) { - fprintf(stderr, - "txnapp: failed spawning log file removal thread: %s\n", - strerror(ret)); - exit (1); - } - - /* Open database: Key is fruit class; Data is specific type. */ - /* db_open(dbenv, &db_fruit, "fruit", 0); */ - - /* Open database: Key is a color; Data is an integer. */ - /*db_open(dbenv, &db_color, "color", 0); */ - - /* - * Open database: - * Key is a name; Data is: company name, address, cat breeds. - */ - db_open(dbenv, &db_cats, "cats", 1); - int r; int num_threads = atoi(argv[1]); @@ -226,268 +177,3 @@ void * runThread(void * arg) { // printf("%d done\n", offset); } -#if 0 -void -env_dir_create() -{ - struct stat sb; - - /* - * If the directory exists, we're done. We do not further check - * the type of the file, DB will fail appropriately if it's the - * wrong type. - */ - if (stat(ENV_DIRECTORY, &sb) == 0) - return; - - /* Create the directory, read/write/access owner only. */ - if (mkdir(ENV_DIRECTORY, S_IRWXU) != 0) { - fprintf(stderr, - "txnapp: mkdir: %s: %s\n", ENV_DIRECTORY, strerror(errno)); - exit (1); - } -} - -void -env_open(DB_ENV **dbenvp) -{ - DB_ENV *dbenv; - int ret; - - /* Create the environment handle. */ - if ((ret = db_env_create(&dbenv, 0)) != 0) { - fprintf(stderr, - "txnapp: db_env_create: %s\n", db_strerror(ret)); - exit (1); - } - - /* Set up error handling. */ - dbenv->set_errpfx(dbenv, "txnapp"); - dbenv->set_errfile(dbenv, stderr); - - /* Do deadlock detection internally. */ - /* if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) { - dbenv->err(dbenv, ret, "set_lk_detect: DB_LOCK_DEFAULT"); - exit (1); - }*/ - - /* - * Open a transactional environment: - * create if it doesn't exist - * free-threaded handle - * run recovery - * read/write owner only - */ - if ((ret = dbenv->open(dbenv, ENV_DIRECTORY, - DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | - DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD, - S_IRUSR | S_IWUSR)) != 0) { - dbenv->err(dbenv, ret, "dbenv->open: %s", ENV_DIRECTORY); - exit (1); - } - - *dbenvp = dbenv; -} - -void * -checkpoint_thread(void *arg) -{ - DB_ENV *dbenv; - int ret; - - dbenv = arg; - dbenv->errx(dbenv, "Checkpoint thread: %lu", (u_long)pthread_self()); - - /* Checkpoint once a minute. */ - for (;; sleep(60)) - if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0)) != 0) { - dbenv->err(dbenv, ret, "checkpoint thread"); - exit (1); - } - - /* NOTREACHED */ -} - -void * -logfile_thread(void *arg) -{ - DB_ENV *dbenv; - int ret; - char **begin, **list; - - dbenv = arg; - dbenv->errx(dbenv, - "Log file removal thread: %lu", (u_long)pthread_self()); - - /* Check once every 5 minutes. */ - for (;; sleep(300)) { - /* Get the list of log files. */ - if ((ret = - dbenv->log_archive(dbenv, &list, DB_ARCH_ABS)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->log_archive"); - exit (1); - } - - /* Remove the log files. */ - if (list != NULL) { - for (begin = list; *list != NULL; ++list) - if ((ret = remove(*list)) != 0) { - dbenv->err(dbenv, - ret, "remove %s", *list); - exit (1); - } - free (begin); - } - } - /* NOTREACHED */ -} - -void -log_archlist(DB_ENV *dbenv) -{ - int ret; - char **begin, **list; - - /* Get the list of database files. */ - if ((ret = dbenv->log_archive(dbenv, - &list, DB_ARCH_ABS | DB_ARCH_DATA)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->log_archive: DB_ARCH_DATA"); - exit (1); - } - if (list != NULL) { - for (begin = list; *list != NULL; ++list) - printf("database file: %s\n", *list); - free (begin); - } - - /* Get the list of log files. */ - if ((ret = dbenv->log_archive(dbenv, - &list, DB_ARCH_ABS | DB_ARCH_LOG)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->log_archive: DB_ARCH_LOG"); - exit (1); - } - if (list != NULL) { - for (begin = list; *list != NULL; ++list) - printf("log file: %s\n", *list); - free (begin); - } -} - -void -db_open(DB_ENV *dbenv, DB **dbp, char *name, int dups) -{ - DB *db; - int ret; - - /* Create the database handle. */ - if ((ret = db_create(&db, dbenv, 0)) != 0) { - dbenv->err(dbenv, ret, "db_create"); - exit (1); - } - - /* Optionally, turn on duplicate data items. */ - /* if (dups && (ret = db->set_flags(db, DB_DUP)) != 0) { - dbenv->err(dbenv, ret, "db->set_flags: DB_DUP"); - exit (1); - } */ - - /* - * Open a database in the environment: - * create if it doesn't exist - * free-threaded handle - * read/write owner only - */ - if ((ret = db->open(db, NULL, name, NULL, /*DB_BTREE*//* DB_RECNO */DB_HASH, - DB_AUTO_COMMIT | DB_DIRTY_READ | DB_TXN_SYNC | DB_CREATE | DB_THREAD, S_IRUSR | S_IWUSR)) != 0) { - (void)db->close(db, 0); - dbenv->err(dbenv, ret, "db->open: %s", name); - exit (1); - } - - *dbp = db; -} - -void -run_xact(DB_ENV *dbenv, DB *db, int offset, int count) -{ - // va_list ap; - DBC *dbc; - DBT key, data; - DB_TXN *tid; - int ret; - char *s; - - /* Initialization. */ - memset(&key, 0, sizeof(key)); - memset(&data, 0, sizeof(data)); - int keyPtr; - int valPtr; - key.data = &keyPtr; - key.size = sizeof(int);/*strlen(name);*/ - data.data = &valPtr; - data.size = sizeof(int); - -retry: /* Begin the transaction. */ - if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_ENV->txn_begin"); - exit (1); - } - - /* Create a cursor. */ - if ((ret = db->cursor(db, tid, &dbc, 0)) != 0) { - dbenv->err(dbenv, ret, "db->cursor"); - exit (1); - } - - int q; - /* Count is one for this test */ - assert(count == 1); - - for(q = offset; q < offset + count; q++) { - keyPtr = q; - valPtr = q; - - switch (ret = dbc->c_put(dbc, &key, &data, DB_KEYLAST)) { - case 0: - break; - case DB_LOCK_DEADLOCK: - // va_end(ap); - abort(); // Locking should be disabled! - /* Deadlock: retry the operation. */ - if ((ret = dbc->c_close(dbc)) != 0) { - dbenv->err( - dbenv, ret, "dbc->c_close"); - exit (1); - } - if ((ret = tid->abort(tid)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->abort"); - exit (1); - } - goto retry; - default: - abort(); // Error invalidates benchmark! - /* Error: run recovery. */ - dbenv->err(dbenv, ret, "dbc->put: %d/%d", q, q); - exit (1); - } - } - - /* Success: commit the change. */ - if ((ret = dbc->c_close(dbc)) != 0) { - dbenv->err(dbenv, ret, "dbc->c_close"); - exit (1); - } - if ((ret = tid->commit(tid, 0)) != 0) { - dbenv->err(dbenv, ret, "DB_TXN->commit"); - exit (1); - } -} - -void -usage() -{ - (void)fprintf(stderr, "usage: txnapp\n"); - exit(1); -} -#endif // 0 - diff --git a/benchmarks/berkeleyDB/genericBerkeleyDBCode.c b/benchmarks/berkeleyDB/genericBerkeleyDBCode.c index 26d2785..8e9cb8a 100644 --- a/benchmarks/berkeleyDB/genericBerkeleyDBCode.c +++ b/benchmarks/berkeleyDB/genericBerkeleyDBCode.c @@ -176,7 +176,7 @@ db_open(DB_ENV *dbenv, DB **dbp, char *name, int dups) * free-threaded handle * read/write owner only */ - if ((ret = db->open(db, NULL, name, NULL, /*DB_BTREE*//* DB_RECNO */DB_HASH, + if ((ret = db->open(db, NULL, name, NULL, /*DB_BTREE*//* DB_RECNO */DB_HASH, /*DB_DIRECT_LOG | DB_DIRECT_DB | */ DB_AUTO_COMMIT | DB_DIRTY_READ | DB_TXN_SYNC | DB_CREATE | DB_THREAD, S_IRUSR | S_IWUSR)) != 0) { (void)db->close(db, 0); dbenv->err(dbenv, ret, "db->open: %s", name); diff --git a/src/lladd/logger/logWriter.c b/src/lladd/logger/logWriter.c index cf061db..c3670f2 100644 --- a/src/lladd/logger/logWriter.c +++ b/src/lladd/logger/logWriter.c @@ -135,11 +135,12 @@ pthread_mutex_t log_write_mutex; */ pthread_mutex_t truncateLog_mutex; - +/** If 1, flushLog needs to call fseek the next time it is called. */ static int sought = 1; static char * buffer; int openLogWriter() { -#define BUFSIZE (1024*96) + //#define BUFSIZE (1024*96) +#define BUFSIZE (1024 * 1024) //#define BUFSIZE (512) //char * buffer ;/*= malloc(BUFSIZE);*/ /* int ret = posix_memalign((void*)&(buffer), PAGE_SIZE, BUFSIZE); @@ -148,7 +149,7 @@ int openLogWriter() { if(!buffer) { return LLADD_NO_MEM; } - int logFD = open (LOG_FILE, O_CREAT | O_RDWR | O_APPEND /*| O_SYNC*/, S_IRWXU | S_IRWXG | S_IRWXO); + int logFD = open (LOG_FILE, O_CREAT | O_RDWR | O_APPEND | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO); if(logFD == -1) { perror("Couldn't open log file (A)"); abort(); @@ -315,7 +316,14 @@ int writeLogEntry(LogEntry * e) { interactions between write() calls and readLSN, and locking, so this currently only writes one entry at a time. (If this function weren't designed to bundle log entries together, it would not make - such heavy use of global variables...) */ + such heavy use of global variables...) + + This function should only be called when the calling thread holds + a write lock on log_read_lock. In theory, this function will not + block on I/O, although in the case of long running-transactions, + log's buffer may fill up. Since we have opened the file with + O_SYNC, this function may block on disk I/O. +*/ static int flushLog() { if (!logBuffer) { return 0;} @@ -339,22 +347,24 @@ static int flushLog() { void syncLog() { lsn_t newFlushedLSN; + writelock(log_read_lock, 0); if(sought) { newFlushedLSN = myFseek(log, 0, SEEK_END); - sought = 1; + sought = 1; } else { newFlushedLSN = ftell(log); } + writeunlock(log_read_lock); /* Wait to set the static variable until after the flush returns. */ fflush(log); // Since we open the logfile with O_SYNC, fflush suffices. #ifdef HAVE_FDATASYNC /* Should be available in linux >= 2.4 */ - fdatasync(fileno(log)); + // fdatasync(fileno(log)); #else /* Slow - forces fs implementation to sync the file metadata to disk */ - fsync(fileno(log)); + // fsync(fileno(log)); #endif writelock(flushedLSN_lock, 0); @@ -531,9 +541,9 @@ int truncateLog(lsn_t LSN) { fflush(tmpLog); #ifdef HAVE_FDATASYNC - fdatasync(fileno(tmpLog)); + // fdatasync(fileno(tmpLog)); #else - fsync(fileno(tmpLog)); + // fsync(fileno(tmpLog)); #endif /** Time to shut out the readers */ diff --git a/src/lladd/logger/logWriter.h b/src/lladd/logger/logWriter.h index 53f6aa5..5a719f1 100644 --- a/src/lladd/logger/logWriter.h +++ b/src/lladd/logger/logWriter.h @@ -59,10 +59,9 @@ terms specified in this license. * performance. * * @todo Everything in this file cores on failure (no error handling yet) - * @todo All of the logWriter calls should be reentrant. (aren't they?) - * @todo logWriter currently calls fwrite once per log entry. This may be a - * significant bottleneck. What is the most efficient way to write out - * log entries will still correctly supporing readLSN? + * @todo logWriter is optimized for forward operation, scanning backwards + * (as in abort()) is very slow. Perhaps we should maintian a + * 'write behind' buffer? * * $Id$ * diff --git a/src/lladd/operations/linearHashNTA.c b/src/lladd/operations/linearHashNTA.c index 6f681b3..6096945 100644 --- a/src/lladd/operations/linearHashNTA.c +++ b/src/lladd/operations/linearHashNTA.c @@ -223,7 +223,6 @@ compensated_function static int __ThashInsert(int xid, recordid hashHeader, cons Tread(xid, bucket, &bucketList); // int before = TpagedListSpansPages(xid, bucketList); - ret = TpagedListRemove(xid, bucketList, key, keySize); TpagedListInsert(xid, bucketList, key, keySize, value, valueSize);