Pārlūkot izejas kodu

Adding blocking behaviour when appending data to be en/decrypted.
Using composite instead of inheritance for FileContentDecryptorImpl and FileContentEncryptorImpl

Sebastian Stenzel 9 gadi atpakaļ
vecāks
revīzija
80e1185325

+ 1 - 1
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentDecryptor.java

@@ -21,7 +21,7 @@ public interface FileContentDecryptor extends Destroyable, Closeable {
 	 * @param cleartext Cleartext data or {@link FileContentCryptor#EOF} to indicate the end of a ciphertext.
 	 * @see #skipToPosition(long)
 	 */
-	void append(ByteBuffer ciphertext);
+	void append(ByteBuffer ciphertext) throws InterruptedException;
 
 	/**
 	 * Returns the next decrypted cleartext in byte-by-byte FIFO order, meaning in the order ciphertext has been appended to this encryptor.

+ 1 - 1
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentEncryptor.java

@@ -23,7 +23,7 @@ public interface FileContentEncryptor extends Destroyable, Closeable {
 	 * 
 	 * @param cleartext Cleartext data or {@link FileContentCryptor#EOF} to indicate the end of a cleartext.
 	 */
-	void append(ByteBuffer cleartext);
+	void append(ByteBuffer cleartext) throws InterruptedException;
 
 	/**
 	 * Returns the next ciphertext in byte-by-byte FIFO order, meaning in the order cleartext has been appended to this encryptor.

+ 0 - 60
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/AbstractFileContentProcessor.java

@@ -1,60 +0,0 @@
-package org.cryptomator.crypto.engine.impl;
-
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang3.concurrent.ConcurrentUtils;
-
-abstract class AbstractFileContentProcessor implements Closeable {
-
-	private static final int NUM_WORKERS = Runtime.getRuntime().availableProcessors();
-	private static final int READ_AHEAD = 0;
-
-	private final BlockingQueue<BytesWithSequenceNumber> processedData = new PriorityBlockingQueue<>();
-	private final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(NUM_WORKERS + READ_AHEAD);
-	private final ExecutorService executorService = new ThreadPoolExecutor(1, NUM_WORKERS, 1, TimeUnit.SECONDS, workQueue);
-	private final AtomicLong jobSequence = new AtomicLong();
-
-	/**
-	 * Enqueues a job for execution. The results of multiple submissions can be polled in FIFO order using {@link #processedData()}.
-	 * 
-	 * @param processingJob A ByteBuffer-generating task.
-	 */
-	protected void submit(Callable<ByteBuffer> processingJob) {
-		Future<ByteBuffer> result = executorService.submit(processingJob);
-		processedData.offer(new BytesWithSequenceNumber(result, jobSequence.getAndIncrement()));
-	}
-
-	/**
-	 * Submits already processed data, that can be polled in FIFO order from {@link #processedData()}.
-	 */
-	protected void submitPreprocessed(ByteBuffer preprocessedData) {
-		Future<ByteBuffer> resolvedFuture = ConcurrentUtils.constantFuture(preprocessedData);
-		processedData.offer(new BytesWithSequenceNumber(resolvedFuture, jobSequence.getAndIncrement()));
-	}
-
-	/**
-	 * Result of previously {@link #submit(Callable) submitted} jobs in the same order as they have been submitted. Blocks if the job didn't finish yet.
-	 * 
-	 * @return Next job result
-	 * @throws InterruptedException If the calling thread was interrupted while waiting for the next result.
-	 */
-	protected ByteBuffer processedData() throws InterruptedException {
-		return processedData.take().get();
-	}
-
-	@Override
-	public void close() {
-		executorService.shutdown();
-	}
-
-}

+ 0 - 31
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/BytesWithSequenceNumber.java

@@ -1,31 +0,0 @@
-package org.cryptomator.crypto.engine.impl;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-class BytesWithSequenceNumber implements Comparable<BytesWithSequenceNumber> {
-
-	private final Future<ByteBuffer> byteBuffer;
-	private final long sequenceNumber;
-
-	public BytesWithSequenceNumber(Future<ByteBuffer> byteBuffer, long sequenceNumber) {
-		this.byteBuffer = byteBuffer;
-		this.sequenceNumber = sequenceNumber;
-	}
-
-	public ByteBuffer get() throws InterruptedException {
-		try {
-			return byteBuffer.get();
-		} catch (ExecutionException e) {
-			assert e.getCause() instanceof RuntimeException;
-			throw (RuntimeException) e.getCause();
-		}
-	}
-
-	@Override
-	public int compareTo(BytesWithSequenceNumber other) {
-		return Long.compare(this.sequenceNumber, other.sequenceNumber);
-	}
-
-}

+ 128 - 0
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java

@@ -0,0 +1,128 @@
+package org.cryptomator.crypto.engine.impl;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Executes long-running computations and returns the result strictly in order of the job submissions, no matter how long each job takes.
+ * 
+ * The internally used thread pool is shut down automatically as soon as this FifiParallelDataProcessor is no longer referenced (see Finalization behaviour of {@link ThreadPoolExecutor}).
+ */
+class FifoParallelDataProcessor<T> {
+
+	private final BlockingQueue<SequencedFutureResult> processedData = new PriorityBlockingQueue<>();
+	private final AtomicLong jobSequence = new AtomicLong();
+	private final BlockingQueue<Runnable> workQueue;
+	private final ExecutorService executorService;
+
+	/**
+	 * @param numThreads How many jobs can run in parallel.
+	 * @param workQueueSize Maximum number of jobs accepted without blocking, when no results are polled from {@link #processedData()}.
+	 */
+	public FifoParallelDataProcessor(int numThreads, int workQueueSize) {
+		this.workQueue = new ArrayBlockingQueue<>(workQueueSize);
+		this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 1, TimeUnit.SECONDS, workQueue, this::rejectedExecution);
+	}
+
+	/**
+	 * Enqueues tasks into the blocking queue, if they can not be executed immediately.
+	 * 
+	 * @see ThreadPoolExecutor#execute(Runnable)
+	 */
+	private void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+		try {
+			this.workQueue.put(r);
+		} catch (InterruptedException e) {
+			throw new SneakyInterruptedException(e);
+		}
+	}
+
+	/**
+	 * Enqueues a job for execution. The results of multiple submissions can be polled in FIFO order using {@link #processedData()}.
+	 * 
+	 * @param processingJob A task, that will compute a result.
+	 * @throws InterruptedException
+	 */
+	void submit(Callable<T> processingJob) throws InterruptedException {
+		try {
+			Future<T> future = executorService.submit(processingJob);
+			processedData.offer(new SequencedFutureResult(future, jobSequence.getAndIncrement()));
+		} catch (SneakyInterruptedException e) {
+			throw e.getCause();
+		}
+	}
+
+	/**
+	 * Submits already pre-processed data, that can be polled in FIFO order from {@link #processedData()}.
+	 * 
+	 * @throws InterruptedException
+	 */
+	void submitPreprocessed(T preprocessedData) throws InterruptedException {
+		this.submit(() -> {
+			return preprocessedData;
+		});
+	}
+
+	/**
+	 * Result of previously {@link #submit(Callable) submitted} jobs in the same order as they have been submitted. Blocks if the job didn't finish yet.
+	 * 
+	 * @return Next job result
+	 * @throws InterruptedException If the calling thread was interrupted while waiting for the next result.
+	 */
+	T processedData() throws InterruptedException {
+		return processedData.take().get();
+	}
+
+	private class SequencedFutureResult implements Comparable<SequencedFutureResult> {
+
+		private final Future<T> result;
+		private final long sequenceNumber;
+
+		public SequencedFutureResult(Future<T> result, long sequenceNumber) {
+			this.result = result;
+			this.sequenceNumber = sequenceNumber;
+		}
+
+		public T get() throws InterruptedException {
+			try {
+				return result.get();
+			} catch (ExecutionException e) {
+				if (e.getCause() instanceof RuntimeException) {
+					throw (RuntimeException) e.getCause();
+				} else {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+
+		@Override
+		public int compareTo(SequencedFutureResult other) {
+			return Long.compare(this.sequenceNumber, other.sequenceNumber);
+		}
+
+	}
+
+	private static class SneakyInterruptedException extends RuntimeException {
+
+		private static final long serialVersionUID = 331817765088138556L;
+
+		public SneakyInterruptedException(InterruptedException cause) {
+			super(cause);
+		}
+
+		@Override
+		public InterruptedException getCause() {
+			return (InterruptedException) super.getCause();
+		}
+
+	}
+
+}

+ 11 - 14
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentDecryptorImpl.java

@@ -23,7 +23,7 @@ import org.cryptomator.crypto.engine.FileContentCryptor;
 import org.cryptomator.crypto.engine.FileContentDecryptor;
 import org.cryptomator.io.ByteBuffers;
 
-class FileContentDecryptorImpl extends AbstractFileContentProcessor implements FileContentDecryptor {
+class FileContentDecryptorImpl implements FileContentDecryptor {
 
 	private static final String AES = "AES";
 	private static final int AES_BLOCK_LENGTH_IN_BYTES = 16;
@@ -31,7 +31,10 @@ class FileContentDecryptorImpl extends AbstractFileContentProcessor implements F
 	private static final String HMAC_SHA256 = "HmacSHA256";
 	private static final int CHUNK_SIZE = 32 * 1024;
 	private static final int MAC_SIZE = 32;
+	private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
+	private static final int READ_AHEAD = 2;
 
+	private final FifoParallelDataProcessor<ByteBuffer> dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS, NUM_THREADS + READ_AHEAD);
 	private final ThreadLocal<Mac> hmacSha256;
 	private final SecretKey contentKey;
 	private final byte[] nonce;
@@ -101,7 +104,7 @@ class FileContentDecryptorImpl extends AbstractFileContentProcessor implements F
 	}
 
 	@Override
-	public void append(ByteBuffer ciphertext) {
+	public void append(ByteBuffer ciphertext) throws InterruptedException {
 		if (ciphertext == FileContentCryptor.EOF) {
 			submitCiphertextBuffer();
 			submitEof();
@@ -113,26 +116,26 @@ class FileContentDecryptorImpl extends AbstractFileContentProcessor implements F
 		}
 	}
 
-	private void submitCiphertextBufferIfFull() {
+	private void submitCiphertextBufferIfFull() throws InterruptedException {
 		if (!ciphertextBuffer.hasRemaining()) {
 			submitCiphertextBuffer();
 			ciphertextBuffer = ByteBuffer.allocate(CHUNK_SIZE + MAC_SIZE);
 		}
 	}
 
-	private void submitCiphertextBuffer() {
+	private void submitCiphertextBuffer() throws InterruptedException {
 		ciphertextBuffer.flip();
 		Callable<ByteBuffer> encryptionJob = new DecryptionJob(ciphertextBuffer, chunkNumber++);
-		submit(encryptionJob);
+		dataProcessor.submit(encryptionJob);
 	}
 
-	private void submitEof() {
-		submitPreprocessed(FileContentCryptor.EOF);
+	private void submitEof() throws InterruptedException {
+		dataProcessor.submitPreprocessed(FileContentCryptor.EOF);
 	}
 
 	@Override
 	public ByteBuffer cleartext() throws InterruptedException {
-		return processedData();
+		return dataProcessor.processedData();
 	}
 
 	@Override
@@ -154,12 +157,6 @@ class FileContentDecryptorImpl extends AbstractFileContentProcessor implements F
 		}
 	}
 
-	@Override
-	public void close() {
-		this.destroy();
-		super.close();
-	}
-
 	private class DecryptionJob implements Callable<ByteBuffer> {
 
 		private final ByteBuffer ciphertextChunk;

+ 11 - 14
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentEncryptorImpl.java

@@ -24,14 +24,17 @@ import org.cryptomator.crypto.engine.FileContentCryptor;
 import org.cryptomator.crypto.engine.FileContentEncryptor;
 import org.cryptomator.io.ByteBuffers;
 
-class FileContentEncryptorImpl extends AbstractFileContentProcessor implements FileContentEncryptor {
+class FileContentEncryptorImpl implements FileContentEncryptor {
 
 	private static final String AES = "AES";
 	private static final int AES_BLOCK_LENGTH_IN_BYTES = 16;
 	private static final String AES_CBC = "AES/CBC/PKCS5Padding";
 	private static final String HMAC_SHA256 = "HmacSHA256";
 	private static final int CHUNK_SIZE = 32 * 1024;
+	private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
+	private static final int READ_AHEAD = 2;
 
+	private final FifoParallelDataProcessor<ByteBuffer> dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS, NUM_THREADS + READ_AHEAD);
 	private final ThreadLocal<Mac> hmacSha256;
 	private final SecretKey headerKey;
 	private final SecretKey contentKey;
@@ -98,7 +101,7 @@ class FileContentEncryptorImpl extends AbstractFileContentProcessor implements F
 	}
 
 	@Override
-	public void append(ByteBuffer cleartext) {
+	public void append(ByteBuffer cleartext) throws InterruptedException {
 		cleartextBytesEncrypted.add(cleartext.remaining());
 		if (cleartext == FileContentCryptor.EOF) {
 			submitCleartextBuffer();
@@ -111,26 +114,26 @@ class FileContentEncryptorImpl extends AbstractFileContentProcessor implements F
 		}
 	}
 
-	private void submitCleartextBufferIfFull() {
+	private void submitCleartextBufferIfFull() throws InterruptedException {
 		if (!cleartextBuffer.hasRemaining()) {
 			submitCleartextBuffer();
 			cleartextBuffer = ByteBuffer.allocate(CHUNK_SIZE);
 		}
 	}
 
-	private void submitCleartextBuffer() {
+	private void submitCleartextBuffer() throws InterruptedException {
 		cleartextBuffer.flip();
 		Callable<ByteBuffer> encryptionJob = new EncryptionJob(cleartextBuffer, chunkNumber++);
-		submit(encryptionJob);
+		dataProcessor.submit(encryptionJob);
 	}
 
-	private void submitEof() {
-		submitPreprocessed(FileContentCryptor.EOF);
+	private void submitEof() throws InterruptedException {
+		dataProcessor.submitPreprocessed(FileContentCryptor.EOF);
 	}
 
 	@Override
 	public ByteBuffer ciphertext() throws InterruptedException {
-		return processedData();
+		return dataProcessor.processedData();
 	}
 
 	@Override
@@ -152,12 +155,6 @@ class FileContentEncryptorImpl extends AbstractFileContentProcessor implements F
 		}
 	}
 
-	@Override
-	public void close() {
-		this.destroy();
-		super.close();
-	}
-
 	private class EncryptionJob implements Callable<ByteBuffer> {
 
 		private final ByteBuffer cleartextChunk;

+ 14 - 10
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoReadableFile.java

@@ -92,16 +92,20 @@ class CryptoReadableFile implements ReadableFile {
 		public Void call() {
 			file.read(EMPTY_BUFFER, startpos);
 			int bytesRead = -1;
-			do {
-				ByteBuffer ciphertext = ByteBuffer.allocate(READ_BUFFER_SIZE);
-				file.read(ciphertext);
-				ciphertext.flip();
-				bytesRead = ciphertext.remaining();
-				if (bytesRead > 0) {
-					decryptor.append(ciphertext);
-				}
-			} while (bytesRead > 0);
-			decryptor.append(FileContentCryptor.EOF);
+			try {
+				do {
+					ByteBuffer ciphertext = ByteBuffer.allocate(READ_BUFFER_SIZE);
+					file.read(ciphertext);
+					ciphertext.flip();
+					bytesRead = ciphertext.remaining();
+					if (bytesRead > 0) {
+						decryptor.append(ciphertext);
+					}
+				} while (bytesRead > 0);
+				decryptor.append(FileContentCryptor.EOF);
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
 			return null;
 		}
 

+ 6 - 2
main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoWritableFile.java

@@ -40,8 +40,12 @@ class CryptoWritableFile implements WritableFile {
 		final ByteBuffer cleartextCopy = ByteBuffer.allocate(source.remaining());
 		ByteBuffers.copy(source, cleartextCopy);
 		cleartextCopy.flip();
-		encryptor.append(cleartextCopy);
-		file.write(source);
+		try {
+			encryptor.append(cleartextCopy);
+			file.write(source);
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+		}
 	}
 
 	@Override

+ 96 - 0
main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java

@@ -0,0 +1,96 @@
+package org.cryptomator.crypto.engine.impl;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FifoParallelDataProcessorTest {
+
+	@Test
+	public void testStrictFifoOrder() throws InterruptedException {
+		FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(4, 10);
+		processor.submit(new IntegerJob(100, 1));
+		processor.submit(new IntegerJob(50, 2));
+		processor.submitPreprocessed(3);
+		processor.submit(new IntegerJob(10, 4));
+		processor.submit(new IntegerJob(10, 5));
+		processor.submitPreprocessed(6);
+
+		Assert.assertEquals(1, (int) processor.processedData());
+		Assert.assertEquals(2, (int) processor.processedData());
+		Assert.assertEquals(3, (int) processor.processedData());
+		Assert.assertEquals(4, (int) processor.processedData());
+		Assert.assertEquals(5, (int) processor.processedData());
+		Assert.assertEquals(6, (int) processor.processedData());
+	}
+
+	@Test
+	public void testBlockingBehaviour() throws InterruptedException {
+		FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
+		processor.submit(new IntegerJob(100, 1)); // runs immediatley
+		processor.submit(new IntegerJob(100, 2)); // #1 in queue
+
+		Thread t1 = new Thread(() -> {
+			try {
+				processor.submit(new IntegerJob(10, 3)); // #2 in queue
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+		});
+		t1.start();
+		t1.join(10);
+		// job 3 should not have been submitted by now, thus t1 is still alive
+		Assert.assertTrue(t1.isAlive());
+		Assert.assertEquals(1, (int) processor.processedData());
+		Assert.assertEquals(2, (int) processor.processedData());
+		Assert.assertEquals(3, (int) processor.processedData());
+		Assert.assertFalse(t1.isAlive());
+	}
+
+	@Test
+	public void testInterruptionDuringSubmission() throws InterruptedException {
+		FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
+		processor.submit(new IntegerJob(100, 1)); // runs immediatley
+		processor.submit(new IntegerJob(100, 2)); // #1 in queue
+
+		final AtomicBoolean interruptedExceptionThrown = new AtomicBoolean(false);
+		Thread t1 = new Thread(() -> {
+			try {
+				processor.submit(new IntegerJob(10, 3)); // #2 in queue
+			} catch (InterruptedException e) {
+				interruptedExceptionThrown.set(true);
+				Thread.currentThread().interrupt();
+			}
+		});
+		t1.start();
+		t1.join(10);
+		t1.interrupt();
+		t1.join(10);
+		// job 3 should not have been submitted by now, thus t1 is still alive
+		Assert.assertFalse(t1.isAlive());
+		Assert.assertTrue(interruptedExceptionThrown.get());
+		Assert.assertEquals(1, (int) processor.processedData());
+		Assert.assertEquals(2, (int) processor.processedData());
+	}
+
+	private static class IntegerJob implements Callable<Integer> {
+
+		private final long waitMillis;
+		private final int result;
+
+		public IntegerJob(long waitMillis, int result) {
+			this.waitMillis = waitMillis;
+			this.result = result;
+		}
+
+		@Override
+		public Integer call() throws Exception {
+			Thread.sleep(waitMillis);
+			return result;
+		}
+
+	}
+
+}