/*- * Copyright (C) 2002, 2017, Oracle and/or its affiliates. All rights reserved. * * This file was distributed by Oracle as part of a version of Oracle Berkeley * DB Java Edition made available at: * * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html * * Please see the LICENSE file included in the top-level directory of the * appropriate version of Oracle Berkeley DB Java Edition for a copy of the * license and additional information. */ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.File; import java.text.DateFormat; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.bind.tuple.TupleBase; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import com.sleepycat.je.CacheMode; import com.sleepycat.je.Cursor; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Durability; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.Get; import com.sleepycat.je.OperationResult; import com.sleepycat.je.Put; import com.sleepycat.je.SecondaryConfig; import com.sleepycat.je.SecondaryCursor; import com.sleepycat.je.SecondaryDatabase; import com.sleepycat.je.SecondaryKeyCreator; import com.sleepycat.je.Transaction; import com.sleepycat.je.WriteOptions; import com.sleepycat.je.dbi.TTL; import com.sleepycat.je.test.SpeedyTTLTime; import com.sleepycat.je.utilint.TracerFormatter; /** * Tests concurrent access when using TTL. * * Goals: * - Read records that are expiring. * - Delete records that are expiring. * - Update TTL for records that are expiring, including changing TTL to zero. * - Lock a record multiple times per txn. * - Read key then data in separate ops. * - Use secondaries that expire. * - Read primary and secondary records individually in separate ops. * - Verify expected data. * - Tolerate only expected deviant behavior, only on expiration boundaries. */ public class TTLStress { private static final WriteOptions ONE_HOUR_TTL = new WriteOptions().setTTL(1, TimeUnit.HOURS).setUpdateTTL(true); /* Must be at least long enough to empty the full queue. */ private static final int TERMINATION_SEC = 10 * 60; /* Since a TTL of 1 hour is used, this is the TTL in millis. */ private static final int FAKE_MILLIS_PER_HOUR = 100; /* * Must be at least the time for a thread to wake up and do an insert. The * insert and a read operation for that record are queued one after the * other and may both be assigned to threads at about the same time. */ private static final int THREAD_SWITCH_TIME = 5000; /* * The read operation expects to read the record, and then for it to * expire, all before EXPIRATION_MAX_MILLIS. */ private static final int EXPIRATION_MAX_MILLIS = (FAKE_MILLIS_PER_HOUR * 2) + THREAD_SWITCH_TIME; private static final DateFormat DATE_FORMAT = TracerFormatter.makeDateFormat(); private static final int DEFAULT_TEST_THREADS = 10; private static final int DEFAULT_CLEANER_THREADS = 2; private static final int DEFAULT_DURATION_MINUTES = 30; private static final int DEFAULT_MAIN_CACHE_MB = 200; private static final int DEFAULT_OFFHEAP_CACHE_MB = 200; private static final int DEFAULT_QUEUE_SIZE = 1000; private static final String DEFAULT_HOME_DIR = "tmp"; public static void main(final String[] args) { try { printArgs(args); final TTLStress test = new TTLStress(args); test.runTest(); test.close(); System.exit(0); } catch (Throwable e) { e.printStackTrace(System.out); System.exit(-1); } } private Environment env; private Database db; private SecondaryDatabase secDb; private int durationMinutes = DEFAULT_DURATION_MINUTES; private ThreadPoolExecutor executor; private final SpeedyTTLTime speedyTime = new SpeedyTTLTime(FAKE_MILLIS_PER_HOUR); private final AtomicInteger nInserts = new AtomicInteger(0); private final AtomicInteger nUpdates = new AtomicInteger(0); private final AtomicInteger nDeletions = new AtomicInteger(0); private final AtomicInteger nDeleteExpired = new AtomicInteger(0); private final AtomicInteger nPriReads = new AtomicInteger(0); private final AtomicInteger nPriExpired = new AtomicInteger(0); private final AtomicInteger nPriExpiredData = new AtomicInteger(0); private final AtomicInteger nPriDeleted = new AtomicInteger(0); private final AtomicInteger nPriNotFound = new AtomicInteger(0); private final AtomicInteger nSecReads = new AtomicInteger(0); private final AtomicInteger nSecExpired = new AtomicInteger(0); private final AtomicInteger nSecExpiredData = new AtomicInteger(0); private final AtomicInteger nSecDeleted = new AtomicInteger(0); private final AtomicInteger nSecNotFound = new AtomicInteger(0); private TTLStress(String[] args) { int nTestThreads = DEFAULT_TEST_THREADS; int queueSize = DEFAULT_QUEUE_SIZE; int nCleanerThreads = DEFAULT_CLEANER_THREADS; int mainCacheMb = DEFAULT_MAIN_CACHE_MB; int offheapCacheMb = DEFAULT_OFFHEAP_CACHE_MB; String homeDir = DEFAULT_HOME_DIR; /* Parse arguments. */ for (int i = 0; i < args.length; i += 1) { final String arg = args[i]; final boolean moreArgs = i < args.length - 1; if (arg.equals("-h") && moreArgs) { homeDir = args[++i]; } else if (arg.equals("-threads") && moreArgs) { nTestThreads = Integer.parseInt(args[++i]); } else if (arg.equals("-cleaners") && moreArgs) { nCleanerThreads = Integer.parseInt(args[++i]); } else if (arg.equals("-minutes") && moreArgs) { durationMinutes = Integer.parseInt(args[++i]); } else if (arg.equals("-cacheMB") && moreArgs) { mainCacheMb = Integer.parseInt(args[++i]); } else if (arg.equals("-offheapMB") && moreArgs) { offheapCacheMb = Integer.parseInt(args[++i]); } else if (arg.equals("-queueSize") && moreArgs) { queueSize = Integer.parseInt(args[++i]); } else { throw new IllegalArgumentException("Unknown arg: " + arg); } } executor = new ThreadPoolExecutor( nTestThreads, nTestThreads, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize), new ThreadPoolExecutor.AbortPolicy()); executor.prestartAllCoreThreads(); open(homeDir, nCleanerThreads, mainCacheMb, offheapCacheMb); } private static void printArgs(String[] args) { System.out.print("\nCommand line arguments:"); for (String arg : args) { System.out.print(' '); System.out.print(arg); } System.out.println(); } private void open(final String homeDir, final int nCleanerThreads, final int mainCacheMb, final int offheapCacheMb) { final EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setTransactional(true); envConfig.setDurability(Durability.COMMIT_NO_SYNC); envConfig.setConfigParam( EnvironmentConfig.LOG_FILE_MAX, String.valueOf(1024 * 1024)); envConfig.setConfigParam( EnvironmentConfig.CLEANER_THREADS, String.valueOf(nCleanerThreads)); envConfig.setCacheSize(mainCacheMb * (1024 * 1024)); envConfig.setOffHeapCacheSize(offheapCacheMb * (1024 * 1024)); /* Account for very slow test machines. */ envConfig.setLockTimeout(30, TimeUnit.SECONDS); env = new Environment(new File(homeDir), envConfig); final DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(true); dbConfig.setCacheMode(CacheMode.EVICT_LN); db = env.openDatabase(null, "priDb", dbConfig); final SecondaryConfig secConfig = new SecondaryConfig(); secConfig.setAllowCreate(true); secConfig.setTransactional(true); secConfig.setCacheMode(CacheMode.EVICT_LN); secConfig.setSortedDuplicates(true); secConfig.setKeyCreator(new SecondaryKeyCreator() { @Override public boolean createSecondaryKey(final SecondaryDatabase secDb, final DatabaseEntry key, final DatabaseEntry data, final DatabaseEntry result) { result.setData(key.getData()); return true; } }); secDb = env.openSecondaryDatabase(null, "secDb", db, secConfig); } private void close() throws InterruptedException { log("Starting shutdown"); executor.shutdown(); if (!executor.awaitTermination(TERMINATION_SEC, TimeUnit.SECONDS)) { System.out.println( "Could not terminate gracefully after " + TERMINATION_SEC + " seconds"); final List stillRunning = executor.shutdownNow(); if (!stillRunning.isEmpty()) { System.out.println( "Did not empty queue during close after " + TERMINATION_SEC + " seconds, " + stillRunning.size() + " tasks still running."); System.exit(1); } } secDb.close(); db.close(); env.close(); log(String.format( "Test succeeded %n" + "nInserts: %,d %n" + "nUpdates: %,d %n" + "nDeletions: %,d %n" + "nPriReads: %,d %n" + "nPriExpired: %,d %n" + "nPriExpiredData: %,d %n" + "nPriDeleted: %,d %n" + "nPriNotFound: %,d %n" + "nSecReads: %,d %n" + "nSecExpired: %,d %n" + "nSecExpiredData: %,d %n" + "nSecDeleted: %,d %n" + "nSecNotFound: %,d %n" + "nDeleteExpired: %,d ", nInserts.get(), nUpdates.get(), nDeletions.get(), nPriReads.get(), nPriExpired.get(), nPriExpiredData.get(), nPriDeleted.get(), nPriNotFound.get(), nSecReads.get(), nSecExpired.get(), nSecExpiredData.get(), nSecDeleted.get(), nSecNotFound.get(), nDeleteExpired.get())); } private static void log(final String msg) { synchronized (DATE_FORMAT) { System.out.println( DATE_FORMAT.format(System.currentTimeMillis()) + " " + msg); } } private void runTest() throws Throwable { final long endTime = System.currentTimeMillis() + (durationMinutes * 60 * 1000); final Random rnd = new Random(123); final BlockingQueue queue = executor.getQueue(); speedyTime.start(); while (System.currentTimeMillis() < endTime) { final boolean doDelete; final boolean doUpdate; int nOps = 3; // insert, read primary and secondary switch (rnd.nextInt(3)) { case 0: doDelete = false; doUpdate = false; break; case 1: doDelete = false; doUpdate = true; nOps += 1; break; case 2: doDelete = true; doUpdate = false; nOps += 1; break; default: throw new RuntimeException(); } if (queue.remainingCapacity() < nOps) { continue; } final long key = rnd.nextLong(); final AtomicBoolean abandonOp = new AtomicBoolean(false); final AtomicLong expirationTime = new AtomicLong(0); executor.execute(new Write( key, abandonOp, expirationTime, !doUpdate /*doInsert*/)); executor.execute(new PrimaryRead( key, abandonOp, expirationTime, doDelete)); executor.execute(new SecondaryRead( key, abandonOp, expirationTime, doDelete, doUpdate)); if (doUpdate) { executor.execute(new Write( key, abandonOp, expirationTime, false /*doInsert*/)); } if (doDelete) { executor.execute(new Delete( key, abandonOp, expirationTime)); } } } private class Write implements Runnable { private static final boolean DEBUG_WRITE = false; private final long key; private final AtomicBoolean abandonOp; private final AtomicLong expirationTime; private final boolean doInsert; Write(final long key, final AtomicBoolean abandonOp, final AtomicLong expirationTime, final boolean doInsert) { this.key = key; this.abandonOp = abandonOp; this.doInsert = doInsert; this.expirationTime = expirationTime; } @Override public void run() { Thread.currentThread().setName("Write"); try { final DatabaseEntry keyEntry = new DatabaseEntry(); LongBinding.longToEntry(key, keyEntry); final boolean useSeparateLN = (key & 1) != 0; final byte[] dataBytes = new byte[useSeparateLN ? 20 : 8]; final DatabaseEntry dataEntry = new DatabaseEntry(); TupleBase.outputToEntry( new TupleOutput(dataBytes).writeLong(key), dataEntry); final Transaction txn = env.beginTransaction(null, null); final long opBeforeTime = speedyTime.realTimeToFakeTime(System.currentTimeMillis()); OperationResult result = db.put( txn, keyEntry, dataEntry, doInsert ? Put.NO_OVERWRITE : Put.OVERWRITE, ONE_HOUR_TTL); final long opAfterTime = speedyTime.realTimeToFakeTime(System.currentTimeMillis()); if (result == null && doInsert) { log("Apparent duplicate random number as key: " + key); txn.abort(); abandonOp.set(true); return; } assertNotNull("Could not write: " + key, result); final long opBeforeExpTime = writeTimeToExpirationTime(opBeforeTime); final long opAfterExpTime = writeTimeToExpirationTime(opAfterTime); final long resultExpTime = result.getExpirationTime(); boolean resultExpTimeMatches = false; for (long time = opBeforeExpTime; time <= opAfterExpTime; time += TTL.MILLIS_PER_HOUR) { if (resultExpTime == time) { resultExpTimeMatches = true; break; } } if (!resultExpTimeMatches) { fail( "key: " + key + " opBeforeTime: " + formatTime(opBeforeTime) + " opAfterTime: " + formatTime(opAfterTime) + " opBeforeExpTime: " + formatTime(opBeforeExpTime) + " opAfterExpTime: " + formatTime(opAfterExpTime) + " resultExpTime: " + formatTime(resultExpTime)); } expirationTime.set(resultExpTime); if (DEBUG_WRITE) { log( (doInsert ? "Inserted " : "Updated ") + "key: " + key + " opBeforeTime: " + formatTime(opBeforeTime) + " opAfterTime: " + formatTime(opAfterTime) + " resultExpTime: " + formatTime(resultExpTime)); result = db.get( txn, keyEntry, dataEntry, Get.SEARCH, null); assertNotNull("Could not read: " + key, result); assertEquals( resultExpTime, result.getExpirationTime()); } txn.commit(); if (doInsert) { nInserts.incrementAndGet(); } else { nUpdates.incrementAndGet(); } } catch (Throwable e) { e.printStackTrace(System.out); System.exit(1); } } } private class Delete implements Runnable { private final long key; private final AtomicBoolean abandonOp; private final AtomicLong expirationTime; Delete(final long key, final AtomicBoolean abandonOp, final AtomicLong expirationTime) { this.key = key; this.abandonOp = abandonOp; this.expirationTime = expirationTime; } @Override public void run() { Thread.currentThread().setName("Delete"); final long startTime = System.currentTimeMillis(); final DatabaseEntry keyEntry = new DatabaseEntry(); LongBinding.longToEntry(key, keyEntry); try { while (true) { if (abandonOp.get()) { return; } final long sysTime = System.currentTimeMillis(); final long opTime = speedyTime.realTimeToFakeTime(sysTime); if (sysTime - startTime > EXPIRATION_MAX_MILLIS) { if (TTL.isExpired(expirationTime.get())) { nDeleteExpired.incrementAndGet(); } else { fail("Did not expire: " + key + " opTime: " + formatTime(opTime) + " expirationTime: " + formatTime(expirationTime.get())); } break; } final OperationResult result = db.delete( null, keyEntry, null); if (result == null) { if (TTL.isExpired(expirationTime.get())) { nDeleteExpired.incrementAndGet(); return; } continue; } assertEquals( expirationTime.get(), result.getExpirationTime()); nDeletions.incrementAndGet(); return; } } catch (Throwable e) { e.printStackTrace(System.out); System.exit(1); } } } private class PrimaryRead implements Runnable { private final long key; private final AtomicBoolean abandonOp; private final AtomicLong expirationTime; private final boolean doDelete; PrimaryRead(final long key, final AtomicBoolean abandonOp, final AtomicLong expirationTime, final boolean doDelete) { this.key = key; this.abandonOp = abandonOp; this.expirationTime = expirationTime; this.doDelete = doDelete; } @Override public void run() { Thread.currentThread().setName("PrimaryRead"); final DatabaseEntry keyEntry = new DatabaseEntry(); final DatabaseEntry dataEntry = new DatabaseEntry(); final long startTime = System.currentTimeMillis(); OperationResult result; while (true) { if (abandonOp.get()) { return; } try (final Cursor cursor = db.openCursor(null, null)) { final long sysTime = System.currentTimeMillis(); final long opTime = speedyTime.realTimeToFakeTime(sysTime); if (sysTime - startTime > EXPIRATION_MAX_MILLIS) { if (TTL.isExpired(expirationTime.get())) { nPriExpired.incrementAndGet(); } else if (doDelete) { nPriDeleted.incrementAndGet(); } else { fail("Did not expire: " + key + " opTime: " + formatTime(opTime) + " expirationTime: " + formatTime(expirationTime.get())); } break; } nPriReads.incrementAndGet(); LongBinding.longToEntry(key, keyEntry); final boolean readDataSeparately = (key % 3) == 0; result = cursor.get( keyEntry, readDataSeparately ? null : dataEntry, Get.SEARCH, null); if (result == null) { if (TTL.isExpired(expirationTime.get())) { nPriExpired.incrementAndGet(); break; } continue; } assertEquals(key, LongBinding.entryToLong(keyEntry)); assertEquals( expirationTime.get(), result.getExpirationTime()); if (!readDataSeparately) { final TupleInput input = TupleBase.entryToInput(dataEntry); assertEquals(key, input.readLong()); } result = cursor.get( keyEntry, dataEntry, Get.CURRENT, null); if (result == null) { if (readDataSeparately && TTL.isExpired(expirationTime.get())) { nPriExpiredData.incrementAndGet(); break; } fail("Could not read locked record: " + key); } assertEquals(key, LongBinding.entryToLong(keyEntry)); final TupleInput input = TupleBase.entryToInput(dataEntry); assertEquals(key, input.readLong()); assertEquals( expirationTime.get(), result.getExpirationTime()); } catch (Throwable e) { e.printStackTrace(System.out); System.exit(1); } } } } private class SecondaryRead implements Runnable { private final long key; private final AtomicBoolean abandonOp; private final AtomicLong expirationTime; private final boolean doDelete; private final boolean doUpdate; SecondaryRead(final long key, final AtomicBoolean abandonOp, final AtomicLong expirationTime, final boolean doDelete, final boolean doUpdate) { this.key = key; this.abandonOp = abandonOp; this.expirationTime = expirationTime; this.doDelete = doDelete; this.doUpdate = doUpdate; } @Override public void run() { Thread.currentThread().setName("SecondaryRead"); final DatabaseEntry keyEntry = new DatabaseEntry(); final DatabaseEntry pKeyEntry = new DatabaseEntry(); final DatabaseEntry dataEntry = new DatabaseEntry(); final long startTime = System.currentTimeMillis(); OperationResult result; while (true) { if (abandonOp.get()) { return; } try (final SecondaryCursor cursor = secDb.openCursor(null, null)) { final long sysTime = System.currentTimeMillis(); final long opTime = speedyTime.realTimeToFakeTime(sysTime); if (sysTime - startTime > EXPIRATION_MAX_MILLIS) { if (TTL.isExpired(expirationTime.get())) { nSecExpired.incrementAndGet(); } else if (doDelete) { nSecDeleted.incrementAndGet(); } else { fail("Did not expire: " + key + " currentTime: " + formatTime(opTime) + " expirationTime: " + formatTime(expirationTime.get())); } break; } nSecReads.incrementAndGet(); LongBinding.longToEntry(key, keyEntry); /* * If we read data separately when an update or deletion is * being done, this would cause deadlocks. */ final boolean readDataSeparately = !doUpdate && !doDelete && (key % 3) == 0; result = cursor.get( keyEntry, pKeyEntry, readDataSeparately ? null : dataEntry, Get.SEARCH, null); if (result == null) { if (TTL.isExpired(expirationTime.get())) { nSecExpired.incrementAndGet(); break; } continue; } assertEquals(key, LongBinding.entryToLong(keyEntry)); assertEquals( expirationTime.get(), result.getExpirationTime()); assertTrue(Arrays.equals( keyEntry.getData(), pKeyEntry.getData())); if (!readDataSeparately) { final TupleInput input = TupleBase.entryToInput(dataEntry); assertEquals(key, input.readLong()); } result = cursor.get( keyEntry, pKeyEntry, dataEntry, Get.CURRENT, null); if (result == null) { if (readDataSeparately && TTL.isExpired(expirationTime.get())) { nSecExpiredData.incrementAndGet(); break; } fail("Could not read locked record: " + key + " readDataSeparately: " + readDataSeparately); } assertEquals(key, LongBinding.entryToLong(keyEntry)); assertTrue(Arrays.equals( keyEntry.getData(), pKeyEntry.getData())); final TupleInput input = TupleBase.entryToInput(dataEntry); assertEquals(key, input.readLong()); assertEquals( expirationTime.get(), result.getExpirationTime()); } catch (Throwable e) { e.printStackTrace(System.out); System.exit(1); } } } } private long writeTimeToExpirationTime(final long writeTime) { final int expiration = TTL.systemTimeToExpiration( writeTime + TTL.MILLIS_PER_HOUR, true /*hours*/); return TTL.expirationToSystemTime(expiration, true /*hours*/); } private String formatTime(final long time) { synchronized (DATE_FORMAT) { return DATE_FORMAT.format(time); } } }