Browse Source

fixed deadlock when crypto workers die due to exceptions

Sebastian Stenzel 9 years ago
parent
commit
128a93d44e

+ 2 - 2
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java

@@ -497,7 +497,7 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 
 		// wait for decryption workers to finish:
 		try {
-			executor.waitUntilDone();
+			executor.waitUntilDone(1, TimeUnit.SECONDS);
 		} catch (ExecutionException e) {
 			final Throwable cause = e.getCause();
 			if (cause instanceof IOException) {
@@ -721,7 +721,7 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 
 		// wait for encryption workers to finish:
 		try {
-			executor.waitUntilDone();
+			executor.waitUntilDone(1, TimeUnit.SECONDS);
 		} catch (ExecutionException e) {
 			final Throwable cause = e.getCause();
 			if (cause instanceof IOException) {

+ 1 - 1
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorker.java

@@ -32,7 +32,6 @@ abstract class CryptoWorker implements Callable<Void> {
 			while (!Thread.currentThread().isInterrupted()) {
 				final BlocksData blocksData = queue.take();
 				if (blocksData == POISON) {
-					// put poison back in for other threads:
 					break;
 				}
 				final ByteBuffer processedBytes = this.process(blocksData);
@@ -52,6 +51,7 @@ abstract class CryptoWorker implements Callable<Void> {
 				}
 			}
 		} catch (InterruptedException e) {
+			// will happen for executorService.shutdownNow()
 			Thread.currentThread().interrupt();
 		}
 		return null;

+ 18 - 8
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java

@@ -6,6 +6,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,7 +28,7 @@ class CryptoWorkerExecutor {
 	private final BlockingQueue<BlocksData> inputQueue;
 	private final ExecutorService executorService;
 	private final CompletionService<Void> completionService;
-	private boolean acceptWork;
+	private volatile boolean acceptWork;
 
 	/**
 	 * Starts as many {@link CryptoWorker} as specified in the constructor, that start working immediately on the items submitted via {@link #offer(BlocksData, long, TimeUnit)}.
@@ -64,7 +65,7 @@ class CryptoWorkerExecutor {
 			if (!success) {
 				this.acceptWork = false;
 				inputQueue.clear();
-				poisonWorkers();
+				poisonWorkers(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
 			}
 			return success;
 		} catch (InterruptedException e) {
@@ -78,15 +79,22 @@ class CryptoWorkerExecutor {
 	/**
 	 * Graceful shutdown of this executor, waiting for all jobs to finish (normally or by throwing exceptions).
 	 * 
+	 * @param timeout Maximum time spent <em>per worker</em> to wait for a graceful shutdown (technically worst case is: <code>2 * numWorkers * time</code>)
+	 * @param unit Timeout unit
 	 * @throws ExecutionException If any of the workers failed.
 	 */
-	public void waitUntilDone() throws ExecutionException {
+	public void waitUntilDone(long timeout, TimeUnit unit) throws ExecutionException {
 		this.acceptWork = false;
 		try {
-			poisonWorkers();
+			// fail fast, if workers are done before being poisoned (i.e. exceptionally):
+			for (Future<Void> task = completionService.poll(); task != null; task = completionService.poll()) {
+				task.get(); // this will most likely throw an ExecutionException
+			}
+			// if we got to this point without any exception, all workers are still running, so lets poison them:
+			poisonWorkers(timeout, unit);
 			// now workers will one after another finish their work, potentially throwing an ExecutionException:
-			for (int i = 0; i < numWorkers; i++) {
-				completionService.take().get();
+			for (Future<Void> task = completionService.poll(timeout, unit); task != null; task = completionService.poll(timeout, unit)) {
+				task.get();
 			}
 		} catch (InterruptedException e) {
 			LOG.error("Interrupted thread.", e);
@@ -97,10 +105,12 @@ class CryptoWorkerExecutor {
 		}
 	}
 
-	private void poisonWorkers() throws InterruptedException {
+	private void poisonWorkers(long timeout, TimeUnit unit) throws InterruptedException {
 		// add enough poison for each worker:
 		for (int i = 0; i < numWorkers; i++) {
-			inputQueue.put(CryptoWorker.POISON);
+			if (!inputQueue.offer(CryptoWorker.POISON, timeout, unit)) {
+				break;
+			}
 		}
 	}