/*-
 * Copyright (C) 2002, 2017, Oracle and/or its affiliates. All rights reserved.
 *
 * This file was distributed by Oracle as part of a version of Oracle Berkeley
 * DB Java Edition made available at:
 *
 * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
 *
 * Please see the LICENSE file included in the top-level directory of the
 * appropriate version of Oracle Berkeley DB Java Edition for a copy of the
 * license and additional information.
 */

import java.io.File;
import java.util.ArrayList;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.utilint.RepTestUtils;
import com.sleepycat.je.rep.utilint.RepTestUtils.RepEnvInfo;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityIndex;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;

/**
 * Applications that perform read operations on a replica may encounter reader
 * transaction deadlocks, since a JE ReplayTxn will steal locks to make sure
 * it can finish its own work. Simulate such an application, measure how many
 * retries the reader transactions perform, and check whether log cleaning
 * works as expected in HA.
 *
 * This test uses the DPL and is divided into two phases: a ramp-up stage and
 * a steady stage. It is not a fail-over test; all replicas stay alive during
 * the test.
 *
 * Configurations
 * ==========================================================================
 * envRoot:      environment home root for the whole replication group; the
 *               same as used in the HA unit tests.
 * repNodeNum:   size of the replication group, default is 2.
 * dbSize:       number of records in the database, default is 300.
* roundPerSync: master would traverse the database for this number before it * does a sync for the whole group. * totalRounds: total number of traversing the database in the whole test, * default is 20. * txnOps: number of operations the test wants to do protected by a * transaction, default is 10. * nPriThreads: number of threads reading primary index, default is 2. * nSecThreads: number of threads reading secondary index, default is 2. * * Work Flow * ========================================================================== * During the ramp up stage, master will do "dbSize" insertions and sync whole * replication group. * * During the steady stage, the test would start "nPriThreads + nSecThreads" * reading on replica, all of them read backwards. The reading threads can get * records from primary index and secondary index, and check the data * correctness. * * At the same time, the master would do "totalRounds * dbSize" updating * operations. It would first delete the smallest "txnOps" records in the * database. Next, it will do updates, and the transaction would abort * randomly. At last, insert "txnOps" new records at the end of the database. * After it traverses "roundPerSync" time on the database, the test would sync * the whole group and check node equality. * * After finishing the "totalRounds" database traverse, both the reading * operations on the replica and the update operations on master would stop. * Then the test would close all the replicas and check whether log cleaning * does work in this test. * * How To Run This Test * ========================================================================== * All the test configurations have a default value, except the envRoot, so * you need to assign a directory to "envRoot" to start the test, like: * java ReplicaReading -envRoot data * * If you want to specify some configurations, please see the usage. */ public class ReplicaReading { /* Master of the replication group. 
*/ private ReplicatedEnvironment master; private RepEnvInfo[] repEnvInfo; private boolean runnable = true; /* The two variables saves the maximum and minimum reading retry number. */ private int minNum = 100; private int maxNum = 0; /* The smallest and largest key in the database. */ private volatile int beginKey; private volatile int endKey; /* ----------------Configurable params-----------------*/ /* Environment home root for whole replication group. */ private File envRoot; /* Replication group size. */ private int nNodes = 2; /* Database size. */ private int dbSize = 300; /* Steady state would finish after traversing these rounds of database. */ private int totalRounds = 4000; /* Do a sync after traversing the database for these rounds. */ private int roundPerSync= 20; /* Transaction commits after doing this number of operations. */ private int txnOps = 10; /* Thread number of reading PrimaryIndex on replica. */ private int nPriThreads = 2; /* Thread number of reading SecondaryIndex on replica. */ private int nSecThreads = 2; private int subDir = 3; /* True if replica reading thread doing reverse reads. */ private boolean isReverseRead = true; /* Size of each JE log file. */ private long logFileSize = 5000000; /* More than enough disk space to hold the data set and allow cleaning. */ private long maxDisk = 500 * 1000000; /* Checkpointer wakes up when JE writes checkpointBytes bytes. */ private long checkpointBytes = 10000000; /* The latch used to start all the threads at the same time. */ private CountDownLatch startSignal; /* The latch used to stop the threads. */ private CountDownLatch endSignal; private final Random random = new Random(); /* Database and PrimaryIndex used in this test.*/ private EntityStore dbStore; private PrimaryIndex primaryIndex; /* * A map saves abort transactions, represented by txnRounds, maps from * txnRounds to Key, so that we can remove those deleted transactions. 
*/ private final ConcurrentHashMap abortMap = new ConcurrentHashMap<>(); private void doRampup() throws Exception { repEnvInfo = Utils.setupGroup( envRoot, nNodes, logFileSize, maxDisk, checkpointBytes, subDir, 0, 0); master = Utils.getMaster(repEnvInfo); RepTestData.insertData (Utils.openStore(master, Utils.DB_NAME), dbSize, true); beginKey = 1; endKey = dbSize; Utils.doSyncAndCheck(repEnvInfo); } private void doSteadyState() throws Exception { startSignal = new CountDownLatch(1); endSignal = new CountDownLatch(nPriThreads + nSecThreads); /* Start the threads. */ startThreads(nPriThreads, false); startThreads(nSecThreads, true); /* Count down the latch, so that all threads start to work. */ startSignal.countDown(); /* Doing the updates. */ doMasterUpdates(); /* Print out the minimum and maximum retry number of this test. */ if (Utils.VERBOSE) { System.out.println("The minimum retry number is: " + minNum); System.out.println("The maximum retry number is: " + maxNum); } /* Do the sync until the reading threads finish their work. */ endSignal.await(); RepTestUtils.shutdownRepEnvs(repEnvInfo); } /* Start the reading threads. */ private void startThreads(int threadNum, boolean secondary) { for (int i = 0; i < threadNum; i++) { Thread thread = new ReplicaReadingThread(repEnvInfo[1].getEnv(), secondary); thread.start(); } } /* Do the updates on master. */ private void doMasterUpdates() throws Exception { int txnRounds = 0; int tempRound = roundPerSync; UpdateRange range = new UpdateRange(); openStore(); Transaction txn = null; while (runnable) { if (Utils.VERBOSE) { System.out.println ("Updating rounds left on Master: " + totalRounds); } /* Traverse the database and do updates. */ for (int i = range.getStart(); i <= range.getEnd(); i++) { /* Create a new transaction for every txnOps operations. */ if ((i - 1) % txnOps == 0) { txn = master.beginTransaction(null, null); txnRounds++; } /* Do updates. 
*/ if (range.doDelete(i)) { doDeleteWork(primaryIndex, txn, i); } else { /* Updates and inserts are actually putting a record. */ doPutWork(primaryIndex, txn, range, i, txnRounds); } } /* Shift the range so that it can traverse the database again. */ range.shift(); /* Exit the loop if the updates have been finished. */ if (--totalRounds == 0) { runnable = false; } /* If a round of traverses finishes, synch the group. */ if (--tempRound == 0 || !runnable) { dbStore.close(); Utils.doSyncAndCheck(repEnvInfo); if (runnable) { openStore(); } tempRound = roundPerSync; } } } /* Open the EntityStore. */ private void openStore() throws Exception { dbStore = Utils.openStore(master, Utils.DB_NAME); primaryIndex = dbStore.getPrimaryIndex(Integer.class, RepTestData.class); } /* Delete records on the database. */ private void doDeleteWork(PrimaryIndex pIndex, Transaction txn, int key) throws Exception { pIndex.delete(txn, key); if (key % txnOps == 0) { txn.commit(); /* Increase the beginKey since the smallest key has changed. */ beginKey += txnOps; /* Delete those entries whose values are smaller than beginKey. */ if (abortMap.size() != 0) { ArrayList keys = new ArrayList<>(); for (Map.Entry entry : abortMap.entrySet()) { if (entry.getValue() <= beginKey) { keys.add(entry.getKey()); } } for (Integer abortKey : keys) { abortMap.remove(abortKey); } } } } /* Put records into database if the operations are updates or inserts. */ private void doPutWork(PrimaryIndex pIndex, Transaction txn, UpdateRange range, int key, int txnRounds) throws Exception { /* * Put a record into the database. If the key exists, doing updates. * If the key doesn't exist, doing inserts. */ RepTestData data = new RepTestData(); data.setKey(key); data.setData(key); data.setName("test" + txnRounds); pIndex.put(txn, data); if (key % txnOps == 0) { if (range.doUpdate(key)) { /* Random abort if it's an update operation. */ if (random.nextBoolean()) { txn.abort(); /* Put this abort data to the abort map. 
*/ abortMap.put(txnRounds, key); } else { txn.commit(); } } else { /* Increase the endKey if the insertion finishes. */ txn.commit(); endKey += txnOps; } } } protected void parseArgs(String args[]) throws Exception { for (int i = 0; i < args.length; i++) { boolean moreArgs = i < args.length - 1; if (args[i].equals("-h") && moreArgs) { envRoot = new File(args[++i]); } else if (args[i].equals("-repNodeNum") && moreArgs) { nNodes = Integer.parseInt(args[++i]); } else if (args[i].equals("-dbSize") && moreArgs) { dbSize = Integer.parseInt(args[++i]); } else if (args[i].equals("-totalRounds") && moreArgs) { totalRounds = Integer.parseInt(args[++i]); } else if (args[i].equals("-roundPerSync") && moreArgs) { roundPerSync = Integer.parseInt(args[++i]); } else if (args[i].equals("-txnOps") && moreArgs) { txnOps = Integer.parseInt(args[++i]); } else if (args[i].equals("-logFileSize") && moreArgs) { logFileSize = Long.parseLong(args[++i]); } else if (args[i].equals("-maxDisk") && moreArgs) { maxDisk = Long.parseLong(args[++i]); } else if (args[i].equals("-checkpointBytes") && moreArgs) { checkpointBytes = Long.parseLong(args[++i]); } else if (args[i].equals("-nPriThreads") && moreArgs) { nPriThreads = Integer.parseInt(args[++i]); } else if (args[i].equals("-nSecThreads") && moreArgs) { nSecThreads = Integer.parseInt(args[++i]); } else if (args[i].equals("-isReverseRead") && moreArgs) { isReverseRead = Boolean.parseBoolean(args[++i]); } else if (args[i].equals("-subDir") && moreArgs) { subDir = Integer.parseInt(args[++i]); } else { usage("Error: Unknown arg: " + args[i]); } } if (nNodes < 2) { throw new IllegalArgumentException ("Replication group size should > 2!"); } if (txnOps >= dbSize || dbSize % txnOps != 0) { throw new IllegalArgumentException ("dbSize should be larger and integral multiple of txnOps!"); } } private void usage(String error) { System.err.println(error); System.err.println ("java " + getClass().getName() + "\n" + " [-h ]\n" + " [-repNodeNum ]\n" + " 
[-dbSize ]\n" + " [-totalRounds ]\n" + " [-roundPerSync ]\n" + " [-logFileSize ]\n" + " [-checkpointBytes ]\n" + " [-nPriThreads ]\n" + " [-nSecThreads ]\n" + " [-isReverseRead ]"); System.exit(2); } public static void main(String args[]) { try { ReplicaReading test = new ReplicaReading(); test.parseArgs(args); test.doRampup(); test.doSteadyState(); } catch (Throwable t) { t.printStackTrace(System.err); System.exit(1); } } /* * This test traverses the database multiple times. In each traversal, * - the first records are deleted * - the rest of the records are updated, * - an additional worth of records are inserted, in order to * keep the database the same size. The class saves the range for delete, * update and insert. */ class UpdateRange { private int deleteStart; private int deleteEnd; private int updateStart; private int updateEnd; private int insertStart; private int insertEnd; UpdateRange() { deleteStart = 1; deleteEnd = deleteStart + txnOps - 1; updateStart = deleteEnd + 1; updateEnd = dbSize; insertStart = updateEnd + 1; insertEnd = insertStart + txnOps - 1; } public int getStart() { return deleteStart; } public int getEnd() { return insertEnd; } /* Returns true if the key is in the scope of deletion. */ boolean doDelete(int index) { return (index >= deleteStart) && (index <= deleteEnd); } /* Returns true if the key is in the scope of updates. */ boolean doUpdate(int index) { return (index >= updateStart) && (index <= updateEnd); } /* * Adjust the traversal parameters for the next traverse of the * database, since some records have been deleted and some added. */ void shift() { deleteStart += txnOps; deleteEnd += txnOps; updateStart += txnOps; updateEnd += txnOps; insertStart += txnOps; insertEnd += txnOps; } } /* The reading thread on replica. 
*/ class ReplicaReadingThread extends Thread { private final ReplicatedEnvironment repEnv; private boolean secondary; private final ArrayList list; public ReplicaReadingThread(ReplicatedEnvironment repEnv, boolean secondary) { this.repEnv = repEnv; this.secondary = secondary; list = new ArrayList<>(); } @Override public void run() { try { startSignal.await(); EntityStore repStore = Utils.openStore(repEnv, Utils.DB_NAME); EntityIndex index = repStore.getPrimaryIndex(Integer.class, RepTestData.class); if (secondary) { index = repStore.getSecondaryIndex ((PrimaryIndex) index, Integer.class, "data"); } while (runnable) { int numIters = (endKey - beginKey) / txnOps; int startKey = beginKey; /* Do txnOps read operations during each transaction. */ for (int i = 0; runnable && i < numIters; i++) { int start = i * txnOps + startKey; int end = start + txnOps - 1; /* * If there is no data between start and end, then * break and get a new start and end. */ if (doRetries(start, end, repEnv, index)) { break; } } } repStore.close(); endSignal.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } /* Retry if there exists deadlock. 
*/ private boolean doRetries(int start, int end, ReplicatedEnvironment env, EntityIndex index) { boolean success = false; boolean noData = false; int maxTries = 100; for (int tries = 0; !success && tries < maxTries; tries++) { try { Transaction txn = env.beginTransaction(null, null); int realStart = 0; EntityCursor cursor = null; try { cursor = index.entities(txn, null); realStart = cursor.first(null).getKey(); cursor.close(); cursor = index.entities(txn, start, true, end, true, null); noData = addRecordsToList(cursor); success = true; } finally { if (cursor != null) { cursor.close(); } if (success) { if (noData) { checkNoDataCorrectness(start, realStart, tries); } else { checkCorrectness(tries); } txn.commit(); } else { txn.abort(); } list.clear(); } } catch (LockConflictException e) { success = false; } } return noData; } /* * If there is no data in this cursor, return true. If there exists * data in the cursor, add the datas into the list and return false. */ private boolean addRecordsToList(EntityCursor cursor) throws DatabaseException { if (isReverseRead) { RepTestData data = cursor.last(null); if (data == null) { return true; } else { list.add(data); while ((data = cursor.prev(null)) != null) { list.add(data); } } } else { RepTestData data = cursor.first(null); if (data == null) { return true; } else { list.add(data); while ((data = cursor.next(null)) != null) { list.add(data); } } } return false; } /* Check the correctness if there is no data in the cursor. */ private void checkNoDataCorrectness(int start, int realStart, int tries) { /* Expect the list size to 0. */ if (list.size() != 0) { System.err.println( "Error: The expected number of records should be 0, " + "but it is " + list.size() + "!"); System.exit(-1); } /* * The actual beginKey should be larger than the specified * beginKey, and the distance between them should be integral * multiple of txnOps. 
*/ if (realStart < start || ((realStart - start) % txnOps != 0)) { System.err.println( "Error: There are some deleted key exists in database!"); System.err.println("Expected start key is: " + start + ", real start key is: " + realStart); System.exit(-1); } updateRetries(tries); } private void checkCorrectness(int tries) { if (list.size() == txnOps) { int minus = isReverseRead ? 1 : -1; RepTestData firstData = list.get(0); /* Check if the firstData is an abort data. */ if (!("").equals(firstData.getName().substring(4))) { Integer txnRounds = new Integer(firstData.getName().substring(4)); /* If this data is in the abort map, fail the test. */ if (abortMap.get(txnRounds) != null) { System.err.println("Error: Read an aborted record."); System.exit(-1); } } /* Check that records in this list are valid. */ for (int i = 0; i < list.size(); i++) { if (!firstData.logicEquals(list.get(i), i * minus)) { System.err.println("Error: Reading data is wrong!" + "FirstData: " + firstData + "WrontData: " + list.get(i)); for (RepTestData each : list) { System.err.println(each); } System.exit(-1); } } updateRetries(tries); } else { System.err.println( "Error: The expected number of records should be: " + txnOps + ", but it is " + list.size() + "!"); System.exit(-1); } } private void updateRetries(int tries) { /* Assign the value to maxNum and minNum. */ synchronized(this) { maxNum = maxNum < tries ? tries : maxNum; minNum = minNum < tries ? minNum : tries; } if (tries > 0 && Utils.VERBOSE) { System.err.println("Retries this round: " + tries); } } } }