Procházet zdrojové kódy

reject execution when shut down

Sebastian Stenzel před 9 roky
rodič
revize
c3652a22a0

+ 16 - 23
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java

@@ -15,6 +15,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -22,12 +24,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.cryptomator.common.UncheckedInterruptedException;
 
 /**
- * Executes long-running computations and returns the result strictly in order
- * of the job submissions, no matter how long each job takes.
+ * Executes long-running computations and returns the result strictly in order of the job submissions, no matter how long each job takes.
  * 
- * The internally used thread pool is shut down automatically as soon as this
- * FifiParallelDataProcessor is no longer referenced (see Finalization behaviour
- * of {@link ThreadPoolExecutor}).
+ * The internally used thread pool is shut down automatically as soon as this FifiParallelDataProcessor is no longer referenced (see Finalization behaviour of {@link ThreadPoolExecutor}).
  */
 class FifoParallelDataProcessor<T> {
 
@@ -37,11 +36,8 @@ class FifoParallelDataProcessor<T> {
 	private final ExecutorService executorService;
 
 	/**
-	 * @param numThreads
-	 *            How many jobs can run in parallel.
-	 * @param workQueueSize
-	 *            Maximum number of jobs accepted without blocking, when no
-	 *            results are polled from {@link #processedData()}.
+	 * @param numThreads How many jobs can run in parallel.
+	 * @param workQueueSize Maximum number of jobs accepted without blocking, when no results are polled from {@link #processedData()}.
 	 */
 	public FifoParallelDataProcessor(int numThreads, int workQueueSize) {
 		this.workQueue = new ArrayBlockingQueue<>(workQueueSize);
@@ -49,12 +45,15 @@ class FifoParallelDataProcessor<T> {
 	}
 
 	/**
-	 * Enqueues tasks into the blocking queue, if they can not be executed
-	 * immediately.
+	 * Enqueues tasks into the blocking queue, if they can not be executed immediately.
 	 * 
 	 * @see ThreadPoolExecutor#execute(Runnable)
+	 * @see RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)
 	 */
 	private void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+		if (executor.isShutdown()) {
+			throw new RejectedExecutionException("Executor has been shut down.");
+		}
 		try {
 			this.workQueue.put(r);
 		} catch (InterruptedException e) {
@@ -63,11 +62,9 @@ class FifoParallelDataProcessor<T> {
 	}
 
 	/**
-	 * Enqueues a job for execution. The results of multiple submissions can be
-	 * polled in FIFO order using {@link #processedData()}.
+	 * Enqueues a job for execution. The results of multiple submissions can be polled in FIFO order using {@link #processedData()}.
 	 * 
-	 * @param processingJob
-	 *            A task, that will compute a result.
+	 * @param processingJob A task, that will compute a result.
 	 * @throws InterruptedException
 	 */
 	void submit(Callable<T> processingJob) throws InterruptedException {
@@ -80,8 +77,7 @@ class FifoParallelDataProcessor<T> {
 	}
 
 	/**
-	 * Submits already pre-processed data, that can be polled in FIFO order from
-	 * {@link #processedData()}.
+	 * Submits already pre-processed data, that can be polled in FIFO order from {@link #processedData()}.
 	 * 
 	 * @throws InterruptedException
 	 */
@@ -92,13 +88,10 @@ class FifoParallelDataProcessor<T> {
 	}
 
 	/**
-	 * Result of previously {@link #submit(Callable) submitted} jobs in the same
-	 * order as they have been submitted. Blocks if the job didn't finish yet.
+	 * Result of previously {@link #submit(Callable) submitted} jobs in the same order as they have been submitted. Blocks if the job didn't finish yet.
 	 * 
 	 * @return Next job result
-	 * @throws InterruptedException
-	 *             If the calling thread was interrupted while waiting for the
-	 *             next result.
+	 * @throws InterruptedException If the calling thread was interrupted while waiting for the next result.
 	 */
 	T processedData() throws InterruptedException {
 		return processedData.take().get();

+ 13 - 0
main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java

@@ -8,7 +8,10 @@
  *******************************************************************************/
 package org.cryptomator.crypto.engine.impl;
 
+import java.lang.reflect.Field;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
@@ -29,6 +32,16 @@ public class FifoParallelDataProcessorTest {
 		processor.processedData();
 	}
 
+	@Test(expected = RejectedExecutionException.class)
+	public void testRejectExecutionAfterShutdown() throws InterruptedException, ReflectiveOperationException, SecurityException {
+		FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
+		Field field = FifoParallelDataProcessor.class.getDeclaredField("executorService");
+		field.setAccessible(true);
+		ExecutorService executorService = (ExecutorService) field.get(processor);
+		executorService.shutdownNow();
+		processor.submit(new IntegerJob(0, 1));
+	}
+
 	@Test
 	public void testStrictFifoOrder() throws InterruptedException {
 		FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(4, 10);