Parcourir la source

(now really) fixed deadlock when crypto workers die due to exceptions

Sebastian Stenzel il y a 9 ans
Parent
commit
e4220246ab

+ 57 - 13
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java

@@ -1,6 +1,10 @@
 package org.cryptomator.crypto.aes256;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
@@ -27,7 +31,7 @@ class CryptoWorkerExecutor {
 	private final AtomicLong currentBlock;
 	private final BlockingQueue<BlocksData> inputQueue;
 	private final ExecutorService executorService;
-	private final CompletionService<Void> completionService;
+	private final Future<Void> allWork;
 	private volatile boolean acceptWork;
 
 	/**
@@ -40,14 +44,17 @@ class CryptoWorkerExecutor {
 		this.currentBlock = new AtomicLong();
 		this.inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
 		this.executorService = Executors.newFixedThreadPool(numWorkers);
-		this.completionService = new ExecutorCompletionService<>(executorService);
 		this.acceptWork = true;
 
 		// start workers:
+		final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
+		final Collection<Future<?>> workers = new ArrayList<>(numWorkers);
 		for (int i = 0; i < numWorkers; i++) {
 			final CryptoWorker worker = workerFactory.createWorker(lock, blockDone, currentBlock, inputQueue);
-			completionService.submit(worker);
+			workers.add(completionService.submit(worker));
 		}
+		final Supervisor supervisor = new Supervisor(workers, completionService);
+		this.allWork = executorService.submit(supervisor);
 	}
 
 	/**
@@ -79,22 +86,20 @@ class CryptoWorkerExecutor {
 	/**
 	 * Graceful shutdown of this executor, waiting for all jobs to finish (normally or by throwing exceptions).
 	 * 
-	 * @param timeout Maximum time spent <em>per worker</em> to wait for a graceful shutdown (technically worst case is: <code>2 * numWorkers * time</code>)
+	 * @param timeout Maximum time spent <em>per worker</em> to wait for a graceful shutdown
 	 * @param unit Timeout unit
 	 * @throws ExecutionException If any of the workers failed.
 	 */
 	public void waitUntilDone(long timeout, TimeUnit unit) throws ExecutionException {
 		this.acceptWork = false;
 		try {
-			// fail fast, if workers are done before being poisoned (i.e. exceptionally):
-			for (Future<Void> task = completionService.poll(); task != null; task = completionService.poll()) {
-				task.get(); // this will most likely throw an ExecutionException
-			}
-			// if we got to this point without any exception, all workers are still running, so lets poison them:
-			poisonWorkers(timeout, unit);
-			// now workers will one after another finish their work, potentially throwing an ExecutionException:
-			for (Future<Void> task = completionService.poll(timeout, unit); task != null; task = completionService.poll(timeout, unit)) {
-				task.get();
+			if (allWork.isDone()) {
+				// Work is done before workers being poisoned? This will most likely throw an ExecutionException:
+				allWork.get();
+			} else {
+				// Work not done yet, enqueue poison pill and wait for workers to finish:
+				poisonWorkers(timeout, unit);
+				allWork.get();
 			}
 		} catch (InterruptedException e) {
 			LOG.error("Interrupted thread.", e);
@@ -119,4 +124,43 @@ class CryptoWorkerExecutor {
 		CryptoWorker createWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<BlocksData> inputQueue);
 	}
 
+	/**
+	 * A supervisor watches the work results of a collection of workers. The supervisor waits for all workers to finish.
+	 * The supvervisor itself does not cause any exceptions, but if <em>one</em> worker fails, all other workers are cancelled immediately and the exception propagates through this supvervisor.
+	 * Anyone waiting for the supervisor to finish will thus effectively wait for all supvervisees to finish.
+	 */
+	private static class Supervisor implements Callable<Void> {
+
+		private final Collection<Future<?>> workers;
+		private final CompletionService<?> completionService;
+
+		public Supervisor(Collection<Future<?>> workers, CompletionService<?> completionService) {
+			this.workers = workers;
+			this.completionService = completionService;
+		}
+
+		@Override
+		public Void call() throws ExecutionException {
+			try {
+				for (int i = 0; i < workers.size(); i++) {
+					try {
+						// any ExecutionException thrown here will propagate up (after work is canceled in finally block)
+						completionService.take().get();
+					} catch (CancellationException ignore) {
+					}
+				}
+			} catch (InterruptedException e) {
+				// supervisor may be interrupted when executorservice is shut down.
+				Thread.currentThread().interrupt();
+			} finally {
+				// make sure, that at the end of the day all remaining workers leave the building.
+				for (Future<?> worker : workers) {
+					worker.cancel(true);
+				}
+			}
+			// no exception up to this point -> all workers finished work normally.
+			return null;
+		}
+	}
+
 }