/*- * Copyright (C) 2002, 2017, Oracle and/or its affiliates. All rights reserved. * * This file was distributed by Oracle as part of a version of Oracle Berkeley * DB Java Edition made available at: * * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html * * Please see the LICENSE file included in the top-level directory of the * appropriate version of Oracle Berkeley DB Java Edition for a copy of the * license and additional information. */ import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.Random; import java.util.concurrent.CountDownLatch; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.je.Cursor; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Durability; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationFailureException; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.SecondaryConfig; import com.sleepycat.je.SecondaryCursor; import com.sleepycat.je.SecondaryDatabase; import com.sleepycat.je.SecondaryKeyCreator; import com.sleepycat.je.Transaction; /** * Application to simulate different deadlock scenarios. * * The simple scenario: * Two threads access two records in opposite order with their own txns. */ public class DeadlockStress { private String homeDir = "./tmp"; private Environment env = null; private Database db; private int dbSize = 100; private int totalTxns = 1000; private int factor = 100; private int maxRetry = 100; /* The number of operations in each Txn in mix access mode. */ private int opNum = 5; /* * The number of threads used in mix access mode. It should be divided * by dbSize. */ private int threadNum = 20; /* The run time for mix access mode. */ private long runtime = 5 * 60 * 1000; // 10minutes boolean verbose = true; private CountDownLatch startSignal; private boolean deadlockDone = false; void openEnv() { EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setTransactional(true); envConfig.setDurability(Durability.COMMIT_NO_SYNC); /* envConfig.setConfigParam (EnvironmentParams.LOCK_TIMEOUT.getName(), "1000 ms"); */ try { File envHome = new File(homeDir); env = new Environment(envHome, envConfig); } catch (Error e) { e.printStackTrace(); System.exit(1); } DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(true); db = env.openDatabase(null, "testDB", dbConfig); } void closeEnv() { try { if (db != null) { db.close(); } if (env != null) { env.close(); } } catch (Throwable e) { e.printStackTrace(); System.exit(1); } } public static void main(String args[]){ try { DeadlockStress test = new DeadlockStress(); test.parseArgs(args); test.run(); System.exit(0); } catch (Throwable e) { e.printStackTrace(System.out); System.exit(1); } } /* Output command-line input arguments to log. */ private void printArgs(String[] args) { System.out.print("\nCommand line arguments:"); for (String arg : args) { System.out.print(' '); System.out.print(arg); } System.out.println(); } protected void parseArgs(String args[]) throws Exception { for (int i = 0; i < args.length; i++) { boolean moreArgs = i < args.length - 1; if (args[i].equals("-h") && moreArgs) { homeDir = args[++i]; } else if (args[i].equals("-dbSize") && moreArgs) { dbSize = Integer.parseInt(args[++i]); } else if (args[i].equals("-totalTxns") && moreArgs) { totalTxns = Integer.parseInt(args[++i]); } else if (args[i].equals("-retry") && moreArgs) { maxRetry = Integer.parseInt(args[++i]); } else if (args[i].equals("-opnum") && moreArgs) { opNum = Integer.parseInt(args[++i]); } else if (args[i].equals("-threads") && moreArgs) { threadNum = Integer.parseInt(args[++i]); } else if (args[i].equals("-time") && moreArgs) { runtime = Integer.parseInt(args[++i]); } else if (args[i].equals("-verbose") && moreArgs ) { verbose = Boolean.parseBoolean(args[++i]); } else { usage("Error: Unknown arg: " + args[i]); } } printArgs(args); } private void usage(String error) { if (error != null) { System.err.println(error); } System.err.println ("java " + getClass().getName() + '\n' + " [-h ] [-dbsize] [-totalTxns]\n"); System.exit(1); } public void run() throws Exception { openEnv(); insertRecords(); compareExceptionMessFoDebug(); doTwoThreadsDeadlock(); doTwoThreadsNoInteraction(); doTwoThreadsPartInteraction(); doThreeThreadsDeadlock(); doThreeThreadsNoInteraction(); doThreeThreadsPartInteraction(); doDeadlockOnOneRecord(); noDeadlockOnOneRecord(); doDeadlockOneCommonLocker(); doDeadlockTwoCommonLockers(); doMixedOperationWithDeadlock(); doMixedOperationSortedToNoDeadlock(); doMixedOperationNoInteraction(); doMixedOperationWithDeadlockSecondary(); doMixedOperationSortedToNoDeadlockSecondary(); doMixedOperationNoInteractionSecondary(); closeEnv(); } private void insertRecords() throws Exception, InterruptedException { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); for (int i = 0; i < dbSize; i++) { IntegerBinding.intToEntry(i, key); IntegerBinding.intToEntry(i, data); db.put(null, key, data); } } public void compareExceptionMessFoDebug() throws InterruptedException { System.out.println("Compare Exception content"); startSignal = new CountDownLatch(1); AccessThreadBreakWhenDeadlock thread1 = new AccessThreadBreakWhenDeadlock(1,1,2,-1,false); AccessThreadBreakWhenDeadlock thread2 = new AccessThreadBreakWhenDeadlock(2,2,1,-1,false); thread1.start(); thread2.start(); startSignal.countDown(); thread1.join(); thread2.join(); } public void doTwoThreadsDeadlock() throws InterruptedException { System.out.println("Deadlock between two threads"); startSignal = new CountDownLatch(1); AccessThread thread1 = new AccessThread(1,1,2,-1,false); AccessThread thread2 = new AccessThread(2,2,1,-1,false); thread1.start(); thread2.start(); startSignal.countDown(); thread1.join(); thread2.join(); } public void doTwoThreadsNoInteraction() throws InterruptedException { System.out.println("Two threads do not have any interaction"); startSignal = new CountDownLatch(1); totalTxns = factor * totalTxns; AccessThread thread1 = new AccessThread(1,1,2,-1,false); AccessThread thread2 = new AccessThread(2,3,4,-1,false); thread1.start(); thread2.start(); startSignal.countDown(); thread1.join(); thread2.join(); totalTxns = totalTxns / factor; } public void doTwoThreadsPartInteraction() throws InterruptedException { System.out.println("Two threads have part interaction"); startSignal = new CountDownLatch(1); totalTxns = factor * totalTxns; AccessThread thread1 = new AccessThread(1,1,2,-1,false); AccessThread thread2 = new AccessThread(2,3,1,-1,false); thread1.start(); thread2.start(); startSignal.countDown(); thread1.join(); thread2.join(); totalTxns = totalTxns / factor; } public void doThreeThreadsDeadlock() throws InterruptedException { System.out.println("Deadlock between three threads"); startSignal = new CountDownLatch(1); AccessThread thread1 = new AccessThread(1,1,2,-1,false); AccessThread thread2 = new AccessThread(2,2,3,-1,false); AccessThread thread3 = new AccessThread(3,3,1,-1,false); thread1.start(); thread2.start(); thread3.start(); startSignal.countDown(); thread1.join(); thread2.join(); thread3.join(); } public void doThreeThreadsNoInteraction() throws InterruptedException { System.out.println("Three threads do not have any interaction"); startSignal = new CountDownLatch(1); totalTxns = factor * totalTxns; AccessThread thread1 = new AccessThread(1,1,2,-1,false); AccessThread thread2 = new AccessThread(2,3,4,-1,false); AccessThread thread3 = new AccessThread(3,5,6,-1,false); thread1.start(); thread2.start(); thread3.start(); startSignal.countDown(); thread1.join(); thread2.join(); thread3.join(); totalTxns = totalTxns / factor; } public void doThreeThreadsPartInteraction() throws InterruptedException { System.out.println("Three threads have part interaction"); startSignal = new CountDownLatch(1); totalTxns = factor * totalTxns; AccessThread thread1 = new AccessThread(1,1,2,-1,false); AccessThread thread2 = new AccessThread(2,1,3,-1,false); AccessThread thread3 = new AccessThread(3,4,1,-1,false); thread1.start(); thread2.start(); thread3.start(); startSignal.countDown(); thread1.join(); thread2.join(); thread3.join(); totalTxns = totalTxns / factor; } public void doDeadlockOnOneRecord() throws InterruptedException { System.out.println("Deadlock formed on one record"); startSignal = new CountDownLatch(1); AccessThread thread1 = new AccessThread(1,1,1,-1,true); AccessThread thread2 = new AccessThread(2,1,1,-1,true); thread1.start(); thread2.start(); startSignal.countDown(); thread1.join(); thread2.join(); } public void noDeadlockOnOneRecord() throws InterruptedException { System.out.println("No Deadlock formed on one record"); startSignal = new CountDownLatch(1); totalTxns = factor * totalTxns; AccessThread thread1 = new AccessThread(1,1,1,-1,false); AccessThread thread2 = new AccessThread(2,1,1,-1,false); thread1.start(); thread2.start(); startSignal.countDown(); thread1.join(); thread2.join(); totalTxns = totalTxns / factor; } public void doDeadlockOneCommonLocker() throws InterruptedException { System.out.println("Deadlock with one common locker"); startSignal = new CountDownLatch(1); AccessThread thread1 = new AccessThread(1,1,2,-1,true); AccessThread thread2 = new AccessThread(2,3,2,1,false); AccessThread thread3 = new AccessThread(3,1,3,-1,true); thread1.start(); thread2.start(); thread3.start(); startSignal.countDown(); thread1.join(); thread2.join(); thread3.join(); } public void doDeadlockTwoCommonLockers() throws InterruptedException { System.out.println("Deadlock with two common lockers"); startSignal = new CountDownLatch(1); AccessThread thread1 = new AccessThread(1,1,2,-1,true); AccessThread thread2 = new AccessThread(2,4,2,3,false); AccessThread thread3 = new AccessThread(3,3,1,-1,false); AccessThread thread4 = new AccessThread(4,1,4,-1,true); thread1.start(); thread2.start(); thread3.start(); thread4.start(); startSignal.countDown(); thread1.join(); thread2.join(); thread3.join(); thread4.join(); } public void doMixedOperationWithDeadlock() { System.out.println("Mix access mode with possible Deadlock"); int[] distribution = new int[] {25, 25, 25, 25}; MixedAccessThread[] mixedThreads = new MixedAccessThread[threadNum]; for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i] = new MixedAccessThread(i, distribution, false, false); } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].start(); } try { Thread.sleep(runtime); } catch (InterruptedException e) { } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].setDone(true); } for (int i = 0; i < mixedThreads.length; i++) { try { mixedThreads[i].join(); } catch (InterruptedException e) { } } } public void doMixedOperationSortedToNoDeadlock() { System.out.println("Mix access mode sorted to no deadlock"); int[] distribution = new int[] {25, 25, 25, 25}; MixedAccessThread[] mixedThreads = new MixedAccessThread[threadNum]; for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i] = new MixedAccessThread(i, distribution, true, false); } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].start(); } try { Thread.sleep(runtime); } catch (InterruptedException e) { } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].setDone(true); } for (int i = 0; i < mixedThreads.length; i++) { try { mixedThreads[i].join(); } catch (InterruptedException e) { } } } public void doMixedOperationNoInteraction() { System.out.println("Mix access mode no interaction to no deadlock"); int[] distribution = new int[] {25, 25, 25, 25}; MixedAccessThread[] mixedThreads = new MixedAccessThread[threadNum]; for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i] = new MixedAccessThread(i, distribution, false, true); } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].start(); } try { Thread.sleep(runtime); } catch (InterruptedException e) { } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].setDone(true); } for (int i = 0; i < mixedThreads.length; i++) { try { mixedThreads[i].join(); } catch (InterruptedException e) { } } } public void doMixedOperationWithDeadlockSecondary() { System.out.println("Mix access mode with possible Deadlock Secondary"); int[] distribution = new int[] {25, 25, 25, 25}; SecondaryAccessThread[] mixedThreads = new SecondaryAccessThread[threadNum]; SecondaryDatabase sdb = openSecondary(env, db, "secDb", new SecondaryConfig()); boolean secondary; Database usedDb; for (int i = 0; i < mixedThreads.length; i++) { if ( i % 2 == 0) { secondary = false; usedDb = db; } else { secondary = true; usedDb = sdb; } mixedThreads[i] = new SecondaryAccessThread( i, distribution, false, false, secondary, usedDb); } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].start(); } try { Thread.sleep(runtime); } catch (InterruptedException e) { } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].setDone(true); } for (int i = 0; i < mixedThreads.length; i++) { try { mixedThreads[i].join(); } catch (InterruptedException e) { } } if (sdb != null) { sdb.close(); } } public void doMixedOperationSortedToNoDeadlockSecondary() { System.out.println("Mix access mode sorted to no deadlock Secondary"); int[] distribution = new int[] {25, 25, 25, 25}; SecondaryAccessThread[] mixedThreads = new SecondaryAccessThread[threadNum]; SecondaryDatabase sdb = openSecondary(env, db, "secDb", new SecondaryConfig()); boolean secondary; Database usedDb; for (int i = 0; i < mixedThreads.length; i++) { if ( i % 2 == 0) { secondary = false; usedDb = db; } else { secondary = true; usedDb = sdb; } mixedThreads[i] = new SecondaryAccessThread( i, distribution, true, false, secondary, usedDb); } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].start(); } try { Thread.sleep(runtime); } catch (InterruptedException e) { } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].setDone(true); } for (int i = 0; i < mixedThreads.length; i++) { try { mixedThreads[i].join(); } catch (InterruptedException e) { } } if (sdb != null) { sdb.close(); } } public void doMixedOperationNoInteractionSecondary() { System.out.println("Mix access mode no interaction Secondary"); int[] distribution = new int[] {25, 25, 25, 25}; SecondaryAccessThread[] mixedThreads = new SecondaryAccessThread[threadNum]; SecondaryDatabase sdb = openSecondary(env, db, "secDb", new SecondaryConfig()); boolean secondary; Database usedDb; for (int i = 0; i < mixedThreads.length; i++) { if ( i % 2 == 0) { secondary = false; usedDb = db; } else { secondary = true; usedDb = sdb; } mixedThreads[i] = new SecondaryAccessThread( i, distribution, false, true, secondary, usedDb); } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].start(); } try { Thread.sleep(runtime); } catch (InterruptedException e) { } for (int i = 0; i < mixedThreads.length; i++) { mixedThreads[i].setDone(true); } for (int i = 0; i < mixedThreads.length; i++) { try { mixedThreads[i].join(); } catch (InterruptedException e) { } } if (sdb != null) { sdb.close(); } } private SecondaryDatabase openSecondary( Environment env, Database priDb, String dbName, SecondaryConfig dbConfig) { dbConfig.setAllowPopulate(true); dbConfig.setSortedDuplicates(true); dbConfig.setTransactional(true); dbConfig.setAllowCreate(true); dbConfig.setKeyCreator(new MyKeyCreator()); return env.openSecondaryDatabase(null, dbName, priDb, dbConfig); } class SecondaryAccessThread extends Thread { boolean done = false; private int id; private CRUDGenerator cg; private int opsNumEachThread = dbSize / threadNum; private boolean secondary; private Database usedDb; /* * The records involved in each Txn of different threads may contain * the same record(s), but in order to avoid deadlock, we sort these * records by their int key and at the same time, guarantee that in * each txn, the int keys of records are different. */ private boolean sort = false; /* * In order to avoid deadlock, the records involved in each Txn of * different threads do not have intersection. */ private boolean noInteraction = false; SecondaryAccessThread(int id, int[] distribution, boolean sort, boolean noInteraction, boolean secondary, Database usedDb) { this.id = id; this.sort = sort; this.noInteraction = noInteraction; this.secondary = secondary; this.usedDb = usedDb; cg = new CRUDGenerator(id, distribution); } public void run() { long startTime = System.currentTimeMillis(); long count = 0; while (!done) { doOneTxnWithRetry(); count++; } long endTime = System.currentTimeMillis(); float elapsedSec = (float) ((endTime - startTime) / 1e3); float throughput = ((float) count) / elapsedSec; System.out.println ("Thread " + id + " finishes " + count + " iterations in: " + elapsedSec + " sec, average throughput: " + throughput + " op/sec."); } @SuppressWarnings("unchecked") public void doOneTxnWithRetry() { ArrayList ops = new ArrayList<>(); ArrayList keyInts = new ArrayList<>(); for (int i = 0; i < opNum; i++) { int keyInt = generateKeyInt(keyInts); DatabaseEntry key = new DatabaseEntry(); IntegerBinding.intToEntry(keyInt, key); CRUDTYPE op = cg.nextRandomCRUD(); switch (op) { case CREATE: ops.add(new CursorCreate(key, secondary)); break; case READ: ops.add(new CursorRead(key, secondary)); break; case UPDATE: ops.add(new CursorUpdate(key, secondary)); break; case DELETE: ops.add(new CursorDelete(key, secondary)); break; default: throw new IllegalStateException("Unknown op: " + op); } } if (sort) { final CursorOperation[] coArray = ops.toArray(new CursorOperation[0]); final CursorComparator cc = new CursorComparator(); Arrays.sort(coArray, cc); ops.clear(); ops.addAll(Arrays.asList(coArray)); } int tries = 0; while (tries < maxRetry) { Transaction txn = env.beginTransaction(null, null); Cursor c = usedDb.openCursor(txn, null); try { for (CursorOperation cursorOp: ops) { cursorOp.execute(txn, c); } if (c != null) { c.close(); } txn.commit(); break; } catch (LockConflictException e) { if (c != null) { c.close(); } txn.abort(); tries++; } catch (OperationFailureException ofe) { ofe.printStackTrace(); } } //if (tries == maxRetry) { //if (tries > 0) { // System.out.println("Thread: " + id + " Retry times: " + tries); //} } private int generateKeyInt(ArrayList keyInts) { if (sort) { while (true) { boolean repeated = false; int tmp = cg.nextRandomKeyInt(dbSize); for (Integer I : keyInts) { if (I.intValue() == tmp) { repeated = true; break; } } if (!repeated) { keyInts.add(new Integer(tmp)); return tmp; } } } else if (noInteraction) { int tmp = cg.nextRandomKeyInt(opsNumEachThread); return tmp + opsNumEachThread * id; } else { return cg.nextRandomKeyInt(dbSize); } } public synchronized void setDone(boolean done) { this.done = done; } } class MyKeyCreator implements SecondaryKeyCreator { @Override public boolean createSecondaryKey(SecondaryDatabase secondary, DatabaseEntry key, DatabaseEntry data, DatabaseEntry result) { result.setData(key.getData()); return true; } } abstract class CursorOperation { protected final DatabaseEntry key; protected final boolean secondary; public CursorOperation(DatabaseEntry key, boolean secondary) { this.key = key; this.secondary = secondary; } abstract OperationStatus execute(Transaction txn, Cursor c) throws DatabaseException; public int getKeyInt() { return IntegerBinding.entryToInt(key); } } /* * Create. * * For secondary database, we can not create record. So we actually do * read actions with different search mode: getSearchBothRange. */ class CursorCreate extends CursorOperation { CursorCreate(DatabaseEntry key, boolean secondary) { super(key, secondary); } @Override OperationStatus execute(Transaction txn, Cursor c) throws DatabaseException { if (txn.isValid()) { if (secondary) { return ((SecondaryCursor)c).getSearchBothRange( key, key, new DatabaseEntry(), null); } else { return c.put(key, new DatabaseEntry(new byte[10])); } } else { return null; } } } /* Read */ class CursorRead extends CursorOperation { CursorRead(DatabaseEntry key, boolean secondary) { super(key, secondary); } @Override OperationStatus execute(Transaction txn, Cursor c) throws DatabaseException { if (txn.isValid()) { if (secondary) { return ((SecondaryCursor)c).getSearchKey( key, new DatabaseEntry(), new DatabaseEntry(), null); } else { return c.getSearchKey(key, new DatabaseEntry(), null); } } else { return null; } } } /* * Update. * * For secondary database, we can not create record. So we actually do * read actions with different search mode: getSearchKeyRange. */ class CursorUpdate extends CursorOperation { CursorUpdate(DatabaseEntry key, boolean secondary) { super(key, secondary); } @Override OperationStatus execute(Transaction txn, Cursor c) throws DatabaseException { if (txn.isValid()) { if (secondary) { return ((SecondaryCursor)c).getSearchKeyRange( key, new DatabaseEntry(), new DatabaseEntry(), null); } else { return c.getSearchKey(key, new DatabaseEntry(), null); } } else { return null; } } } /* Delete */ class CursorDelete extends CursorOperation { CursorDelete(DatabaseEntry key, boolean secondary) { super(key, secondary); } @Override OperationStatus execute(Transaction txn, Cursor c) throws DatabaseException { if (txn.isValid()) { if (secondary) { if (((SecondaryCursor)c).getSearchKey( key, new DatabaseEntry(), new DatabaseEntry(), null) == OperationStatus.SUCCESS) { return ((SecondaryCursor)c).delete(); } } else { if (c.getSearchKey(key, new DatabaseEntry(), null) == OperationStatus.SUCCESS) { return c.delete(); } } } else { return null; } return null; } } class CursorComparator implements Comparator { @Override public int compare (Object obj1, Object obj2) { final CursorOperation cop1 = (CursorOperation) obj1; final CursorOperation cop2 = (CursorOperation) obj2; return (cop1.getKeyInt() - cop2.getKeyInt()); } } class MixedAccessThread extends Thread { boolean done = false; private int id; private CRUDGenerator cg; private int opsNumEachThread = dbSize / threadNum; /* * The records involved in each Txn of different threads may contain * the same record(s), but in order to avoid deadlock, we sort these * records by their int key and at the same time, guarantee that in * each txn, the int keys of records are different. */ private boolean sort = false; /* * In order to avoid deadlock, the records involved in each Txn of * different threads do not have intersection. */ private boolean noInteraction = false; MixedAccessThread(int id, int[] distribution, boolean sort, boolean noInteraction) { this.id = id; this.sort = sort; this.noInteraction = noInteraction; cg = new CRUDGenerator(id, distribution); } public void run() { long startTime = System.currentTimeMillis(); long count = 0; while (!done) { doOneTxnWithRetry(); count++; } long endTime = System.currentTimeMillis(); float elapsedSec = (float) ((endTime - startTime) / 1e3); float throughput = ((float) count) / elapsedSec; System.out.println ("Thread " + id + " finishes " + count + " iterations in: " + elapsedSec + " sec, average throughput: " + throughput + " op/sec."); } @SuppressWarnings("unchecked") public void doOneTxnWithRetry() { ArrayList ops = new ArrayList<>(); ArrayList keyInts = new ArrayList<>(); for (int i = 0; i < opNum; i++) { int keyInt = generateKeyInt(keyInts); DatabaseEntry key = new DatabaseEntry(); IntegerBinding.intToEntry(keyInt, key); CRUDTYPE op = cg.nextRandomCRUD(); switch (op) { case CREATE: ops.add(new Create(db, key)); break; case READ: ops.add(new Read(db, key)); break; case UPDATE: ops.add(new Update(db, key)); break; case DELETE: ops.add(new Delete(db, key)); break; default: throw new IllegalStateException("Unknown op: " + op); } } if (sort) { final CRUDOperation[] coArray = ops.toArray(new CRUDOperation[0]); final OpsComparator oc = new OpsComparator(); Arrays.sort(coArray, oc); ops.clear(); ops.addAll(Arrays.asList(coArray)); } int tries = 0; while (tries < maxRetry) { Transaction txn = env.beginTransaction(null, null); try { for (CRUDOperation crudOp: ops) { crudOp.execute(txn); } txn.commit(); break; } catch (LockConflictException e) { txn.abort(); tries++; } } //if (tries == maxRetry) { //if (tries > 0) { // System.out.println("Thread: " + id + " Retry times: " + tries); //} } private int generateKeyInt(ArrayList keyInts) { if (sort) { while (true) { boolean repeated = false; int tmp = cg.nextRandomKeyInt(dbSize); for (Integer I : keyInts) { if (I.intValue() == tmp) { repeated = true; break; } } if (!repeated) { keyInts.add(new Integer(tmp)); return tmp; } } } else if (noInteraction) { int tmp = cg.nextRandomKeyInt(opsNumEachThread); return tmp + opsNumEachThread * id; } else { return cg.nextRandomKeyInt(dbSize); } } public synchronized void setDone(boolean done) { this.done = done; } } class OpsComparator implements Comparator { @Override public int compare (Object obj1, Object obj2) { final CRUDOperation op1 = (CRUDOperation) obj1; final CRUDOperation op2 = (CRUDOperation) obj2; return (op1.getKeyInt() - op2.getKeyInt()); } } /* The type of possible operations. */ enum CRUDTYPE { CREATE, READ, UPDATE, DELETE; } class CRUDGenerator { int[] distribution; int id; Random opRandom; CRUDGenerator(int id, int distribution[]) { this.id = id; this.distribution = distribution; opRandom = new Random(System.currentTimeMillis() * id); int total = 0; for (int i = 0; i < distribution.length; i++) { total += distribution[i]; } if (total != 100) { throw new IllegalArgumentException( "Distribution should add to 100 not " + total); } } /* * Returns the next random CRUDTYPE based on the current * distribution setup. */ public CRUDTYPE nextRandomCRUD() { int rpercent = opRandom.nextInt(100); int total = 0; for (int i = 0; i < distribution.length; i++) { total += distribution[i]; if (rpercent < total) { switch (i) { case 0: return CRUDTYPE.CREATE; case 1: return CRUDTYPE.READ; case 2: return CRUDTYPE.UPDATE; case 3: return CRUDTYPE.DELETE; } } } throw new IllegalArgumentException("Something is wrong"); } public int nextRandomKeyInt(int range) { return opRandom.nextInt(range); } } abstract class CRUDOperation { protected final Database db; protected final DatabaseEntry key; public CRUDOperation(Database db, DatabaseEntry key) { this.db=db; this.key = key; } abstract OperationStatus execute(Transaction txn) throws DatabaseException; public int getKeyInt() { return IntegerBinding.entryToInt(key); } } /* Create */ class Create extends CRUDOperation { Create(Database db, DatabaseEntry key) { super(db, key); } @Override OperationStatus execute(Transaction txn) throws DatabaseException { DatabaseEntry dataEntry = new DatabaseEntry(new byte[10]); if (txn.isValid()) { return db.put(txn, key, dataEntry); } else { return null; } } } /* Read */ class Read extends CRUDOperation { Read(Database db, DatabaseEntry key) { super(db, key); } @Override OperationStatus execute(Transaction txn) throws DatabaseException { DatabaseEntry dataEntry = new DatabaseEntry(); if (txn.isValid()) { return db.get(txn, key, dataEntry, null); } else { return null; } } } /* Update */ class Update extends CRUDOperation { Update(Database db, DatabaseEntry key) { super(db, key); } @Override OperationStatus execute(Transaction txn) throws DatabaseException { DatabaseEntry dataEntry = new DatabaseEntry(new byte[10]); if (txn.isValid()) { return db.put(txn, key, dataEntry); } else { return null; } } } /* Delete */ class Delete extends CRUDOperation { Delete(Database db, DatabaseEntry key) { super(db, key); } @Override OperationStatus execute(Transaction txn) throws DatabaseException { if (txn.isValid()) { return db.delete(txn, key); } else { return null; } } } class AccessThread extends Thread { /** The identifier of the current thread. */ private int id; private int key1; private int key2; private int key3; /* * Determine whether do the read access or write access. * * 1. When deadlock is formed on one record, i.e. two threads first * read and then write. We need to let the first operation be * read access. * * 2. When two deadlock cycles involove the same locker, two threads * need to own the lock on one same record, so now we also need to * do the read request to let two threads own the read lock on the * same record. */ boolean firstRead; public AccessThread( int id, int key1, int key2, int key3, boolean firstRead) { this.id = id; this.key1 = key1; this.key2 = key2; this.key3 = key3; this.firstRead = firstRead; } /** * This thread is responsible for executing transactions. */ public void run() { try { startSignal.await(); long startTime = System.currentTimeMillis(); for (int op = 0; op < totalTxns; op++) { int tries = 0; while (tries < maxRetry) { Transaction txn = env.beginTransaction(null, null); try { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); IntegerBinding.intToEntry(key1, key); if (firstRead) { db.get(txn, key, data, LockMode.DEFAULT); } else { IntegerBinding.intToEntry(key1, data); db.put(txn, key, data); } IntegerBinding.intToEntry(key2, key); IntegerBinding.intToEntry(key2, data); db.put(txn, key, data); if (key3 > 0) { IntegerBinding.intToEntry(key3, key); IntegerBinding.intToEntry(key3, data); db.put(txn, key, data); } txn.commit(); break; } catch (LockConflictException e) { txn.abort(); tries++; } } /* if (verbose && tries > 0) { System.out.println( "Thread: " + id + " Retry times: " + tries); } */ } long endTime = System.currentTimeMillis(); float elapsedSec = (float) ((endTime - startTime) / 1e3); float throughput = ((float) totalTxns) / elapsedSec; System.out.println ("Thread " + id + " finishes " + totalTxns + " iterations in: " + elapsedSec + " sec, average throughput: " + throughput + " op/sec."); } catch (InterruptedException e) { e.printStackTrace(); } } } class AccessThreadBreakWhenDeadlock extends Thread { /** The identifier of the current thread. */ private int id; private int key1; private int key2; private int key3; /* * Determine whether do the read access or write access. * * 1. When deadlock is formed on one record, i.e. two threads first * read and then write. We need to let the first operation be * read access. * * 2. When two deadlock cycles involove the same locker, two threads * need to own the lock on one same record, so now we also need to * do the read request to let two threads own the read lock on the * same record. */ boolean firstRead; public AccessThreadBreakWhenDeadlock( int id, int key1, int key2, int key3, boolean firstRead) { this.id = id; this.key1 = key1; this.key2 = key2; this.key3 = key3; this.firstRead = firstRead; } /** * This thread is responsible for executing transactions. */ public void run() { try { startSignal.await(); for (int op = 0; op < totalTxns && !deadlockDone; op++) { Transaction txn = env.beginTransaction(null, null); try { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); IntegerBinding.intToEntry(key1, key); if (firstRead) { db.get(txn, key, data, LockMode.DEFAULT); } else { IntegerBinding.intToEntry(key1, data); db.put(txn, key, data); } IntegerBinding.intToEntry(key2, key); IntegerBinding.intToEntry(key2, data); db.put(txn, key, data); if (key3 > 0) { IntegerBinding.intToEntry(key3, key); IntegerBinding.intToEntry(key3, data); db.put(txn, key, data); } txn.commit(); } catch (LockConflictException e) { System.out.println(e.getMessage()); deadlockDone = true; txn.abort(); } } } catch (InterruptedException e) { e.printStackTrace(); } } } }