mirror of
https://github.com/berkeleydb/je.git
synced 2024-11-15 01:46:24 +00:00
1531 lines
58 KiB
Java
1531 lines
58 KiB
Java
|
/*-
|
|||
|
* Copyright (C) 2002, 2017, Oracle and/or its affiliates. All rights reserved.
|
|||
|
*
|
|||
|
* This file was distributed by Oracle as part of a version of Oracle Berkeley
|
|||
|
* DB Java Edition made available at:
|
|||
|
*
|
|||
|
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
|
|||
|
*
|
|||
|
* Please see the LICENSE file included in the top-level directory of the
|
|||
|
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
|
|||
|
* license and additional information.
|
|||
|
*/
|
|||
|
|
|||
|
import java.io.File;
|
|||
|
import java.util.Collection;
|
|||
|
import java.util.HashSet;
|
|||
|
import java.util.List;
|
|||
|
import java.util.Map;
|
|||
|
import java.util.Set;
|
|||
|
import java.util.concurrent.BrokenBarrierException;
|
|||
|
import java.util.concurrent.Callable;
|
|||
|
import java.util.concurrent.ConcurrentHashMap;
|
|||
|
import java.util.concurrent.CountDownLatch;
|
|||
|
import java.util.concurrent.CyclicBarrier;
|
|||
|
import java.util.concurrent.ExecutorService;
|
|||
|
import java.util.concurrent.Executors;
|
|||
|
import java.util.concurrent.TimeUnit;
|
|||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|||
|
import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
import java.util.logging.Formatter;
|
|||
|
import java.util.logging.Level;
|
|||
|
import java.util.logging.Logger;
|
|||
|
|
|||
|
import com.sleepycat.bind.tuple.IntegerBinding;
|
|||
|
import com.sleepycat.je.Cursor;
|
|||
|
import com.sleepycat.je.CursorConfig;
|
|||
|
import com.sleepycat.je.Database;
|
|||
|
import com.sleepycat.je.DatabaseConfig;
|
|||
|
import com.sleepycat.je.DatabaseEntry;
|
|||
|
import com.sleepycat.je.Durability;
|
|||
|
import com.sleepycat.je.Durability.ReplicaAckPolicy;
|
|||
|
import com.sleepycat.je.Durability.SyncPolicy;
|
|||
|
import com.sleepycat.je.Environment;
|
|||
|
import com.sleepycat.je.EnvironmentConfig;
|
|||
|
import com.sleepycat.je.OperationStatus;
|
|||
|
import com.sleepycat.je.Transaction;
|
|||
|
import com.sleepycat.je.TransactionConfig;
|
|||
|
import com.sleepycat.je.rep.InsufficientAcksException;
|
|||
|
import com.sleepycat.je.rep.ReplicaWriteException;
|
|||
|
import com.sleepycat.je.rep.ReplicatedEnvironment;
|
|||
|
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
|
|||
|
import com.sleepycat.je.rep.ReplicationConfig;
|
|||
|
import com.sleepycat.je.rep.ReplicationNode;
|
|||
|
import com.sleepycat.je.rep.StateChangeEvent;
|
|||
|
import com.sleepycat.je.rep.StateChangeListener;
|
|||
|
import com.sleepycat.je.rep.UnknownMasterException;
|
|||
|
import com.sleepycat.je.rep.impl.node.NameIdPair;
|
|||
|
import com.sleepycat.je.rep.utilint.ReplicationFormatter;
|
|||
|
import com.sleepycat.je.utilint.LoggerUtils;
|
|||
|
|
|||
|
/**
|
|||
|
* MasterTransferExercise exercises master transfer at the JE level. The test
|
|||
|
* starts a JE HA group of a configurable size, devoting a thread to each
|
|||
|
* node. If a node is a master, it will in turn start numAppThreads threads,
|
|||
|
* which each executes a stream of application operations to present a busy
|
|||
|
* write-only workload. The operation load can be configured so that it
|
|||
|
* consists of record inserts, record updates, or database operations.
|
|||
|
*
|
|||
|
 * Concurrently, a task requests a master transfer. The master transfer
|
|||
|
 * protocol is a two-phase affair which will block transactions in the
|
|||
|
* second phase, so the goal is to exercise that vulnerable blocking period
|
|||
|
* by making the application load and the transfer execute in parallel.
|
|||
|
*
|
|||
|
* The test is a success if:
|
|||
|
* - no node experiences a RollbackProhibitedException
|
|||
|
* - in theory, the nodes should all take roughly equal turns being the master
|
|||
|
* - the test maintains a set of committed data, and checks that each node
|
|||
|
* contains the correct data.
|
|||
|
*
|
|||
|
 * One flavor of test mimics an application load of data record inserts,
|
|||
|
* updates and deletes, while the other executes database creates, renames, and
|
|||
|
* removes. Both flavors must be tested, because there is specialized txn
|
|||
|
* handling for database operations.
|
|||
|
*
|
|||
|
* Here's a timeline of what happens:
|
|||
|
*
|
|||
|
* Test starts up N threads, each instantiating a MasterTransferExercise
|
|||
|
* instance, which operates a JE HA node.
|
|||
|
*
|
|||
|
 * Each exercise instance waits to become the master. If it is the master, it
|
|||
|
* spawns X threads to act as application threads, and 1 thread to instigate a
|
|||
|
* master transfer.
|
|||
|
*
|
|||
|
* When the master transfer begins, the application threads are caught mid-txn,
|
|||
|
* and must abort and cleanup the inflight txns. The old master becomes a
|
|||
|
* replica.
|
|||
|
*
|
|||
|
* At the end of the test, each node should contain the same data. The same
|
|||
|
* environment handle should have been used for the entire time of the test.
|
|||
|
*/
|
|||
|
|
|||
|
public class MasterTransferExercise {

    /* Prefixes used to tag test progress and failure output. */
    private static final String TEST = "TEST: ";
    private static final String TEST_FAIL = "FAILURE - ";

    /* Name of the test database opened on each node. */
    private static final String DB_NAME = "MasterTransferDB";
    private static final String REP_GROUP_NAME = "MasterTransferGroup";

    /* Timeout handed to ReplicatedEnvironment.transferMaster(). */
    private static final int TRANSFER_TIME = 10;
    private static final TimeUnit TRANSFER_UNIT = TimeUnit.SECONDS;

    /* Node names are NODE_NAME_PREFIX + nodeId, e.g. "node1". */
    private static final String NODE_NAME_PREFIX = "node";

    /*
     * Types of test data -- either regular records, or creating and
     * removing database.
     */
    private static final String GEN_KEY_DATA = "KeyData";
    private static final String GEN_DB_OPS = "DbOps";

    /*
     * Coordinate the node threads that execute as the replication nodes:
     * - initBarrier: wait for all nodes to open their environments
     * - startWorkBarrier: wait for the master to pre-load existing records
     * - shutdownBarrier: wait for them all to finish inserts before closing
     *   env and losing quorum
     * - verifyBarrier: wait for them all to finish shutting down before
     *   reopening for verify.
     */
    private static CyclicBarrier initBarrier;
    private static CyclicBarrier startWorkBarrier;
    private static CyclicBarrier shutdownBarrier;
    private static CyclicBarrier verifyBarrier;

    /*
     * Report on test stats, and verify db contents after all nodes are
     * finished.
     */
    private static CountDownLatch waitForInsertEnd;

    /*
     * Let the main thread wait for the nodes to be done before letting the
     * test complete.
     */
    private static CountDownLatch waitForVerify;

    /* Coordinate which records are used in the test */
    private static DataGenerator dataGenerator;

    /* Parsed command-line configuration, shared by all node threads. */
    private static TestConfiguration testConfig;

    /* Cleared by any thread that detects a failure; checked at the end. */
    private static AtomicBoolean testSuccess;

    /* Total master stints across all nodes, used for the fairness check. */
    private static AtomicInteger totalMasterStints;

    /* Per-node identity and handles; one instance per replication node. */
    private final String nodeName;
    private final int nodeId;
    private Logger logger;
    private Formatter formatter;
    private ReplicatedEnvironment repEnv;
    private Database db;

    /* How many times this particular node became master. */
    private AtomicInteger numMasterStints = new AtomicInteger(0);

    /* Generates unique indices for this node's inserter tasks. */
    private AtomicInteger taskNumber = new AtomicInteger(0);
|
|||
|
|
|||
|
    /**
     * Test entry point: parses the configuration, starts one thread per
     * replication node, waits for the timed insert phase and the verify
     * phase to finish, then checks that mastership was shared fairly.
     */
    public static void main(String[] argv)
        throws Exception {

        testConfig = new TestConfiguration(argv);
        System.out.println(TEST + testConfig.showParams());

        testSuccess = new AtomicBoolean(true);
        totalMasterStints = new AtomicInteger(0);

        /*
         * waitForInit provides coordination so the test begins only after all
         * the environments are open.
         */
        initBarrier = new CyclicBarrier(testConfig.groupSize);
        startWorkBarrier = new CyclicBarrier(testConfig.groupSize);
        shutdownBarrier = new CyclicBarrier(testConfig.groupSize);
        verifyBarrier = new CyclicBarrier(testConfig.groupSize);

        waitForInsertEnd = new CountDownLatch(testConfig.groupSize);
        waitForVerify = new CountDownLatch(testConfig.groupSize);

        /* Each "tester" is a single JE environment */
        MasterTransferExercise[] testers =
            new MasterTransferExercise[testConfig.groupSize];

        /*
         * Each tester will countdown on the waitForInsertEnd latch when it has
         * run for the prescribed time limit.
         */
        if (testConfig.testType.equals(GEN_KEY_DATA)) {
            dataGenerator = new DataAndKeyGenerator();
        } else {
            dataGenerator = new DbOpsGenerator();
        }

        /* Spawn one thread per node; node ids are 1-based. */
        for (int i = 0; i < testConfig.groupSize; i++) {
            final MasterTransferExercise test =
                new MasterTransferExercise(i + 1);
            testers[i] = test;
            Thread t = new Thread() {
                @Override
                public void run() {
                    test.runTest();
                }};
            t.start();
        }
        waitForInsertEnd.await();

        /*
         * Verify that each node contains the expected data. The verify will
         * issue an ack-all transaction to ensure that everyone's environment
         * is up to date, so it must be spawned by multiple threads, rather
         * than serially.
         */
        for (int i = 0; i < testConfig.groupSize; i++) {
            final MasterTransferExercise test = testers[i];
            Thread t = new Thread() {
                @Override
                public void run() {
                    test.verify();
                }};
            t.start();
        }
        waitForVerify.await();

        /*
         * Check that the nodes shared mastership a reasonable number of
         * times. Arbitrarily, a sufficient amount of sharing would be a fifth
         * of the average number of master stints.
         */
        int minStints = (totalMasterStints.get()/testConfig.groupSize)/5;
        for (int i = 0; i < testConfig.groupSize; i++) {
            testers[i].reportStints(minStints);
        }

        System.out.println(TEST + (testSuccess.get() ? "SUCCEEDED" :
                                   "FAILED"));
    }
|
|||
|
|
|||
|
MasterTransferExercise(int nodeId) {
|
|||
|
|
|||
|
this.nodeName = NODE_NAME_PREFIX + nodeId;
|
|||
|
this.nodeId = nodeId;
|
|||
|
}
|
|||
|
|
|||
|
    /**
     * Main thread, runs the replication node.
     *
     * - open the node
     * - when the node becomes master, do inserts and simultaneously request
     *   a master transfer.
     * - when the node loses mastership, close all application txns.
     * - loop and wait for the next stint as master.
     *
     * The loop ends when the test has executed for the prescribed amount of
     * time, or an unexpected exception has been thrown.
     */
    final void runTest() {

        try {
            /* Initialize the environment */
            setupEnvironment(false);

            /* Wait for all nodes to open their environments */
            initBarrier.await();

            /*
             * If you are the master, load up any existing records into the
             * committedRecords set, so that the test verification works even
             * if the test is running from a non-empty db.
             */
            if (repEnv.getState() == State.MASTER) {
                Set<TestRecord> existing =
                    dataGenerator.readTestRecords(nodeName, repEnv, db);
                System.out.println(TEST + "loading " + existing.size() +
                                   " pre-existing records");
                dataGenerator.updateCommittedRecords(existing);
            }

            /* Nobody starts work until the pre-load above is complete. */
            startWorkBarrier.await();

            /* The test will only run for "testMinutes" amount of time */
            long endTime = System.currentTimeMillis() +
                TimeUnit.MINUTES.toMillis(testConfig.testMinutes);

            ResettableMasterListener masterWaiter =
                new ResettableMasterListener();
            repEnv.setStateChangeListener(masterWaiter);

            /*
             * Txns whose abort failed during a master stint; they are
             * retried on the next stint and again after the loop ends.
             */
            final Map<Transaction,AbortProblem> failedTxns =
                new ConcurrentHashMap<Transaction, AbortProblem>();
            while (true) {
                try {
                    /*
                     * Wait until the node becomes a master, so it will do
                     * work.
                     */
                    LoggerUtils.logMsg(logger, formatter, Level.CONFIG,
                                       "Waiting for mastership");

                    /*
                     * Come out of the wait because either
                     * a. the test timed out
                     * b. this node is a master. It did some work, and
                     *    triggers a master-replica transition
                     * c. this node is a replica. The application thread
                     *    has nothing to do, so it will loop around and wait
                     *    for a new state change.
                     */
                    long timeLeftInTest = endTime - System.currentTimeMillis();
                    boolean gotMaster =
                        masterWaiter.awaitMastership(timeLeftInTest,
                                                     TimeUnit.MILLISECONDS);
                    masterWaiter.reset();

                    if (gotMaster) {

                        /*
                         * Check the failed txns from the last round. All
                         * transactions that did not successfully commit should
                         * be aborted.
                         */
                        System.err.println(TEST + " Round " +
                                           totalMasterStints.get() +
                                           nodeName + " became master");
                        cleanupFailedTxns(failedTxns);
                        startInsertsAndTransfer(failedTxns);
                    } else {
                        LoggerUtils.logMsg(logger, formatter, Level.INFO,
                                           "test is out of time");
                    }
                } finally {
                    /* Time check runs whether or not this stint succeeded. */
                    if (System.currentTimeMillis() > endTime) {
                        LoggerUtils.logMsg(logger, formatter, Level.INFO,
                                           "test is out of time");
                        break;
                    }
                }
            }
            /* Final sweep of any txns left over from the last stint. */
            cleanupFailedTxns(failedTxns);

        } catch (Throwable e) {
            markTestAsFailed("ended with " + e, e);
        } finally {
            try {
                /*
                 * Make sure all nodes/inserters are quiescent before we
                 * shutdown, so that we don't get InsufficientReplicaExceptions
                 */
                shutdownBarrier.await();
            } catch (InterruptedException e) {
                markTestAsFailed("shutdown barrier interrupted", e);
            } catch (BrokenBarrierException e) {
                markTestAsFailed("shutdown barrier broken", e);
            } finally {
                waitForInsertEnd.countDown();
            }
        }
    }
|
|||
|
|
|||
|
    /*
     * Check that this node contains all (and only) the committed data. Do
     * this in a second pass where we re-open the environment, so that
     * there's no issue with leftover master transfers.
     */
    void verify() {
        System.out.println(TEST + nodeName + " starting verify");

        /*
         * Sync up the persistent storage of the group by doing one write with
         * SYNC/commitAll
         */
        if (repEnv.getState().equals(State.MASTER)) {

            System.err.println("Syncing up group before verify");
            Durability all = new Durability(SyncPolicy.SYNC,
                                            SyncPolicy.SYNC,
                                            ReplicaAckPolicy.ALL);
            TransactionConfig txnConfig = new TransactionConfig();
            txnConfig.setDurability(all);
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setTransactional(true);
            dbConfig.setReadOnly(false);
            dbConfig.setAllowCreate(true);
            Transaction txn = repEnv.beginTransaction(null, txnConfig);
            System.err.println("Syncing up group before verify using txn " +
                               txn.getId());
            /* A throwaway db create is the one write used to sync everyone. */
            Database newDb = repEnv.openDatabase(txn, "VerifyMarker", dbConfig);
            newDb.close();
            txn.commit();
        }

        Set<TestRecord> foundKeys = null;
        try {
            /* Read all the records in this node */
            foundKeys = dataGenerator.readTestRecords(nodeName, repEnv, db);
        } finally {

            /*
             * If you are node 1, reconcile the problematic commits using your
             * database. Then all the other nodes can check against that
             * record.
             */
            if (nodeName.equals("node1")) {
                dataGenerator.reconcileProblematicCommits(nodeName, repEnv, db);
            }

            if (db != null) {
                db.close();
                db = null;
            }

            /*
             * Don't close the environment before everyone is done reading
             */
            try {
                verifyBarrier.await();
            } catch (InterruptedException e) {
                markTestAsFailed("verify barrier interrupted", e);
            } catch (BrokenBarrierException e) {
                markTestAsFailed("verify barrier broken", e);
            }

            if (repEnv != null) {
                repEnv.close();
                repEnv = null;
            }
        }

        /*
         * Use the committedRecords map now that it's been updated appropriately
         * with the problematic commits.
         */
        Set<TestRecord> expectedKeys =
            new HashSet<TestRecord>(dataGenerator.getCommittedRecords());
        if (expectedKeys.size() != foundKeys.size()) {
            testSuccess.set(false);
            System.err.println(TEST + TEST_FAIL + " " + nodeName + ": " +
                               " expected " + expectedKeys.size() +
                               " records but found " + foundKeys.size());
        }

        /* Anything expected but not found on this node is a failure. */
        expectedKeys.removeAll(foundKeys);
        if (expectedKeys.size() != 0) {
            testSuccess.set(false);
            System.err.println(TEST + TEST_FAIL + " " + nodeName + ": " +
                               " missing records: " +
                               expectedKeys);
        }

        /* Anything found that was never committed is also a failure. */
        foundKeys.removeAll(dataGenerator.getCommittedRecords());
        if (foundKeys.size() != 0) {
            testSuccess.set(false);
            System.err.println(TEST + TEST_FAIL + " " + nodeName + ": " +
                               " found unexpected records: " +
                               foundKeys);
        }
        waitForVerify.countDown();
        System.out.println(TEST + nodeName + " ending verify");
    }
|
|||
|
|
|||
|
    /**
     * Aborts every txn in failedTxns that is not already aborted, then
     * clears the map. UnknownMasterException and ReplicaWriteException are
     * expected here (the node may have lost mastership mid-abort); any
     * other failure ends the test.
     */
    private void cleanupFailedTxns(Map<Transaction, AbortProblem> failedTxns) {
        for (Map.Entry<Transaction, AbortProblem> entry :
                 failedTxns.entrySet()) {
            Transaction t = entry.getKey();
            if (t.getState().equals(Transaction.State.ABORTED)) {
                continue;
            }

            /* Print details only when -Dverbose=true. */
            if (Boolean.getBoolean("verbose")) {
                LoggerUtils.logMsg(logger, formatter, Level.INFO,
                                   "Cleaning up txn " + t.getId() + "/" +
                                   t.getState() + " after " +
                                   entry.getValue());
            }

            try {
                t.abort();
            } catch (UnknownMasterException e) {
                /* Okay to get this exception. */
                System.err.println(TEST + nodeName +
                                   " OK: Got UnknownMasterEx when aborting txn"
                                   + t.getId());
            } catch (ReplicaWriteException e) {
                /* Okay to get this exception. */
                System.err.println(TEST + nodeName +
                                   " OK: Got ReplicaWriteEx when aborting " +
                                   "txn " + t.getId());
            } catch (Throwable e) {
                markTestAsFailed("Problem when cleaning up " + t.getId() +
                                 "/" + t.getState() + " after " +
                                 entry.getValue(), e);
            }
        }
        failedTxns.clear();
    }
|
|||
|
|
|||
|
    /**
     * Set up environment handle.
     *
     * Creates this node's env directory, opens the replicated environment
     * and the shared test database.
     *
     * @param openForReads if true, shorten the unknown-state timeout so the
     * environment opens quickly even without an elected master
     */
    private void setupEnvironment(boolean openForReads) {

        File envHome = new File(testConfig.baseEnvDir, nodeName);
        envHome.mkdir();

        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setTransactional(true);
        envConfig.setAllowCreate(true);
        if (testConfig.testType.equals(GEN_DB_OPS)) {
            /* replaying database operations can be expensive */
            envConfig.setTxnTimeout(1, TimeUnit.MINUTES);
        }

        ReplicationConfig repConfig = new ReplicationConfig();
        /*
         * NOTE(review): "+" here is string concatenation, not arithmetic --
         * startPort 5000 and nodeId 2 yield port "50002", not 5002. The
         * helper host below concatenates startPort with 1 the same way, so
         * the scheme is internally consistent; confirm it is intentional.
         */
        repConfig.setNodeHostPort(testConfig.hostName +
                                  ":" + testConfig.startPort + nodeId);
        repConfig.setHelperHosts(testConfig.hostName +
                                 ":" + testConfig.startPort + 1);

        repConfig.setGroupName(REP_GROUP_NAME);
        repConfig.setNodeName(nodeName);

        System.out.println(TEST + nodeName + " opening environment");
        if (openForReads) {
            repConfig.setConfigParam
                (ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "1 ms");
        }

        repEnv = new ReplicatedEnvironment(envHome, repConfig, envConfig);
        logger = LoggerUtils.getLoggerFormatterNeeded(getClass());
        formatter = new ReplicationFormatter
            (new NameIdPair("Tester_" + nodeName, 0));

        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setTransactional(true);
        dbConfig.setReadOnly(false);
        dbConfig.setAllowCreate(true);
        db = repEnv.openDatabase(null, DB_NAME, dbConfig);
    }
|
|||
|
|
|||
|
private String insertPrefix(final int insertIndex) {
|
|||
|
return "[task " + insertIndex + "]";
|
|||
|
}
|
|||
|
|
|||
|
    /**
     * Insert records until we run into an exception.
     *
     * Runs an endless stream of one-transaction operations; the loop only
     * ends when this node stops being master (ReplicaWriteException or
     * UnknownMasterException) or an unexpected error fails the test.
     *
     * @param inserterIndex identifies this task in log output
     * @param readyForMasterTransfer counted down after 10 txns so the
     * transfer task starts while work is in flight
     * @param failedTxns are returned to the application so it can attempt
     * to clean them up.
     */
    private void insertRecords(final int inserterIndex,
                               final CountDownLatch readyForMasterTransfer,
                               Map<Transaction, AbortProblem> failedTxns) {

        Transaction txn = null;
        int i = 0;

        Exception endException = null;
        try {
            while (true) {
                i++;
                txn = repEnv.beginTransaction(null, null);
                Set<TestRecord> results = null;
                try {
                    results = dataGenerator.doOneTransaction
                        (this, repEnv, logger, formatter, txn, db);

                    /*
                     * The master transfer is waiting for the inserts jobs
                     * to get started. We want a few committed transactions and
                     * hopefully some inflight transactions.
                     */
                    if (i == 10) {
                        readyForMasterTransfer.countDown();
                    }

                    txn.commit();
                    dataGenerator.updateCommittedRecords(results);
                } catch (IllegalStateException e) {
                    /*
                     * It's expected that the transaction may be aborted or
                     * closed, probably by the repNode when it transitioning to
                     * replica state. In this case, it's ok to get an ISE,
                     * because the txn has been abruptly shut down without
                     * the application's knowledge. This is the same thing as
                     * would happen if the user calls txn.commit() twice in a
                     * row. However, if the txn is still open, something is
                     * wrong.
                     */
                    if (txn.getState().equals(Transaction.State.OPEN)) {
                        throw e;
                    }

                    System.err.println(TEST + nodeName +
                                       insertPrefix(inserterIndex) +
                                       " warning: txn " + txn.getId() +
                                       " in state " + txn.getState() +
                                       " with actions " + results +
                                       " got illegal state exception" + e);
                    /* Commit outcome is ambiguous; reconcile at verify time. */
                    dataGenerator.addProblematicCommits(results);
                } catch (InsufficientAcksException e) {

                    /*
                     * Make note of this. May happen, particularly with the
                     * database ops, because they have more contention. Repeat
                     * the transaction until we get a UnknownMaster or
                     * ReplicaWriteException.
                     */
                    System.err.println(TEST + nodeName +
                                       insertPrefix(inserterIndex) +
                                       " warning: txn " + txn.getId() +
                                       " in state " + txn.getState() +
                                       " with actions " + results +
                                       " got insufficient acks: " + e);

                    dataGenerator.addProblematicCommits(results);
                }

                /* Print this only if -Dverbose is on */
                if (Boolean.getBoolean("verbose")) {
                    if ((i % 10) == 0) {
                        LoggerUtils.logMsg(logger, formatter, Level.CONFIG,
                                           insertPrefix(inserterIndex) +
                                           " running txn " + i);
                    }
                }
            }
        } catch (ReplicaWriteException e) {
            /* issue a briefer message */
            LoggerUtils.logMsg(logger, formatter, Level.CONFIG,
                               insertPrefix(inserterIndex) + " " + i +
                               " inserts ending due to master->rep " +
                               "transition");
            endException = e;

        } catch (UnknownMasterException e) {
            /* issue a briefer message */
            LoggerUtils.logMsg(logger, formatter, Level.CONFIG,
                               insertPrefix(inserterIndex) + " " + i +
                               "th insert ending due to unknown master");

            endException = e;
        } catch (Throwable e) {
            markTestAsFailed(insertPrefix(inserterIndex) +
                             " inserts saw unexpected exception", e);
        } finally {
            LoggerUtils.logMsg(logger, formatter, Level.CONFIG,
                               insertPrefix(inserterIndex) +
                               " Executed " + i + " txns");
            /* The last in-flight txn must be aborted before this task exits. */
            abortTxn(txn, inserterIndex, endException, failedTxns);
            dataGenerator.lastMasterInsertersDone.countDown();
        }
    }
|
|||
|
|
|||
|
/**
|
|||
|
* Do the abort for a transaction that has incurred an exception during
|
|||
|
* an insert.
|
|||
|
*/
|
|||
|
private void abortTxn(Transaction txn,
|
|||
|
int inserterIndex,
|
|||
|
Exception reasonForAbort,
|
|||
|
Map<Transaction, AbortProblem> failedTxns) {
|
|||
|
try {
|
|||
|
if (txn != null) {
|
|||
|
txn.abort();
|
|||
|
}
|
|||
|
} catch (Exception e) {
|
|||
|
if (txn != null) {
|
|||
|
failedTxns.put(txn, new AbortProblem(reasonForAbort, e));
|
|||
|
displayTxnFailure(inserterIndex, failedTxns, txn);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
private String displayTxnFailure(int inserterIndex,
|
|||
|
Map<Transaction, AbortProblem> failedTxns,
|
|||
|
Transaction txn) {
|
|||
|
AbortProblem problem = failedTxns.get(txn);
|
|||
|
return (insertPrefix(inserterIndex) +
|
|||
|
"txn " + txn.getId() + "/" + txn.getState() +
|
|||
|
" aborted because of " +
|
|||
|
problem.getOriginalProblem() +
|
|||
|
" and had problem aborting " +
|
|||
|
problem.getAbortProblem());
|
|||
|
}
|
|||
|
|
|||
|
private void reportStints(int minStints) {
|
|||
|
int numStints = numMasterStints.get();
|
|||
|
System.out.println(TEST + nodeName + " was master " +
|
|||
|
numStints + " times");
|
|||
|
if (minStints > numStints) {
|
|||
|
testSuccess.set(false);
|
|||
|
System.err.println(TEST + TEST_FAIL + ":" + nodeName +
|
|||
|
" had " + numStints +
|
|||
|
" turns as master, but should have a " +
|
|||
|
"minimum of " + minStints);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/*
|
|||
|
* Request a master transfer
|
|||
|
*/
|
|||
|
private void requestTransfer(final CountDownLatch readyForMasterTransfer)
|
|||
|
throws InterruptedException {
|
|||
|
|
|||
|
/* Wait until data operations are in flight. */
|
|||
|
readyForMasterTransfer.await();
|
|||
|
|
|||
|
Set<String> replicas = new HashSet<String>();
|
|||
|
for (ReplicationNode rn : repEnv.getGroup().getNodes()) {
|
|||
|
if (rn.getName().equals(nodeName)) {
|
|||
|
continue;
|
|||
|
}
|
|||
|
replicas.add(rn.getName());
|
|||
|
}
|
|||
|
|
|||
|
if (replicas.size() == 0) {
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
LoggerUtils.logMsg(logger, formatter, Level.INFO,
|
|||
|
"requesting transfer");
|
|||
|
try {
|
|||
|
String newMasterName =
|
|||
|
repEnv.transferMaster(replicas, TRANSFER_TIME, TRANSFER_UNIT);
|
|||
|
System.err.println(TEST + nodeName + " finishing transfer to " +
|
|||
|
newMasterName);
|
|||
|
} catch (Throwable e) {
|
|||
|
markTestAsFailed("Transfer failed", e);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
    /**
     * Begin the work of a master.
     *
     * Spawns numAppThreads inserter tasks plus one master-transfer task in
     * a private pool, then blocks until they all finish (i.e. until this
     * node has lost mastership and the inserters have wound down).
     *
     * @param failedTxns collects txns whose aborts failed, for later cleanup
     * @throws InterruptedException if the await on the pool is interrupted
     */
    private void startInsertsAndTransfer
        (final Map<Transaction, AbortProblem> failedTxns)
        throws InterruptedException {

        ExecutorService execService = Executors.newCachedThreadPool();

        /*
         * readyForMasterTransfer lets the thread that will start the master
         * transfer wait until data operations are in flight before initiating
         * the transfer, to better test concurrent txns.
         */
        final CountDownLatch readyForMasterTransfer = new CountDownLatch(1);
        numMasterStints.incrementAndGet();
        totalMasterStints.incrementAndGet();
        dataGenerator.newMasterReset(nodeName, repEnv, db);

        /*
         * Start multiple jobs to do inserts. These mimic the application
         * threads.
         */
        for (int i = 0; i < testConfig.numAppThreads; i++) {
            final int index = taskNumber.incrementAndGet();

            Callable<Void> doInserts = new Callable<Void>() {

                @Override
                public Void call() throws Exception {
                    insertRecords(index, readyForMasterTransfer, failedTxns);
                    return null;
                }
            };
            execService.submit(doInserts);
        }

        /* Start another job to do a master transfer */
        Callable<Void> doTransfer = new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                requestTransfer(readyForMasterTransfer);
                return null;
            }
        };
        execService.submit(doTransfer);

        /* Stop taking any more tasks, and wait for them to finish */
        execService.shutdown();
        execService.awaitTermination(10, TimeUnit.MINUTES);
        LoggerUtils.logMsg(logger, formatter, Level.FINE,
                           "succeeded waiting for inserts and transfers " +
                           "to finish");
    }
|
|||
|
|
|||
|
/**
|
|||
|
* The test failed, exit.
|
|||
|
*/
|
|||
|
private void markTestAsFailed(String message, Throwable e) {
|
|||
|
testSuccess.set(false);
|
|||
|
System.err.println(TEST + TEST_FAIL + " " + nodeName +
|
|||
|
" " + message);
|
|||
|
e.printStackTrace();
|
|||
|
System.exit(1);
|
|||
|
}
|
|||
|
|
|||
|
    /**
     * A state change listener that can be used to repeatedly wait for
     * master status. This is preferable to using a new state listener each
     * time we want to listen for a new transition to master status, because
     * ReplicatedEnvironment.setStateListener will fire off the state change
     * notification if the node is already master at the point when the
     * listener is set, which complicates the handling of the test loop.
     */
    private class ResettableMasterListener implements StateChangeListener {

        /* Replaced by reset() so each master stint gets a fresh wait. */
        private CountDownLatch waitForMaster = new CountDownLatch(1);

        @Override
        public void stateChange(StateChangeEvent stateChangeEvent) {

            if (stateChangeEvent.getState().equals
                (ReplicatedEnvironment.State.MASTER)) {
                waitForMaster.countDown();
            }

            /* Release the waiter on DETACHED too, so shutdown can't hang. */
            if (stateChangeEvent.getState().isDetached()) {
                waitForMaster.countDown();
            }
        }

        /**
         * Blocks until this node becomes master (or detaches), or the
         * timeout elapses; returns true if the latch was released in time.
         */
        boolean awaitMastership(long time, TimeUnit timeUnit)
            throws InterruptedException {
            return waitForMaster.await(time, timeUnit);
        }

        /* Arms the listener for the next transition to master. */
        void reset() {
            waitForMaster = new CountDownLatch(1);
        }
    }
|
|||
|
|
|||
|
/* Struct to hold info about abort attempts. */
|
|||
|
private class AbortProblem {
|
|||
|
/* The exception that made it necessary to abort this txn */
|
|||
|
private final Exception originalProblem;
|
|||
|
|
|||
|
/* The exception experienced when attempting to abort this txn */
|
|||
|
private final Exception abortProblem;
|
|||
|
|
|||
|
AbortProblem(Exception original, Exception abortProblem) {
|
|||
|
this.originalProblem = original;
|
|||
|
this.abortProblem = abortProblem;
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
public String toString() {
|
|||
|
return "original exception=" + originalProblem +
|
|||
|
" abort exception=" + abortProblem;
|
|||
|
}
|
|||
|
|
|||
|
Exception getOriginalProblem() {
|
|||
|
return abortProblem;
|
|||
|
}
|
|||
|
|
|||
|
Exception getAbortProblem() {
|
|||
|
return abortProblem;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/* Parse test parameters, hold test configuration */
|
|||
|
private static class TestConfiguration {
|
|||
|
|
|||
|
/* Each JE env is created under this directory */
|
|||
|
File baseEnvDir;
|
|||
|
|
|||
|
/* How long the test runs. */
|
|||
|
int testMinutes = 10;
|
|||
|
|
|||
|
/* How many concurrent threads limit the application load */
|
|||
|
int numAppThreads = 20;
|
|||
|
|
|||
|
String hostName = "localhost";
|
|||
|
int startPort = 5000;
|
|||
|
int groupSize = 3;
|
|||
|
|
|||
|
/* number of records in each transaction */
|
|||
|
int recordsInTxn = 4;
|
|||
|
|
|||
|
String testType = GEN_KEY_DATA;
|
|||
|
|
|||
|
TestConfiguration(String[] argv) {
|
|||
|
parseParams(argv);
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* Parse the command line parameters for a replication node and set up
|
|||
|
* any configuration parameters.
|
|||
|
*/
|
|||
|
void parseParams(String[] argv)
|
|||
|
throws IllegalArgumentException {
|
|||
|
|
|||
|
int argc = 0;
|
|||
|
int nArgs = argv.length;
|
|||
|
|
|||
|
if (nArgs == 0) {
|
|||
|
usage("-h, -hostName, -startPort and -groupSize " +
|
|||
|
"are required arguments.");
|
|||
|
}
|
|||
|
|
|||
|
while (argc < nArgs) {
|
|||
|
String thisArg = argv[argc++];
|
|||
|
|
|||
|
if (thisArg.equals("-h")) {
|
|||
|
if (argc < nArgs) {
|
|||
|
baseEnvDir = new File(argv[argc++]);
|
|||
|
} else {
|
|||
|
usage("-h requires an argument");
|
|||
|
}
|
|||
|
} else if (thisArg.equals("-hostName")) {
|
|||
|
/* The node hostname, port pair. */
|
|||
|
if (argc < nArgs) {
|
|||
|
hostName = argv[argc++];
|
|||
|
} else {
|
|||
|
usage("-hostName requires an argument");
|
|||
|
}
|
|||
|
} else if (thisArg.equals("-groupSize")) {
|
|||
|
if (argc < nArgs) {
|
|||
|
groupSize = Integer.parseInt(argv[argc++]);
|
|||
|
} else {
|
|||
|
usage("-groupSize requires an argument");
|
|||
|
}
|
|||
|
} else if (thisArg.equals("-startPort")) {
|
|||
|
if (argc < nArgs) {
|
|||
|
startPort = Integer.parseInt(argv[argc++]);
|
|||
|
} else {
|
|||
|
usage("-startPort requires an argument");
|
|||
|
}
|
|||
|
} else if (thisArg.equals("-testMinutes")) {
|
|||
|
if (argc < nArgs) {
|
|||
|
testMinutes = Integer.parseInt(argv[argc++]);
|
|||
|
} else {
|
|||
|
usage("-testMinutes requires an argument");
|
|||
|
}
|
|||
|
} else if (thisArg.equals("-numAppThreads")) {
|
|||
|
if (argc < nArgs) {
|
|||
|
numAppThreads = Integer.parseInt(argv[argc++]);
|
|||
|
} else {
|
|||
|
usage("-numAppThreads requires an argument");
|
|||
|
}
|
|||
|
} else if (thisArg.equals("-recordsInTxn")) {
|
|||
|
if (argc < nArgs) {
|
|||
|
recordsInTxn = Integer.parseInt(argv[argc++]);
|
|||
|
} else {
|
|||
|
usage("-recordsInTxn requires an argument");
|
|||
|
}
|
|||
|
} else if (thisArg.equals("-testType")) {
|
|||
|
if (argc < nArgs) {
|
|||
|
testType = argv[argc++];
|
|||
|
if ((!testType.equals(GEN_KEY_DATA)) &&
|
|||
|
(!testType.equals(GEN_DB_OPS))) {
|
|||
|
usage("-testType must be " + GEN_KEY_DATA +
|
|||
|
" or " + GEN_DB_OPS);
|
|||
|
}
|
|||
|
} else {
|
|||
|
usage("-testType must be " + GEN_KEY_DATA +
|
|||
|
" or " + GEN_DB_OPS);
|
|||
|
}
|
|||
|
} else {
|
|||
|
usage("Unknown argument; " + thisArg);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
if (baseEnvDir == null) {
|
|||
|
usage("-h is a required parameter");
|
|||
|
}
|
|||
|
|
|||
|
if (hostName == null) {
|
|||
|
usage("-hostName is a required parameter");
|
|||
|
}
|
|||
|
|
|||
|
if (startPort == 0) {
|
|||
|
usage("-startPort is a required parameter");
|
|||
|
}
|
|||
|
|
|||
|
if (groupSize < 2) {
|
|||
|
usage("-groupSize is a required parameter and must be >= 2");
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
private void usage(String message) {
|
|||
|
System.out.println();
|
|||
|
System.out.println(message);
|
|||
|
System.out.println();
|
|||
|
System.out.print("usage: " + getClass().getName());
|
|||
|
|
|||
|
System.out.println
|
|||
|
("-h <environment dir> " +
|
|||
|
"-hostName <hostName> " +
|
|||
|
"-startPort<port> " +
|
|||
|
"-groupSize <num> " +
|
|||
|
"-testMinutes <num>" +
|
|||
|
"-numAppThreads <num>" +
|
|||
|
"-recordsInTxn <num>" +
|
|||
|
"-testType <KeyData|DbOps>");
|
|||
|
|
|||
|
System.out.println
|
|||
|
("\t -h: the base directory that will house subdirectories " +
|
|||
|
"for each replicated environment\n" +
|
|||
|
"\t -hostName: hostname for the test machine\n" +
|
|||
|
"\t -startPort: starting port number for the replication "+
|
|||
|
"group. The test will consume <groupSize> ports starting " +
|
|||
|
"with that port number\n" +
|
|||
|
"\t -groupSize the number of nodes in the group\n" +
|
|||
|
"\t -testMinutes the number of minutes that the test should " +
|
|||
|
"\t -recordsInTxn number of operation in each txn\n" +
|
|||
|
"\t -numAppThreads number of concurrently inserting threads" +
|
|||
|
"\t -testType <KeyData|DbOps>");
|
|||
|
System.exit(-1);
|
|||
|
}
|
|||
|
|
|||
|
String showParams() {
|
|||
|
return "Test parameters are:\n" +
|
|||
|
"envDir = " + baseEnvDir +
|
|||
|
"\nhostName = " + hostName +
|
|||
|
"\nstartPort = " + startPort +
|
|||
|
"\ngroupSize = " + groupSize +
|
|||
|
"\ntestMinutes = " + testMinutes +
|
|||
|
"\nnumAppThreads = " + numAppThreads +
|
|||
|
"\nrecordsInTxn = " + recordsInTxn +
|
|||
|
"\ntestType = " + testType;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* A class used by any node that is currently master to generate some
|
|||
|
* application workload, and to save the set of valid, committed data
|
|||
|
* records that should be present in the store. Work is done by multiple
|
|||
|
* concurrent application threads.
|
|||
|
*/
|
|||
|
    abstract private static class DataGenerator {
        /* All test records/db names share this prefix so they can be
         * recognized during verification. */
        protected final static String DBPREFIX = "testrecord";
        /* Markers for the record lifecycle: insert writes a "first" value,
         * update rewrites it as "second", delete removes it. */
        private final static String FIRST_NAME = "first";
        private final static String SECOND_NAME = "second";

        /* Source of keys for the current master stint; reset to 0 on each
         * master change so the same key space is revisited. */
        AtomicInteger reusedKeys;

        /*
         * Ensures that the inserters from one stint have finished updating
         * the expected results set before the next stint starts.
         */
        public CountDownLatch lastMasterInsertersDone;

        /*
         * The set of records that should be in environment, after all the
         * master transfers, at the end of the test. Use a map of keys,
         * because records are updated, and we want the latest version of
         * the record.
         */
        protected final Map<Integer, TestRecord> committedRecords;

        /*
         * These records received an insufficient ack exception, so it's
         * unknown whether the change was propagated. Using a Map instead
         * of a Set so as to use ConcurrentHashMap.
         */
        protected final Map<Integer, TestRecord> problematicCommits;

        DataGenerator() {
            /* Concurrent maps: multiple app threads record results. */
            committedRecords = new ConcurrentHashMap<Integer, TestRecord>();
            problematicCommits = new ConcurrentHashMap<Integer, TestRecord>();
        }

        /**
         * Records txns whose outcome is unknown (e.g. insufficient acks);
         * they are reconciled against the store on the next master stint.
         */
        public void addProblematicCommits(Set<TestRecord> results) {
            for (TestRecord r : results) {
                problematicCommits.put(r.getKey(), r);
            }
        }

        /** Removes the record for key; subclass defines what "record" is. */
        abstract void deleteRecord(MasterTransferExercise tester,
                                   Transaction txn,
                                   int key,
                                   Environment repEnv,
                                   Database db,
                                   String secondVal);

        /** Replaces firstVal with secondVal for key. */
        abstract void updateRecord(Transaction txn,
                                   int key,
                                   Environment repEnv,
                                   Database db,
                                   String firstVal,
                                   String secondVal);

        /** Creates a new record for key with value firstVal. */
        abstract void insertRecord(Transaction txn,
                                   int key,
                                   Environment repEnv,
                                   Database db,
                                   String firstVal);

        /**
         * @param committed these records were committed, so update the
         * map used for test verification accordingly.
         */
        public void updateCommittedRecords(final Set<TestRecord> committed) {

            for (TestRecord r: committed) {
                TestRecord prev = null;
                /* A null data value is the marker for a committed delete. */
                if (r.getData() == null) {
                    prev = committedRecords.remove(r.getKey());
                    if (prev == null) {
                        /* A delete committed for a key we never tracked --
                         * the verification bookkeeping is corrupt. */
                        throw new IllegalStateException
                            ("Should be able to delete " + r +
                             " from committedRecords map");
                    }
                } else {
                    prev = committedRecords.put(r.getKey(), r);
                }
                // System.err.println("committed: " + r + " prev=" + prev);
            }
        }

        /*
         * Return the set of keys that should be in environment, after all the
         * master transfers, as the end of the test.
         */
        public synchronized Collection<TestRecord> getCommittedRecords() {
            return committedRecords.values();
        }

        /*
         * Each new master will do both updates to the same set of keys and
         * inserts of new records, because updates will exercise the
         * rollbacks better.
         *
         * Synchronized, and blocks on the previous stint's latch, so that
         * one stint's inserters have fully recorded their results before
         * the next stint resets the key counter.
         */
        public synchronized void newMasterReset(String nodeName,
                                                Environment repEnv,
                                                Database db)
            throws InterruptedException {

            if (lastMasterInsertersDone != null) {
                lastMasterInsertersDone.await();
            }
            /* One count per app thread; each thread counts down when done. */
            lastMasterInsertersDone =
                new CountDownLatch(testConfig.numAppThreads);
            reusedKeys = new AtomicInteger(0);
            reconcileProblematicCommits(nodeName, repEnv, db);
        }

        /**
         * Resolves commits whose outcome was unknown by reading the store:
         * if the change is visible, the verification map is corrected to
         * match; either way the pending set is cleared.
         */
        public void reconcileProblematicCommits(String nodeName,
                                                Environment repEnv,
                                                Database db) {

            /*
             * These records got an insufficient ack exception from the last
             * master, and therefore weren't added to the verify set See if the
             * action really happened, and if verify set needs to be updated.
             */
            if (problematicCommits.size() > 0) {
                System.err.println(TEST + " " + nodeName +
                                   " sees " + problematicCommits.size() +
                                   " problematic commits");

                if (Boolean.getBoolean("verbose")) {
                    System.err.println(TEST + " " + nodeName + " " +
                                       problematicCommits.values());
                }

                /* Let the subclass snapshot whatever it needs for lookups. */
                setupVerifyCheck(repEnv);
                for (TestRecord r: problematicCommits.values()) {
                    String val = findRecord(r.getKey(), repEnv, db);
                    if (val == null) {
                        /* The delete really happened, update the verify set */
                        if (r.getData() == null) {
                            committedRecords.remove(r.getKey());
                            if (Boolean.getBoolean("verbose")) {
                                System.err.println
                                    ("correct records, delete got " +
                                     " insufficientAcks but was" +
                                     " propagated:" + r);
                            }
                        }
                    } else if (val.equals(r.getData())) {
                        /* The insert or update really happened*/
                        committedRecords.put(r.getKey(), r);
                        if (Boolean.getBoolean("verbose")) {
                            System.err.println
                                ("correct the records, Insert/update got " +
                                 " insufficient acks, but was propagated:" +
                                 r);
                        }
                    }
                }
                problematicCommits.clear();
            }
        }

        /** Hook for subclasses to prepare state before findRecord calls. */
        abstract void setupVerifyCheck(Environment repEnv);

        /** @return the stored value for key, or null if absent. */
        abstract String findRecord(int key, Environment repEnv, Database db);

        /*
         * Do one transaction's worth of work, and return the test records
         * contained in that transaction.
         *
         * Each key cycles insert (first value) -> update (second value) ->
         * delete across successive visits; a TestRecord with null data in
         * the returned set denotes a delete.
         */
        public Set<TestRecord> doOneTransaction(MasterTransferExercise tester,
                                                ReplicatedEnvironment repEnv,
                                                Logger logger,
                                                Formatter formatter,
                                                Transaction txn,
                                                Database db) {
            Set<TestRecord> actions = new HashSet<TestRecord>();

            for (int i = 0; i < testConfig.recordsInTxn; i++) {
                /* incrementAndGet keeps concurrent app threads on
                 * distinct keys, avoiding lock contention. */
                int key = reusedKeys.incrementAndGet();
                TestRecord existing = committedRecords.get(key);

                if (existing == null) {
                    String firstVal = makeFirstVal(key, txn.getId());
                    LoggerUtils.logMsg(logger, formatter, Level.FINE,
                                       "Txn " + txn.getId() +
                                       " inserting " + firstVal);
                    insertRecord(txn, key, repEnv, db, firstVal);
                    actions.add(new TestRecord(key, firstVal));
                } else {
                    String existingVal = existing.getData();
                    if (isFirstVal(existingVal)) {
                        String secondVal = makeSecondVal(key, txn.getId());
                        LoggerUtils.logMsg(logger, formatter, Level.FINE,
                                           "Txn " + txn.getId() +
                                           " Updating " + existingVal +
                                           " to " + secondVal);
                        updateRecord(txn, key, repEnv, db, existingVal,
                                     secondVal);
                        actions.add(new TestRecord(key, secondVal));
                    } else if (isSecondVal(existingVal)) {
                        LoggerUtils.logMsg(logger, formatter, Level.FINE,
                                           "Txn " + txn.getId() +
                                           " Deleting " + existingVal +
                                           " using key " + key);
                        deleteRecord(tester, txn, key, repEnv, db, existingVal);
                        /* null data marks a delete for the verify set. */
                        actions.add(new TestRecord(key, null));
                    } else {
                        throw new IllegalStateException
                            ("Found unexpected record in committed test set " +
                             "when doing dbop testing: " + existing);
                    }
                }
            }
            return actions;
        }

        /* Value format: testrecord_first_<key>_t<txnId>. */
        private String makeFirstVal(int val, long txnId) {
            return DBPREFIX + "_" + FIRST_NAME + "_" + val + "_t" + txnId;
        }

        /* Value format: testrecord_second_<key>_t<txnId>. */
        private String makeSecondVal(int val, long txnId) {
            return DBPREFIX + "_" + SECOND_NAME + "_" + val + "_t" + txnId;
        }

        /** Extracts the key from a generated value/db name (3rd field). */
        protected int getKeyFromName(String name) {
            String[] split = name.split("_");
            if (split.length == 4) {
                return Integer.parseInt(split[2]);
            }

            throw new IllegalStateException
                ("Unexpected database name in test: " + name);
        }

        /** @return true if name encodes a first-stage (inserted) value. */
        private boolean isFirstVal(String name) {
            String[] split = name.split("_");
            if (split.length == 4) {
                return split[1].equals(FIRST_NAME);
            }
            return false;
        }

        /** @return true if name encodes a second-stage (updated) value. */
        private boolean isSecondVal(String name) {
            String[] split = name.split("_");
            if (split.length == 4) {
                return split[1].equals(SECOND_NAME);
            }
            return false;
        }

        /**
         * @return the set of test records that are currently in the
         * environment.
         */
        abstract public Set<TestRecord>
            readTestRecords(String nodeName,
                            ReplicatedEnvironment repEnv,
                            Database db);

    }
|
|||
|
|
|||
|
/**
|
|||
|
* The workload consists of creating, renaming and deleting databases.
|
|||
|
*/
|
|||
|
private static class DbOpsGenerator extends DataGenerator {
|
|||
|
DatabaseConfig dbConfig;
|
|||
|
private List<String> existingDbs;
|
|||
|
|
|||
|
DbOpsGenerator() {
|
|||
|
dbConfig = new DatabaseConfig();
|
|||
|
dbConfig.setTransactional(true);
|
|||
|
dbConfig.setReadOnly(false);
|
|||
|
dbConfig.setAllowCreate(true);
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
void insertRecord(Transaction txn,
|
|||
|
int key,
|
|||
|
Environment repEnv,
|
|||
|
Database db,
|
|||
|
String firstVal) {
|
|||
|
Database newDb = repEnv.openDatabase(txn, firstVal, dbConfig);
|
|||
|
newDb.close();
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
void updateRecord(Transaction txn,
|
|||
|
int key,
|
|||
|
Environment repEnv,
|
|||
|
Database db,
|
|||
|
String firstVal,
|
|||
|
String secondVal) {
|
|||
|
repEnv.renameDatabase(txn, firstVal, secondVal);
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
void deleteRecord(MasterTransferExercise tester,
|
|||
|
Transaction txn,
|
|||
|
int key,
|
|||
|
Environment repEnv,
|
|||
|
Database db,
|
|||
|
String secondVal) {
|
|||
|
repEnv.removeDatabase(txn, secondVal);
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
public Set<TestRecord> readTestRecords(String nodeName,
|
|||
|
ReplicatedEnvironment repEnv,
|
|||
|
Database db) {
|
|||
|
Set<TestRecord> foundDBs = new HashSet<TestRecord>();
|
|||
|
List<String> dbNames = repEnv.getDatabaseNames();
|
|||
|
for (String name : dbNames) {
|
|||
|
if (isTestDb(name)) {
|
|||
|
foundDBs.add(new TestRecord(getKeyFromName(name),
|
|||
|
name));
|
|||
|
}
|
|||
|
}
|
|||
|
return foundDBs;
|
|||
|
}
|
|||
|
|
|||
|
private boolean isTestDb(String name) {
|
|||
|
return name.startsWith(DBPREFIX);
|
|||
|
}
|
|||
|
|
|||
|
/* Return the database name blah blah */
|
|||
|
@Override
|
|||
|
String findRecord(int key, Environment repEnv, Database db) {
|
|||
|
for (String name : existingDbs) {
|
|||
|
if (!isTestDb(name)) {
|
|||
|
continue;
|
|||
|
}
|
|||
|
|
|||
|
if (getKeyFromName(name) == key) {
|
|||
|
return name;
|
|||
|
}
|
|||
|
}
|
|||
|
return null;
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
void setupVerifyCheck(Environment repEnv) {
|
|||
|
existingDbs = repEnv.getDatabaseNames();
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* The workload consists of inserting and updating records in a single db.
|
|||
|
* On each round of inserts, the node should both:
|
|||
|
* -insert into the same set of keys, so as to cause a lot of updates
|
|||
|
* -insert into a random set of new keys
|
|||
|
* -coordinate multiple threads do a lot of inserts concurrently without
|
|||
|
* blocking on the same key values, so there's more application activity
|
|||
|
* while the master transfer is going on
|
|||
|
|
|||
|
*/
|
|||
|
private static class DataAndKeyGenerator extends DataGenerator {
|
|||
|
|
|||
|
@Override
|
|||
|
void insertRecord(Transaction txn,
|
|||
|
int key,
|
|||
|
Environment repEnv,
|
|||
|
Database db,
|
|||
|
String firstVal) {
|
|||
|
DatabaseEntry keyEntry = new DatabaseEntry();
|
|||
|
IntegerBinding.intToEntry(key, keyEntry);
|
|||
|
DatabaseEntry dataEntry =
|
|||
|
new DatabaseEntry(firstVal.getBytes());
|
|||
|
db.put(txn, keyEntry, dataEntry);
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
void updateRecord(Transaction txn,
|
|||
|
int key,
|
|||
|
Environment repEnv,
|
|||
|
Database db,
|
|||
|
String firstVal,
|
|||
|
String secondVal) {
|
|||
|
insertRecord(txn, key, repEnv, db, secondVal);
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
void deleteRecord(MasterTransferExercise tester,
|
|||
|
Transaction txn,
|
|||
|
int key,
|
|||
|
Environment repEnv,
|
|||
|
Database db,
|
|||
|
String secondVal) {
|
|||
|
DatabaseEntry keyEntry = new DatabaseEntry();
|
|||
|
IntegerBinding.intToEntry(key, keyEntry);
|
|||
|
OperationStatus status = db.delete(txn, keyEntry);
|
|||
|
if (!(status.equals(OperationStatus.SUCCESS))) {
|
|||
|
tester.markTestAsFailed("Delete of " + key + " got status " +
|
|||
|
status, null);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* Assumes environment is open, and turns the data in the
|
|||
|
* environment into a set of test records for verification.
|
|||
|
*/
|
|||
|
@Override
|
|||
|
public Set<TestRecord> readTestRecords(String nodeName,
|
|||
|
ReplicatedEnvironment repEnv,
|
|||
|
Database db) {
|
|||
|
Set<TestRecord> existing = new HashSet<TestRecord>();
|
|||
|
Transaction txn = repEnv.beginTransaction(null, null);
|
|||
|
Cursor cursor = db.openCursor(txn, CursorConfig.READ_COMMITTED);
|
|||
|
int i = 0;
|
|||
|
try {
|
|||
|
DatabaseEntry key = new DatabaseEntry();
|
|||
|
DatabaseEntry value = new DatabaseEntry();
|
|||
|
while (cursor.getNext(key, value, null) ==
|
|||
|
OperationStatus.SUCCESS) {
|
|||
|
int keyVal = IntegerBinding.entryToInt(key);
|
|||
|
existing.add(new TestRecord(keyVal,
|
|||
|
new String(value.getData())));
|
|||
|
if ((i++ % 1000) == 0) {
|
|||
|
System.out.println(TEST + nodeName +
|
|||
|
" Scan at record " + i);
|
|||
|
}
|
|||
|
}
|
|||
|
} finally {
|
|||
|
cursor.close();
|
|||
|
txn.commit();
|
|||
|
}
|
|||
|
return existing;
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
String findRecord(int key, Environment repEnv, Database db) {
|
|||
|
DatabaseEntry keyEntry = new DatabaseEntry();
|
|||
|
IntegerBinding.intToEntry(key, keyEntry);
|
|||
|
DatabaseEntry dataEntry = new DatabaseEntry();
|
|||
|
OperationStatus status = db.get(null, keyEntry, dataEntry, null);
|
|||
|
if (status.equals(OperationStatus.SUCCESS)) {
|
|||
|
return new String(dataEntry.getData());
|
|||
|
}
|
|||
|
return null;
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
void setupVerifyCheck(Environment repEnv) {
|
|||
|
/* nothing to do */
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/* Struct to hold the key and data */
|
|||
|
private static class TestRecord {
|
|||
|
private final Integer key;
|
|||
|
private final String data;
|
|||
|
|
|||
|
TestRecord(int keyVal, String dataVal) {
|
|||
|
this.key = keyVal;
|
|||
|
this.data = dataVal;
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
public String toString() {
|
|||
|
return key + " / " + data;
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
public int hashCode() {
|
|||
|
final int prime = 31;
|
|||
|
int result = 1;
|
|||
|
result = prime * result + ((data == null) ? 0 :
|
|||
|
data.hashCode());
|
|||
|
result = prime * result + ((key == null) ? 0 :
|
|||
|
key.hashCode());
|
|||
|
return result;
|
|||
|
}
|
|||
|
|
|||
|
@Override
|
|||
|
public boolean equals(Object obj) {
|
|||
|
if (this == obj)
|
|||
|
return true;
|
|||
|
if (obj == null)
|
|||
|
return false;
|
|||
|
if (getClass() != obj.getClass())
|
|||
|
return false;
|
|||
|
TestRecord other = (TestRecord) obj;
|
|||
|
if (data == null) {
|
|||
|
if (other.getData() != null)
|
|||
|
return false;
|
|||
|
} else if (!data.equals(other.getData()))
|
|||
|
return false;
|
|||
|
if (key == null) {
|
|||
|
if (other.getKey() != null)
|
|||
|
return false;
|
|||
|
} else if (!key.equals(other.getKey()))
|
|||
|
return false;
|
|||
|
return true;
|
|||
|
}
|
|||
|
|
|||
|
public Integer getKey() {
|
|||
|
return key;
|
|||
|
}
|
|||
|
|
|||
|
public String getData() {
|
|||
|
return data;
|
|||
|
}
|
|||
|
}
|
|||
|
}
|