فهرست منبع

simplified code

Sebastian Stenzel 9 سال پیش
والد
کامیت
ecb178d5b2

+ 50 - 124
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java

@@ -21,21 +21,9 @@ import java.security.InvalidKeyException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.crypto.BadPaddingException;
 import javax.crypto.Cipher;
@@ -428,22 +416,11 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 		final Long fileSize = sensitiveHeaderContentBuf.getLong();
 		sensitiveHeaderContentBuf.get(fileKeyBytes);
 
-		// content decryption:
-		encryptedFile.position(104l);
+		// prepare content decryption:
 		final SecretKey fileKey = new SecretKeySpec(fileKeyBytes, AES_KEY_ALGORITHM);
-
-		// prepare some crypto workers:
-		final int numWorkers = Runtime.getRuntime().availableProcessors();
-		final Lock lock = new ReentrantLock();
-		final Condition blockDone = lock.newCondition();
-		final AtomicLong currentBlock = new AtomicLong();
-		final BlockingQueue<BlocksData> inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
 		final LengthLimitingOutputStream paddingRemovingOutputStream = new LengthLimitingOutputStream(plaintextFile, fileSize);
-		final List<DecryptWorker> workers = new ArrayList<>();
-		final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers);
-		final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
-		for (int i = 0; i < numWorkers; i++) {
-			final DecryptWorker worker = new DecryptWorker(lock, blockDone, currentBlock, inputQueue, authenticate, Channels.newChannel(paddingRemovingOutputStream)) {
+		final CryptoWorkerExecutor executor = new CryptoWorkerExecutor(Runtime.getRuntime().availableProcessors(), (lock, blockDone, currentBlock, inputQueue) -> {
+			return new DecryptWorker(lock, blockDone, currentBlock, inputQueue, authenticate, Channels.newChannel(paddingRemovingOutputStream)) {
 
 				@Override
 				protected Cipher initCipher(long startBlockNum) {
@@ -483,48 +460,33 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 				}
 
 			};
-			workers.add(worker);
-			completionService.submit(worker);
-		}
+		});
 
-		// reading ciphered input and MACs interleaved:
+		// read as many blocks from file as possible, but wait if queue is full:
+		encryptedFile.position(104l);
+		final int maxNumBlocks = 64;
+		int numBlocks = 1;
 		int bytesRead = 0;
 		long blockNumber = 0;
-		try {
-			// read as many blocks from file as possible, but wait if queue is full:
-			final int maxNumBlocks = 128;
-			int numBlocks = 0;
-			do {
-				if (numBlocks < maxNumBlocks) {
-					numBlocks++;
-				}
-				final int inBufSize = numBlocks * (CONTENT_MAC_BLOCK + 32);
-				final ByteBuffer buf = ByteBuffer.allocate(inBufSize);
-				bytesRead = encryptedFile.read(buf);
-				buf.flip();
-				final int blocksRead = (int) Math.ceil(bytesRead / (double) (CONTENT_MAC_BLOCK + 32));
-				final boolean consumedInTime = inputQueue.offer(new BlocksData(buf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
-				if (!consumedInTime) {
-					// interrupt read loop and make room for some poisons:
-					inputQueue.clear();
-					break;
-				}
-				blockNumber += numBlocks;
-			} while (bytesRead == numBlocks * (CONTENT_MAC_BLOCK + 32));
-
-			// each worker has to swallow some poison:
-			for (int i = 0; i < numWorkers; i++) {
-				inputQueue.put(CryptoWorker.POISON);
+		do {
+			if (numBlocks < maxNumBlocks) {
+				numBlocks++;
 			}
-		} catch (InterruptedException e) {
-			LOG.error("Thread interrupted", e);
-		}
+			final int inBufSize = numBlocks * (CONTENT_MAC_BLOCK + 32);
+			final ByteBuffer buf = ByteBuffer.allocate(inBufSize);
+			bytesRead = encryptedFile.read(buf);
+			buf.flip();
+			final int blocksRead = (int) Math.ceil(bytesRead / (double) (CONTENT_MAC_BLOCK + 32));
+			final boolean consumedInTime = executor.offer(new BlocksData(buf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
+			if (!consumedInTime) {
+				break;
+			}
+			blockNumber += numBlocks;
+		} while (bytesRead == numBlocks * (CONTENT_MAC_BLOCK + 32));
 
 		// wait for decryption workers to finish:
 		try {
-			for (int i = 0; i < numWorkers; i++) {
-				completionService.take().get();
-			}
+			executor.waitUntilDone();
 		} catch (ExecutionException e) {
 			final Throwable cause = e.getCause();
 			if (cause instanceof IOException) {
@@ -534,14 +496,10 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 			} else {
 				LOG.error("Unexpected exception", e);
 			}
-		} catch (InterruptedException e) {
-			LOG.error("Thread interrupted", e);
 		} finally {
-			// shutdown either after normal decryption or if ANY worker threw an exception:
-			executorService.shutdownNow();
+			destroyQuietly(fileKey);
 		}
 
-		destroyQuietly(fileKey);
 		return paddingRemovingOutputStream.getBytesWritten();
 	}
 
@@ -674,24 +632,10 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 		headerBuf.limit(104);
 		encryptedFile.write(headerBuf);
 
-		// add random length padding to obfuscate file length:
-		final byte[] randomPadding = this.randomData(AES_BLOCK_LENGTH);
-		final LengthObfuscatingInputStream in = new LengthObfuscatingInputStream(plaintextFile, randomPadding);
-
-		// content encryption:
+		// prepare content encryption:
 		final SecretKey fileKey = new SecretKeySpec(fileKeyBytes, AES_KEY_ALGORITHM);
-
-		// prepare some crypto workers:
-		final int numWorkers = Runtime.getRuntime().availableProcessors();
-		final Lock lock = new ReentrantLock();
-		final Condition blockDone = lock.newCondition();
-		final AtomicLong currentBlock = new AtomicLong();
-		final BlockingQueue<BlocksData> inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
-		final List<EncryptWorker> workers = new ArrayList<>();
-		final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers);
-		final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
-		for (int i = 0; i < numWorkers; i++) {
-			final EncryptWorker worker = new EncryptWorker(lock, blockDone, currentBlock, inputQueue, encryptedFile) {
+		final CryptoWorkerExecutor executor = new CryptoWorkerExecutor(Runtime.getRuntime().availableProcessors(), (lock, blockDone, currentBlock, inputQueue) -> {
+			return new EncryptWorker(lock, blockDone, currentBlock, inputQueue, encryptedFile) {
 
 				@Override
 				protected Cipher initCipher(long startBlockNum) {
@@ -725,49 +669,35 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 					}
 				}
 			};
-			workers.add(worker);
-			completionService.submit(worker);
-		}
+		});
 
-		// writing ciphered output and MACs interleaved:
+		// read as many blocks from file as possible, but wait if queue is full:
+		final byte[] randomPadding = this.randomData(AES_BLOCK_LENGTH);
+		final LengthObfuscatingInputStream in = new LengthObfuscatingInputStream(plaintextFile, randomPadding);
+		final ReadableByteChannel channel = Channels.newChannel(in);
 		int bytesRead = 0;
 		long blockNumber = 0;
-		try {
-			final ReadableByteChannel channel = Channels.newChannel(in);
-			// read as many blocks from file as possible, but wait if queue is full:
-			final int maxNumBlocks = 128;
-			int numBlocks = 0;
-			do {
-				if (numBlocks < maxNumBlocks) {
-					numBlocks++;
-				}
-				final int inBufSize = numBlocks * CONTENT_MAC_BLOCK;
-				final ByteBuffer inBuf = ByteBuffer.allocate(inBufSize);
-				bytesRead = channel.read(inBuf);
-				inBuf.flip();
-				final int blocksRead = (int) Math.ceil(bytesRead / (double) CONTENT_MAC_BLOCK);
-				final boolean consumedInTime = inputQueue.offer(new BlocksData(inBuf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
-				if (!consumedInTime) {
-					// interrupt read loop and make room for some poisons:
-					inputQueue.clear();
-					break;
-				}
-				blockNumber += numBlocks;
-			} while (bytesRead == numBlocks * CONTENT_MAC_BLOCK);
-
-			// each worker has to swallow some poison:
-			for (int i = 0; i < numWorkers; i++) {
-				inputQueue.put(CryptoWorker.POISON);
+		final int maxNumBlocks = 64;
+		int numBlocks = 0;
+		do {
+			if (numBlocks < maxNumBlocks) {
+				numBlocks++;
 			}
-		} catch (InterruptedException e) {
-			LOG.error("Thread interrupted", e);
-		}
+			final int inBufSize = numBlocks * CONTENT_MAC_BLOCK;
+			final ByteBuffer inBuf = ByteBuffer.allocate(inBufSize);
+			bytesRead = channel.read(inBuf);
+			inBuf.flip();
+			final int blocksRead = (int) Math.ceil(bytesRead / (double) CONTENT_MAC_BLOCK);
+			final boolean consumedInTime = executor.offer(new BlocksData(inBuf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
+			if (!consumedInTime) {
+				break;
+			}
+			blockNumber += numBlocks;
+		} while (bytesRead == numBlocks * CONTENT_MAC_BLOCK);
 
 		// wait for encryption workers to finish:
 		try {
-			for (int i = 0; i < numWorkers; i++) {
-				completionService.take().get();
-			}
+			executor.waitUntilDone();
 		} catch (ExecutionException e) {
 			final Throwable cause = e.getCause();
 			if (cause instanceof IOException) {
@@ -777,13 +707,9 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 			} else {
 				LOG.error("Unexpected exception", e);
 			}
-		} catch (InterruptedException e) {
-			LOG.error("Thread interrupted", e);
 		} finally {
-			// shutdown either after normal encryption or if ANY worker threw an exception:
-			executorService.shutdownNow();
+			destroyQuietly(fileKey);
 		}
-		destroyQuietly(fileKey);
 
 		// create and write header:
 		final long plaintextSize = in.getRealInputLength();

+ 112 - 0
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java

@@ -0,0 +1,112 @@
+package org.cryptomator.crypto.aes256;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CryptoWorkerExecutor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CryptoWorkerExecutor.class);
+
+	private final int numWorkers;
+	private final Lock lock;
+	private final Condition blockDone;
+	private final AtomicLong currentBlock;
+	private final BlockingQueue<BlocksData> inputQueue;
+	private final ExecutorService executorService;
+	private final CompletionService<Void> completionService;
+	private boolean acceptWork;
+
+	/**
+	 * Starts as many {@link CryptoWorker} as specified in the constructor, that start working immediately on the items submitted via {@link #offer(BlocksData, long, TimeUnit)}.
+	 */
+	public CryptoWorkerExecutor(int numWorkers, WorkerFactory workerFactory) {
+		this.numWorkers = numWorkers;
+		this.lock = new ReentrantLock();
+		this.blockDone = lock.newCondition();
+		this.currentBlock = new AtomicLong();
+		this.inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
+		this.executorService = Executors.newFixedThreadPool(numWorkers);
+		this.completionService = new ExecutorCompletionService<>(executorService);
+		this.acceptWork = true;
+
+		// start workers:
+		for (int i = 0; i < numWorkers; i++) {
+			final CryptoWorker worker = workerFactory.createWorker(lock, blockDone, currentBlock, inputQueue);
+			completionService.submit(worker);
+		}
+	}
+
+	/**
+	 * Adds work to the work queue. On timeout all workers will be shut down.
+	 * 
+	 * @see BlockingQueue#offer(Object, long, TimeUnit)
+	 * @return <code>true</code> if the work has been added in time. <code>false</code> in any other case.
+	 */
+	public boolean offer(BlocksData data, long timeout, TimeUnit unit) {
+		if (!acceptWork) {
+			return false;
+		}
+		try {
+			final boolean success = inputQueue.offer(data, timeout, unit);
+			if (!success) {
+				this.acceptWork = false;
+				inputQueue.clear();
+				poisonWorkers();
+			}
+			return success;
+		} catch (InterruptedException e) {
+			LOG.error("Interrupted thread.", e);
+			executorService.shutdownNow();
+			Thread.currentThread().interrupt();
+		}
+		return false;
+	}
+
+	/**
+	 * Graceful shutdown of this executor, waiting for all jobs to finish (normally or by throwing exceptions).
+	 * 
+	 * @throws ExecutionException If any of the workers failed.
+	 */
+	public void waitUntilDone() throws ExecutionException {
+		this.acceptWork = false;
+		try {
+			poisonWorkers();
+			// now workers will one after another finish their work, potentially throwing an ExecutionException:
+			for (int i = 0; i < numWorkers; i++) {
+				completionService.take().get();
+			}
+		} catch (InterruptedException e) {
+			LOG.error("Interrupted thread.", e);
+			Thread.currentThread().interrupt();
+		} finally {
+			// shutdown either after normal decryption or if ANY worker threw an exception:
+			executorService.shutdownNow();
+		}
+	}
+
+	private void poisonWorkers() throws InterruptedException {
+		// add enough poison for each worker:
+		for (int i = 0; i < numWorkers; i++) {
+			inputQueue.put(CryptoWorker.POISON);
+		}
+	}
+
+	@FunctionalInterface
+	interface WorkerFactory {
+		CryptoWorker createWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<BlocksData> inputQueue);
+	}
+
+}