|
@@ -11,10 +11,15 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
+import org.cryptomator.common.UncheckedInterruptedException;
|
|
|
+
|
|
|
/**
|
|
|
- * Executes long-running computations and returns the result strictly in order of the job submissions, no matter how long each job takes.
|
|
|
+ * Executes long-running computations and returns the result strictly in order
|
|
|
+ * of the job submissions, no matter how long each job takes.
|
|
|
*
|
|
|
- * The internally used thread pool is shut down automatically as soon as this FifiParallelDataProcessor is no longer referenced (see Finalization behaviour of {@link ThreadPoolExecutor}).
|
|
|
+ * The internally used thread pool is shut down automatically as soon as this
|
|
|
+ * FifiParallelDataProcessor is no longer referenced (see Finalization behaviour
|
|
|
+ * of {@link ThreadPoolExecutor}).
|
|
|
*/
|
|
|
class FifoParallelDataProcessor<T> {
|
|
|
|
|
@@ -24,8 +29,11 @@ class FifoParallelDataProcessor<T> {
|
|
|
private final ExecutorService executorService;
|
|
|
|
|
|
/**
|
|
|
- * @param numThreads How many jobs can run in parallel.
|
|
|
- * @param workQueueSize Maximum number of jobs accepted without blocking, when no results are polled from {@link #processedData()}.
|
|
|
+ * @param numThreads
|
|
|
+ * How many jobs can run in parallel.
|
|
|
+ * @param workQueueSize
|
|
|
+ * Maximum number of jobs accepted without blocking, when no
|
|
|
+ * results are polled from {@link #processedData()}.
|
|
|
*/
|
|
|
public FifoParallelDataProcessor(int numThreads, int workQueueSize) {
|
|
|
this.workQueue = new ArrayBlockingQueue<>(workQueueSize);
|
|
@@ -33,7 +41,8 @@ class FifoParallelDataProcessor<T> {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Enqueues tasks into the blocking queue, if they can not be executed immediately.
|
|
|
+ * Enqueues tasks into the blocking queue, if they can not be executed
|
|
|
+ * immediately.
|
|
|
*
|
|
|
* @see ThreadPoolExecutor#execute(Runnable)
|
|
|
*/
|
|
@@ -41,27 +50,30 @@ class FifoParallelDataProcessor<T> {
|
|
|
try {
|
|
|
this.workQueue.put(r);
|
|
|
} catch (InterruptedException e) {
|
|
|
- throw new SneakyInterruptedException(e);
|
|
|
+ throw new UncheckedInterruptedException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Enqueues a job for execution. The results of multiple submissions can be polled in FIFO order using {@link #processedData()}.
|
|
|
+ * Enqueues a job for execution. The results of multiple submissions can be
|
|
|
+ * polled in FIFO order using {@link #processedData()}.
|
|
|
*
|
|
|
- * @param processingJob A task, that will compute a result.
|
|
|
+ * @param processingJob
|
|
|
+ * A task, that will compute a result.
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
void submit(Callable<T> processingJob) throws InterruptedException {
|
|
|
try {
|
|
|
Future<T> future = executorService.submit(processingJob);
|
|
|
processedData.offer(new SequencedFutureResult(future, jobSequence.getAndIncrement()));
|
|
|
- } catch (SneakyInterruptedException e) {
|
|
|
+ } catch (UncheckedInterruptedException e) {
|
|
|
throw e.getCause();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Submits already pre-processed data, that can be polled in FIFO order from {@link #processedData()}.
|
|
|
+ * Submits already pre-processed data, that can be polled in FIFO order from
|
|
|
+ * {@link #processedData()}.
|
|
|
*
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
@@ -72,10 +84,13 @@ class FifoParallelDataProcessor<T> {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Result of previously {@link #submit(Callable) submitted} jobs in the same order as they have been submitted. Blocks if the job didn't finish yet.
|
|
|
+ * Result of previously {@link #submit(Callable) submitted} jobs in the same
|
|
|
+ * order as they have been submitted. Blocks if the job didn't finish yet.
|
|
|
*
|
|
|
* @return Next job result
|
|
|
- * @throws InterruptedException If the calling thread was interrupted while waiting for the next result.
|
|
|
+ * @throws InterruptedException
|
|
|
+ * If the calling thread was interrupted while waiting for the
|
|
|
+ * next result.
|
|
|
*/
|
|
|
T processedData() throws InterruptedException {
|
|
|
return processedData.take().get();
|
|
@@ -110,19 +125,4 @@ class FifoParallelDataProcessor<T> {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private static class SneakyInterruptedException extends RuntimeException {
|
|
|
-
|
|
|
- private static final long serialVersionUID = 331817765088138556L;
|
|
|
-
|
|
|
- public SneakyInterruptedException(InterruptedException cause) {
|
|
|
- super(cause);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public InterruptedException getCause() {
|
|
|
- return (InterruptedException) super.getCause();
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
}
|