Ver código fonte

Simplified FifoParallelDataProcessor

Sebastian Stenzel 9 anos atrás
pai
commit
9665ca8dff

+ 14 - 58
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java

@@ -13,13 +13,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.cryptomator.common.UncheckedInterruptedException;
 
@@ -30,35 +26,16 @@ import org.cryptomator.common.UncheckedInterruptedException;
  */
 class FifoParallelDataProcessor<T> {
 
-	private final BlockingQueue<SequencedFutureResult> processedData = new PriorityBlockingQueue<>();
-	private final AtomicLong jobSequence = new AtomicLong();
-	private final BlockingQueue<Runnable> workQueue;
+	private final BlockingQueue<Future<T>> processedData;
 	private final ExecutorService executorService;
 
 	/**
 	 * @param numThreads How many jobs can run in parallel.
-	 * @param workQueueSize Maximum number of jobs accepted without blocking, when no results are polled from {@link #processedData()}.
+	 * @param workAhead Maximum number of jobs accepted in {@link #submit(Callable)} without blocking until results are polled from {@link #processedData()}.
 	 */
-	public FifoParallelDataProcessor(int numThreads, int workQueueSize) {
-		this.workQueue = new ArrayBlockingQueue<>(workQueueSize);
-		this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 1, TimeUnit.SECONDS, workQueue, this::rejectedExecution);
-	}
-
-	/**
-	 * Enqueues tasks into the blocking queue, if they can not be executed immediately.
-	 * 
-	 * @see ThreadPoolExecutor#execute(Runnable)
-	 * @see RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)
-	 */
-	private void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-		if (executor.isShutdown()) {
-			throw new RejectedExecutionException("Executor has been shut down.");
-		}
-		try {
-			this.workQueue.put(r);
-		} catch (InterruptedException e) {
-			throw new UncheckedInterruptedException(e);
-		}
+	public FifoParallelDataProcessor(int numThreads, int workAhead) {
+		this.processedData = new ArrayBlockingQueue<>(workAhead);
+		this.executorService = Executors.newFixedThreadPool(numThreads);
 	}
 
 	/**
@@ -70,7 +47,7 @@ class FifoParallelDataProcessor<T> {
 	void submit(Callable<T> processingJob) throws InterruptedException {
 		try {
 			Future<T> future = executorService.submit(processingJob);
-			processedData.offer(new SequencedFutureResult(future, jobSequence.getAndIncrement()));
+			processedData.put(future);
 		} catch (UncheckedInterruptedException e) {
 			throw e.getCause();
 		}
@@ -94,36 +71,15 @@ class FifoParallelDataProcessor<T> {
 	 * @throws InterruptedException If the calling thread was interrupted while waiting for the next result.
 	 */
 	T processedData() throws InterruptedException {
-		return processedData.take().get();
-	}
-
-	private class SequencedFutureResult implements Comparable<SequencedFutureResult> {
-
-		private final Future<T> result;
-		private final long sequenceNumber;
-
-		public SequencedFutureResult(Future<T> result, long sequenceNumber) {
-			this.result = result;
-			this.sequenceNumber = sequenceNumber;
-		}
-
-		public T get() throws InterruptedException {
-			try {
-				return result.get();
-			} catch (ExecutionException e) {
-				if (e.getCause() instanceof RuntimeException) {
-					throw (RuntimeException) e.getCause();
-				} else {
-					throw new RuntimeException(e);
-				}
+		try {
+			return processedData.take().get();
+		} catch (ExecutionException e) {
+			if (e.getCause() instanceof RuntimeException) {
+				throw (RuntimeException) e.getCause();
+			} else {
+				throw new RuntimeException(e);
 			}
 		}
-
-		@Override
-		public int compareTo(SequencedFutureResult other) {
-			return Long.compare(this.sequenceNumber, other.sequenceNumber);
-		}
-
 	}
 
 }

+ 7 - 11
main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java

@@ -63,36 +63,33 @@ public class FifoParallelDataProcessorTest {
 	@Test
 	public void testBlockingBehaviour() throws InterruptedException {
 		FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
-		processor.submit(new IntegerJob(100, 1)); // runs immediatley
-		processor.submit(new IntegerJob(100, 2)); // #1 in queue
+		processor.submitPreprocessed(1); // #1 in queue
 
 		Thread t1 = new Thread(() -> {
 			try {
-				processor.submit(new IntegerJob(10, 3)); // #2 in queue
+				processor.submitPreprocessed(2); // #2 in queue
 			} catch (InterruptedException e) {
 				Thread.currentThread().interrupt();
 			}
 		});
 		t1.start();
 		t1.join(10);
-		// job 3 should not have been submitted by now, thus t1 is still alive
+		// job 2 should not have been submitted by now, thus t1 is still alive
 		Assert.assertTrue(t1.isAlive());
 		Assert.assertEquals(1, (int) processor.processedData());
 		Assert.assertEquals(2, (int) processor.processedData());
-		Assert.assertEquals(3, (int) processor.processedData());
-		Assert.assertFalse(t1.isAlive());
+		t1.join();
 	}
 
 	@Test
 	public void testInterruptionDuringSubmission() throws InterruptedException {
 		FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
-		processor.submit(new IntegerJob(100, 1)); // runs immediatley
-		processor.submit(new IntegerJob(100, 2)); // #1 in queue
+		processor.submitPreprocessed(1); // #1 in queue
 
 		final AtomicBoolean interruptedExceptionThrown = new AtomicBoolean(false);
 		Thread t1 = new Thread(() -> {
 			try {
-				processor.submit(new IntegerJob(10, 3)); // #2 in queue
+				processor.submitPreprocessed(2); // #2 in queue
 			} catch (InterruptedException e) {
 				interruptedExceptionThrown.set(true);
 				Thread.currentThread().interrupt();
@@ -102,11 +99,10 @@ public class FifoParallelDataProcessorTest {
 		t1.join(10);
 		t1.interrupt();
 		t1.join(10);
-		// job 3 should not have been submitted by now, thus t1 is still alive
+		// job 2 should not have been submitted by now, thus t1 is still alive
 		Assert.assertFalse(t1.isAlive());
 		Assert.assertTrue(interruptedExceptionThrown.get());
 		Assert.assertEquals(1, (int) processor.processedData());
-		Assert.assertEquals(2, (int) processor.processedData());
 	}
 
 	private static class IntegerJob implements Callable<Integer> {

+ 37 - 27
main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FileContentCryptorTest.java

@@ -104,7 +104,7 @@ public class FileContentCryptorTest {
 		Assert.assertArrayEquals("cleartext message".getBytes(), result);
 	}
 
-	@Test
+	@Test(timeout = 20000) // assuming a minimum speed of 10mb/s during encryption and decryption 20s should be enough
 	public void testEncryptionAndDecryptionSpeed() throws InterruptedException, IOException {
 		final byte[] keyBytes = new byte[32];
 		final SecretKey encryptionKey = new SecretKeySpec(keyBytes, "AES");
@@ -112,49 +112,59 @@ public class FileContentCryptorTest {
 		final FileContentCryptor cryptor = new FileContentCryptorImpl(encryptionKey, macKey, RANDOM_MOCK);
 		final Path tmpFile = Files.createTempFile("encrypted", ".tmp");
 
+		final Thread fileWriter;
 		final ByteBuffer header;
 		final long encStart = System.nanoTime();
-		try (FileContentEncryptor encryptor = cryptor.createFileContentEncryptor(Optional.empty()); FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.WRITE)) {
-			final ByteBuffer cleartext = ByteBuffer.allocate(32768); // 32k
-			ByteBuffer ciphertext;
-			for (int i = 0; i < 4096; i++) { // 128M total
+		try (FileContentEncryptor encryptor = cryptor.createFileContentEncryptor(Optional.empty())) {
+			fileWriter = new Thread(() -> {
+				try (FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.WRITE)) {
+					ByteBuffer ciphertext;
+					while ((ciphertext = encryptor.ciphertext()) != FileContentCryptor.EOF) {
+						fc.write(ciphertext);
+					}
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+			});
+			fileWriter.start();
+
+			final ByteBuffer cleartext = ByteBuffer.allocate(100000); // 100k
+			for (int i = 0; i < 1000; i++) { // 100M total
 				cleartext.rewind();
 				encryptor.append(cleartext);
-				if (i > Runtime.getRuntime().availableProcessors()) {
-					ciphertext = encryptor.ciphertext();
-					Assert.assertEquals(32 * 1024 + 32, ciphertext.remaining());
-					fc.write(ciphertext);
-				}
 			}
 			encryptor.append(FileContentCryptor.EOF);
-			while ((ciphertext = encryptor.ciphertext()) != FileContentCryptor.EOF) {
-				fc.write(ciphertext);
-			}
 			header = encryptor.getHeader();
 		}
+		fileWriter.join();
 		final long encEnd = System.nanoTime();
-		LOG.debug("Encryption of 128M took {}ms", (encEnd - encStart) / 1000 / 1000);
+		LOG.debug("Encryption of 100M took {}ms", (encEnd - encStart) / 1000 / 1000);
 
+		final Thread fileReader;
 		final long decStart = System.nanoTime();
-		try (FileContentDecryptor decryptor = cryptor.createFileContentDecryptor(header); FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.READ)) {
-			final ByteBuffer ciphertext = ByteBuffer.allocate(654321);
-			ByteBuffer cleartext;
-			for (int i = 0; fc.read(ciphertext) != -1; i++) {
-				ciphertext.flip();
-				decryptor.append(ciphertext);
-				ciphertext.clear();
-				if (i > Runtime.getRuntime().availableProcessors()) {
-					cleartext = decryptor.cleartext();
-					Assert.assertTrue(cleartext.hasRemaining());
+		try (FileContentDecryptor decryptor = cryptor.createFileContentDecryptor(header)) {
+			fileReader = new Thread(() -> {
+				try (FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.READ)) {
+					ByteBuffer ciphertext = ByteBuffer.allocate(654321);
+					while (fc.read(ciphertext) != -1) {
+						ciphertext.flip();
+						decryptor.append(ciphertext);
+						ciphertext.clear();
+					}
+					decryptor.append(FileContentCryptor.EOF);
+				} catch (Exception e) {
+					e.printStackTrace();
 				}
-			}
-			decryptor.append(FileContentCryptor.EOF);
+			});
+			fileReader.start();
+
 			while (decryptor.cleartext() != FileContentCryptor.EOF) {
 				// no-op
 			}
 		}
+		fileReader.join();
 		final long decEnd = System.nanoTime();
-		LOG.debug("Decryption of 128M took {}ms", (decEnd - decStart) / 1000 / 1000);
+		LOG.debug("Decryption of 100M took {}ms", (decEnd - decStart) / 1000 / 1000);
 		Files.delete(tmpFile);
 	}
 }