/*- * Copyright (C) 2002, 2017, Oracle and/or its affiliates. All rights reserved. * * This file was distributed by Oracle as part of a version of Oracle Berkeley * DB Java Edition made available at: * * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html * * Please see the LICENSE file included in the top-level directory of the * appropriate version of Oracle Berkeley DB Java Edition for a copy of the * license and additional information. */ import static junit.framework.Assert.assertTrue; import static org.junit.Assert.assertEquals; import java.io.File; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.je.CacheMode; import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DiskLimitException; import com.sleepycat.je.Durability; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.OperationFailureException; import com.sleepycat.je.OperationResult; import com.sleepycat.je.Put; import com.sleepycat.je.dbi.EnvironmentImpl; import com.sleepycat.je.rep.InsufficientAcksException; import com.sleepycat.je.rep.InsufficientReplicasException; import com.sleepycat.je.rep.ReplicationConfig; import com.sleepycat.je.rep.utilint.RepTestUtils; import com.sleepycat.je.rep.utilint.RepTestUtils.RepEnvInfo; import com.sleepycat.je.utilint.LoggerUtils; import com.sleepycat.util.test.SharedTestUtils; /** * Tests a full write load with a disk limit to ensure that files are deleted * quickly enough to avoid hitting the disk limit. With HA, reserved files * (files cleaned and ready-to-delete) are retained until the disk limit is * approached. A steady-state update workload is used to generate waste as * quickly as possible. *

* Suggested steady-state runs: *

 *   # Test steady-state max throughput with one node.
 *   DiskLimitStress -nodes 1 -minutes 15
 *   # Test steady-state HA throughput.
 *   DiskLimitStress -nodes 3 -minutes 15
 * 
*

* In addition this test periodically lowers the disk limit on one or more * nodes to cause it to be violated, then restores the original limit and * expects the write load to continue as before. There are three cases that * are tested: *

 * 1. Master node violates disk limit, both replicas do not. No writes can
 *    occur.
 *
 * 2. One replica node violates disk limit, but not the master and the other
 *    replica. Writes can continue. The replica which violates the limit
 *    will lag, but the test doesn't stay in this mode long enough that a
 *    network restore is needed.
 *
 * 3. Both replicas violate disk limit, but not the master.
 *    InsufficientAcksException and/or InsufficientReplicasException will be
 *    thrown.
 * 
*

* Suggested run to test the three disk limit violation scenarios: *

 *   # Test violations of disk limit
 *   DiskLimitStress -nodes 3 -violations true -minutes 25
 * 
*/ public class DiskLimitStress { /* * Sizes are designed so the data set fits within MAX_DISK, but also so * reserved files will be deleted. These sizes are used for the * steady-state test mode (-violations false). */ private static final long ONE_MB = 1L << 20; private static final long FILE_SIZE = 100 * ONE_MB; private static final long CLEANER_BYTES_INTERVAL = 100 * ONE_MB; private static final int DATA_SIZE = 1024; private static final int ACTIVE_FILES = 20; private static final int TOTAL_FILES = 30; private static final int RECORDS = (int) ((ACTIVE_FILES * FILE_SIZE) / DATA_SIZE); private static final long MAX_DISK = TOTAL_FILES * FILE_SIZE; private static final long FREE_DISK = 0; private static final long HA_TIMEOUT_MS = 2000; /* * With disk violations, use a very small data set. With a larger data * set, there is no guarantee that all records will be updated and * cleaned, making the use of disk space difficult to predict. */ private static final int VIOLATIONS_MODE_RECORDS = 1000; private boolean withViolations; private long runStopTime; private String homeDir; private int nodes; private int updateThreads; private int cleanerThreads; private int minutes; private int records; private volatile boolean stopFlag = false; private final AtomicReference unexpectedEx = new AtomicReference<>(null); private final UpdateThread[] threads; private RepEnvInfo[] repEnvInfo; private Database masterDb; private long lastStatTime; private long lastOperations; private long lastExceptions; private boolean allowDiskLimitEx; private boolean allowDurabilityEx; private boolean expectDiskLimitEx; private boolean expectDurabilityEx; public static void main(final String[] args) { try { printArgs(args); final DiskLimitStress test = new DiskLimitStress(args); test.runTest(); System.out.println("SUCCESS"); System.exit(0); } catch (Throwable e) { e.printStackTrace(System.out); System.exit(-1); } } private static void printArgs(String[] args) { System.out.print("Command line args:"); for (String arg : args) { System.out.print(' '); System.out.print(arg); } System.out.println(); } private DiskLimitStress(String[] args) throws IOException { homeDir = "tmp"; nodes = 1; updateThreads = 4; cleanerThreads = 2; minutes = 10; withViolations = false; for (int i = 0; i < args.length; i += 1) { final String arg = args[i]; final boolean moreArgs = i < args.length - 1; if (arg.equals("-h") && moreArgs) { homeDir = args[++i]; } else if (arg.equals("-violations") && moreArgs) { withViolations = Boolean.parseBoolean(args[++i]); } else if (arg.equals("-nodes") && moreArgs) { nodes = Integer.parseInt(args[++i]); } else if (arg.equals("-threads") && moreArgs) { updateThreads = Integer.parseInt(args[++i]); } else if (arg.equals("-cleaners") && moreArgs) { cleanerThreads = Integer.parseInt(args[++i]); } else if (arg.equals("-minutes") && moreArgs) { minutes= Integer.parseInt(args[++i]); } else { throw new IllegalArgumentException("Unknown arg: " + arg); } } threads = new UpdateThread[updateThreads]; records = withViolations ? VIOLATIONS_MODE_RECORDS : RECORDS; System.out.println( "Resolved args:" + " homeDir=" + homeDir + " nodes=" + nodes + " updateThreads=" + updateThreads + " cleanerThreads=" + cleanerThreads + " totalRecords=" + records + " minutes=" + minutes); } private void start(final boolean smallRepTimeouts) throws IOException { final EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setTransactional(true); envConfig.setDurability(Durability.COMMIT_NO_SYNC); envConfig.setMaxDisk(MAX_DISK); envConfig.setSharedCache(true); envConfig.setCacheMode(CacheMode.EVICT_LN); envConfig.setCachePercent(70); envConfig.setConfigParam( EnvironmentConfig.FREE_DISK, String.valueOf(FREE_DISK)); envConfig.setConfigParam( EnvironmentConfig.CLEANER_THREADS, String.valueOf(cleanerThreads)); envConfig.setConfigParam( EnvironmentConfig.CLEANER_BYTES_INTERVAL, String.valueOf(CLEANER_BYTES_INTERVAL)); envConfig.setConfigParam( EnvironmentConfig.LOG_FILE_MAX, String.valueOf(FILE_SIZE)); /* The verifier slows down the test and causes timeouts. */ envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_VERIFIER, "false"); final ReplicationConfig repConfig = new ReplicationConfig(); /* * Use small timeouts to promptly cause InsufficientAcksException * and InsufficientReplicasException when replicas have a disk limit * violation, and to promptly restore normal write operations when * the violation is cleared. */ if (smallRepTimeouts) { final String timeout = String.valueOf(HA_TIMEOUT_MS) + " ms"; repConfig.setConfigParam( ReplicationConfig.FEEDER_TIMEOUT, timeout); repConfig.setConfigParam( ReplicationConfig.REPLICA_TIMEOUT, timeout); repConfig.setConfigParam( ReplicationConfig.REPLICA_ACK_TIMEOUT, timeout); repConfig.setConfigParam( ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, timeout); } SharedTestUtils.cleanUpTestDir(new File(homeDir)); repEnvInfo = RepTestUtils.setupEnvInfos( new File(homeDir), nodes, envConfig, repConfig); final Environment masterEnv = RepTestUtils.joinGroup(repEnvInfo); final DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(true); masterDb = masterEnv.openDatabase(null, "foo", dbConfig); final int keysPerThread = (records / updateThreads) + 1; int startKey = 0; for (int i = 0; i < updateThreads; i += 1) { threads[i] = new UpdateThread(keysPerThread, startKey); startKey += keysPerThread; } stopFlag = false; for (final Thread t : threads) { t.start(); } } private void stop() throws Throwable { if (unexpectedEx.get() != null) { throw unexpectedEx.get(); } stopFlag = true; for (final Thread t : threads) { t.join(10 * 1000); if (t.isAlive()) { t.interrupt(); t.join(10 * 1000); if (t.isAlive()) { throw new RuntimeException( "Thread " + t.getName() + " still running"); } } } masterDb.close(); /* Tolerate DiskLimitException to simplify test. Close master last. */ for (int i = nodes - 1; i >= 0; i -= 1) { try { repEnvInfo[i].closeEnv(); } catch (DiskLimitException expected) { /* Do nothing. */ } } } private void runTest() throws Throwable { lastStatTime = System.currentTimeMillis(); long durationMs = minutes * 60 * 1000; if (withViolations) { if (nodes > 1) { durationMs /= 3; } runStopTime = System.currentTimeMillis() + durationMs; start(false /*smallRepTimeouts*/); runWithViolationsOnMaster(); stop(); if (nodes > 1) { runStopTime += durationMs; start(false /*smallRepTimeouts*/); runWithViolationsOnOneReplica(); stop(); runStopTime += durationMs; start(true /*smallRepTimeouts*/); runWithViolationsOnAllReplicas(); stop(); } } else { runStopTime = System.currentTimeMillis() + durationMs; start(false /*smallRepTimeouts*/); runWithNoViolations(); stop(); } } private void setDiskLimitViolation(final RepEnvInfo info) { setMaxDisk(info, 1); } private void clearDiskLimitViolation(final RepEnvInfo info) { setMaxDisk(info, MAX_DISK); } private void setMaxDisk(final RepEnvInfo info, final long size) { final Environment env = info.getEnv(); env.setMutableConfig(env.getMutableConfig().setMaxDisk(size)); info.getRepImpl().getCleaner().manageDiskUsage(); } private void runWithNoViolations() throws Throwable { while (!stopFlag) { runAndCheckExceptions( System.currentTimeMillis() + 5000); refreshStats(); } } private void runWithViolationsOnMaster() throws Throwable { /* Time to run before lowering MAX_DISK to cause a violation. */ final long steadyStateMs = 5 * 1000; /* Time to wait for lowering MAX_DISK to take effect. */ final long setViolationMs = 3 * 1000; /* Time to wait for restoring MAX_DISK to take effect. */ final long clearViolationMs = 3 * 1000; allowDurabilityEx = false; expectDurabilityEx = false; while (!stopFlag) { System.out.println("Steady state with no violations"); allowDiskLimitEx = false; expectDiskLimitEx = false; runAndCheckExceptions( System.currentTimeMillis() + steadyStateMs); cleanLog(); System.out.println("Start violations on MASTER"); setDiskLimitViolation(repEnvInfo[0]); allowDiskLimitEx = true; expectDiskLimitEx = true; runAndCheckExceptions( System.currentTimeMillis() + setViolationMs); System.out.println("Clear violations on MASTER"); clearDiskLimitViolation(repEnvInfo[0]); allowDiskLimitEx = true; expectDiskLimitEx = false; runAndCheckExceptions( System.currentTimeMillis() + clearViolationMs); for (final UpdateThread t : threads) { t.clearExceptions(); } } } private void runWithViolationsOnOneReplica() throws Throwable { /* Time to run before lowering MAX_DISK to cause a violation. */ final long steadyStateMs = 3 * HA_TIMEOUT_MS; /* * Time to wait for lowering MAX_DISK to take effect. * For replicas we should to wait long enough to cause * InsufficientReplicasException as well as InsufficientAcksException. */ final long setViolationMs = 3 * HA_TIMEOUT_MS; /* Time to wait for restoring MAX_DISK to take effect. */ final long clearViolationMs = 5 * HA_TIMEOUT_MS; allowDiskLimitEx = false; expectDiskLimitEx = false; allowDurabilityEx = false; expectDurabilityEx = false; while (!stopFlag) { System.out.println("Steady state with no violations"); runAndCheckExceptions( System.currentTimeMillis() + steadyStateMs); cleanLog(); System.out.println("Start violations on ONE_REPLICA"); setDiskLimitViolation(repEnvInfo[1]); runAndCheckExceptions( System.currentTimeMillis() + setViolationMs); System.out.println("Clear violations on ONE_REPLICA"); clearDiskLimitViolation(repEnvInfo[1]); runAndCheckExceptions( System.currentTimeMillis() + clearViolationMs); for (final UpdateThread t : threads) { t.clearExceptions(); } } } private void runWithViolationsOnAllReplicas() throws Throwable { /* Time to run before lowering MAX_DISK to cause a violation. */ final long steadyStateMs = 3 * HA_TIMEOUT_MS; /* * Time to wait for lowering MAX_DISK to take effect. * For replicas we should to wait long enough to cause * InsufficientReplicasException as well as InsufficientAcksException. */ final long setViolationMs = 3 * HA_TIMEOUT_MS; /* * Time to wait for restoring MAX_DISK to take effect. * For replicas this takes longer because the connection has * to be reestablished. */ final long clearViolationMs = 5 * HA_TIMEOUT_MS; allowDiskLimitEx = false; expectDiskLimitEx = false; while (!stopFlag) { System.out.println("Steady state with no violations"); allowDurabilityEx = false; expectDurabilityEx = false; runAndCheckExceptions( System.currentTimeMillis() + steadyStateMs); cleanLog(); System.out.println("Start violations on ALL_REPLICAS"); for (int i = 1; i < repEnvInfo.length; i += 1) { setDiskLimitViolation(repEnvInfo[i]); } allowDurabilityEx = true; expectDurabilityEx = true; runAndCheckExceptions( System.currentTimeMillis() + setViolationMs); System.out.println("Clear violations on ALL_REPLICAS"); for (int i = 1; i < repEnvInfo.length; i += 1) { clearDiskLimitViolation(repEnvInfo[i]); } allowDurabilityEx = true; expectDurabilityEx = false; runAndCheckExceptions( System.currentTimeMillis() + clearViolationMs); for (final UpdateThread t : threads) { t.clearExceptions(); } } } private void runAndCheckExceptions(final long stopTime) throws Throwable { if (stopTime > runStopTime) { stopFlag = true; return; } while (!stopFlag && System.currentTimeMillis() < stopTime) { Thread.sleep(1000); for (final UpdateThread t : threads) { if (!allowDiskLimitEx && t.diskLimitEx != null) { throw new IllegalStateException(t.diskLimitEx); } if (!allowDurabilityEx && t.durabilityEx != null) { throw new IllegalStateException(t.durabilityEx); } } } if (stopFlag) { return; } refreshStats(); for (final UpdateThread t : threads) { if (expectDiskLimitEx && t.diskLimitEx == null) { throw new IllegalStateException( "Expected DiskLimitException"); } if (expectDurabilityEx && t.durabilityEx == null) { throw new IllegalStateException( "Expected InsufficientAcksException or " + "InsufficientReplicasException"); } } if (expectDiskLimitEx || expectDurabilityEx) { assertTrue(lastExceptions > 0); } if (!allowDiskLimitEx && !allowDurabilityEx) { assertEquals(0, lastExceptions); assertTrue(lastOperations > 0); } } /** * During steady state, when we know there are no violations, give * cleaner/checkpointer a chance to run to completion. */ private void cleanLog() { for (final RepEnvInfo info : repEnvInfo) { final Environment env = info.getEnv(); env.cleanLog(); env.checkpoint(new CheckpointConfig().setForce(true)); } } @SuppressWarnings("unused") private void dumpThreads() { final EnvironmentImpl envImpl = repEnvInfo[0].getRepImpl(); LoggerUtils.fullThreadDump( envImpl.getLogger(), envImpl, Level.SEVERE); } private void refreshStats() { final long curTime = System.currentTimeMillis(); final long statMs = curTime - lastStatTime; if (statMs < 1000) { return; } long ops = 0; long exceptions = 0; for (final UpdateThread t : threads) { ops += t.getOps(); exceptions += t.getExceptions(); } lastStatTime = curTime; lastOperations = ops; lastExceptions = exceptions; final long statSec = statMs / 1000; final long opsPerSec = ops / statSec; final long exceptionsPerSec = exceptions / statSec; System.out.format( "Ops: %,d Exc: %,d Ops/s: %,d Exc/s: %,d %n", ops, exceptions, opsPerSec, exceptionsPerSec); // System.out.println(envImpl.getCleaner().getDiskLimitMessage()); // final EnvironmentStats stats = masterEnv.getStats(null); } class UpdateThread extends Thread { private final int keysPerThread; private final int startKey; private volatile int ops = 0; private volatile int exceptions = 0; volatile DiskLimitException diskLimitEx; volatile OperationFailureException durabilityEx; UpdateThread(final int keysPerThread, final int startKey) { this.keysPerThread = keysPerThread; this.startKey = startKey; } int getOps() { final int ret = ops; ops = 0; return ret; } int getExceptions() { final int ret = exceptions; exceptions = 0; return ret; } void clearExceptions() { diskLimitEx = null; durabilityEx = null; } @Override public void run() { try { final DatabaseEntry key = new DatabaseEntry(); final DatabaseEntry data = new DatabaseEntry(new byte[DATA_SIZE]); while (!stopFlag) { for (int i = startKey; i < (startKey + keysPerThread) && !stopFlag; i += 1) { IntegerBinding.intToEntry(i, key); try { final OperationResult result = masterDb.put( null, key, data, Put.OVERWRITE, null); if (result == null) { throw new IllegalStateException("put failed"); } ops += 1; } catch (DiskLimitException e) { diskLimitEx = e; exceptions += 1; } catch (InsufficientAcksException| InsufficientReplicasException e) { durabilityEx = e; exceptions += 1; } } } } catch (Throwable e) { e.printStackTrace(System.out); unexpectedEx.compareAndSet(null, e); stopFlag = true; } } } }