Kaynağa Gözat

cancel work if poisoning (i.e. graceful termination) fails after timeout

Sebastian Stenzel 9 yıl önce
ebeveyn
işleme
fc06595977

+ 2 - 0
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java

@@ -506,6 +506,7 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 				throw (RuntimeException) cause;
 			} else {
 				LOG.error("Unexpected exception", e);
+				throw new RuntimeException(cause);
 			}
 		} finally {
 			destroyQuietly(fileKey);
@@ -730,6 +731,7 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 				throw (RuntimeException) cause;
 			} else {
 				LOG.error("Unexpected exception", e);
+				throw new RuntimeException(cause);
 			}
 		} finally {
 			destroyQuietly(fileKey);

+ 14 - 12
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java

@@ -32,7 +32,6 @@ class CryptoWorkerExecutor {
 	private final BlockingQueue<BlocksData> inputQueue;
 	private final ExecutorService executorService;
 	private final Future<Void> allWork;
-	private volatile boolean acceptWork;
 
 	/**
 	 * Starts as many {@link CryptoWorker} as specified in the constructor, that start working immediately on the items submitted via {@link #offer(BlocksData, long, TimeUnit)}.
@@ -44,7 +43,6 @@ class CryptoWorkerExecutor {
 		this.currentBlock = new AtomicLong();
 		this.inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
 		this.executorService = Executors.newFixedThreadPool(numWorkers);
-		this.acceptWork = true;
 
 		// start workers:
 		final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
@@ -64,15 +62,15 @@ class CryptoWorkerExecutor {
 	 * @return <code>true</code> if the work has been added in time. <code>false</code> in any other case.
 	 */
 	public boolean offer(BlocksData data, long timeout, TimeUnit unit) {
-		if (!acceptWork) {
+		if (allWork.isDone()) {
 			return false;
 		}
 		try {
 			final boolean success = inputQueue.offer(data, timeout, unit);
 			if (!success) {
-				this.acceptWork = false;
+				LOG.error("inputQueue is full.");
 				inputQueue.clear();
-				poisonWorkers(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+				allWork.cancel(true);
 			}
 			return success;
 		} catch (InterruptedException e) {
@@ -91,32 +89,36 @@ class CryptoWorkerExecutor {
 	 * @throws ExecutionException If any of the workers failed.
 	 */
 	public void waitUntilDone(long timeout, TimeUnit unit) throws ExecutionException {
-		this.acceptWork = false;
 		try {
 			if (allWork.isDone()) {
 				// Work is done before workers being poisoned? This will most likely throw an ExecutionException:
 				allWork.get();
+			} else if (!poisonWorkers(timeout, unit)) {
+				// Attempt to enqueue poison pill for all workers failed:
+				allWork.cancel(true);
 			} else {
-				// Work not done yet, enqueue poison pill and wait for workers to finish:
-				poisonWorkers(timeout, unit);
+				// All poisons enqueued successfully. Now wait for termination by poison or exception:
 				allWork.get();
 			}
 		} catch (InterruptedException e) {
 			LOG.error("Interrupted thread.", e);
 			Thread.currentThread().interrupt();
+		} catch (CancellationException e) {
+			throw new ExecutionException("Work canceled", e);
 		} finally {
-			// shutdown either after normal decryption or if ANY worker threw an exception:
+			// in any case (normal or exceptional execution): shutdown executor including all workers and supervisor:
 			executorService.shutdownNow();
 		}
 	}
 
-	private void poisonWorkers(long timeout, TimeUnit unit) throws InterruptedException {
-		// add enough poison for each worker:
+	private boolean poisonWorkers(long timeout, TimeUnit unit) throws InterruptedException {
+		// add enough poison for each worker; each worker will consume excatly one:
 		for (int i = 0; i < numWorkers; i++) {
 			if (!inputQueue.offer(CryptoWorker.POISON, timeout, unit)) {
-				break;
+				return false;
 			}
 		}
+		return true;
 	}
 
 	@FunctionalInterface