Parcourir la source

slow start, increasing number of blocks worked on per thread, reusing ciphers for multiple consecutive blocks

Sebastian Stenzel il y a 10 ans
Parent
commit
ed7dc60f5e

+ 2 - 1
main/core/src/test/java/org/cryptomator/webdav/jackrabbit/RangeRequestTest.java

@@ -261,7 +261,8 @@ public class RangeRequestTest {
 		final HttpMethod getMethod = new GetMethod(testResourceUrl.toString());
 		getMethod.addRequestHeader("Range", "chunks=1-2");
 		final int getResponse = client.executeMethod(getMethod);
-		final byte[] response = getMethod.getResponseBody();
+		final byte[] response = new byte[fileContent.length];
+		IOUtils.read(getMethod.getResponseBodyAsStream(), response);
 		getMethod.releaseConnection();
 		Assert.assertEquals(416, getResponse);
 		Assert.assertArrayEquals(fileContent, response);

+ 73 - 49
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java

@@ -437,72 +437,80 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 		final Lock lock = new ReentrantLock();
 		final Condition blockDone = lock.newCondition();
 		final AtomicLong currentBlock = new AtomicLong();
-		final BlockingQueue<Block> inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
+		final BlockingQueue<BlocksData> inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
 		final LengthLimitingOutputStream paddingRemovingOutputStream = new LengthLimitingOutputStream(plaintextFile, fileSize);
 		final List<DecryptWorker> workers = new ArrayList<>();
 		final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers);
 		final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
 		for (int i = 0; i < numWorkers; i++) {
 			final DecryptWorker worker = new DecryptWorker(lock, blockDone, currentBlock, inputQueue, authenticate, Channels.newChannel(paddingRemovingOutputStream)) {
-				private final Mac mac = hmacSha256(hMacMasterKey);
 
 				@Override
-				protected ByteBuffer decrypt(Block block) {
+				protected Cipher initCipher(long startBlockNum) {
 					final ByteBuffer nonceAndCounterBuf = ByteBuffer.allocate(AES_BLOCK_LENGTH);
 					nonceAndCounterBuf.put(nonce);
-					nonceAndCounterBuf.putLong(block.blockNumber * CONTENT_MAC_BLOCK / AES_BLOCK_LENGTH);
+					nonceAndCounterBuf.putLong(startBlockNum * CONTENT_MAC_BLOCK / AES_BLOCK_LENGTH);
 					final byte[] nonceAndCounter = nonceAndCounterBuf.array();
-					final ByteBuffer ciphertext = block.buffer.duplicate();
-					ciphertext.rewind();
-					ciphertext.limit(block.buffer.limit() - mac.getMacLength());
-					final Cipher cipher = aesCtrCipher(fileKey, nonceAndCounter, Cipher.DECRYPT_MODE);
-					try {
-						assert cipher.getOutputSize(ciphertext.limit()) == ciphertext.limit();
-						final ByteBuffer output = ByteBuffer.allocate(ciphertext.limit());
-						cipher.update(ciphertext, output);
-						return output;
-					} catch (ShortBufferException e) {
-						throw new IllegalStateException("Output buffer size too short, even though outlen = inlen in CTR mode.", e);
-					}
+					return aesCtrCipher(fileKey, nonceAndCounter, Cipher.DECRYPT_MODE);
 				}
 
 				@Override
-				protected void checkMac(Block block) throws MacAuthenticationFailedException {
-					final ByteBuffer ciphertext = block.buffer.duplicate();
-					ciphertext.rewind();
-					ciphertext.limit(block.buffer.limit() - mac.getMacLength());
+				protected Mac initMac() {
+					return hmacSha256(hMacMasterKey);
+				}
+
+				@Override
+				protected void checkMac(Mac mac, long blockNum, ByteBuffer ciphertextBuf, ByteBuffer macBuf) throws MacAuthenticationFailedException {
 					mac.update(iv);
-					mac.update(longToByteArray(block.blockNumber));
-					mac.update(ciphertext);
+					mac.update(longToByteArray(blockNum));
+					mac.update(ciphertextBuf);
 					final byte[] calculatedMac = mac.doFinal();
 					final byte[] storedMac = new byte[mac.getMacLength()];
-					block.buffer.position(ciphertext.limit());
-					block.buffer.get(storedMac);
+					macBuf.get(storedMac);
 					if (!MessageDigest.isEqual(calculatedMac, storedMac)) {
 						throw new MacAuthenticationFailedException("Content MAC authentication failed.");
 					}
 				}
+
+				@Override
+				protected void decrypt(Cipher cipher, ByteBuffer ciphertextBuf, ByteBuffer plaintextBuf) throws DecryptFailedException {
+					assert plaintextBuf.remaining() >= cipher.getOutputSize(ciphertextBuf.remaining());
+					try {
+						cipher.update(ciphertextBuf, plaintextBuf);
+					} catch (ShortBufferException e) {
+						throw new DecryptFailedException(e);
+					}
+				}
+
 			};
 			workers.add(worker);
 			completionService.submit(worker);
 		}
 
 		// reading ciphered input and MACs interleaved:
-		int n = 0;
+		int bytesRead = 0;
 		long blockNumber = 0;
 		try {
 			// read as many blocks from file as possible, but wait if queue is full:
+			final int maxNumBlocks = 128;
+			int numBlocks = 0;
 			do {
-				final ByteBuffer buf = ByteBuffer.allocate(CONTENT_MAC_BLOCK + 32);
-				n = encryptedFile.read(buf); // IOUtils.read(fileIn, buffer);
+				if (numBlocks < maxNumBlocks) {
+					numBlocks++;
+				}
+				final int inBufSize = numBlocks * (CONTENT_MAC_BLOCK + 32);
+				final ByteBuffer buf = ByteBuffer.allocate(inBufSize);
+				bytesRead = encryptedFile.read(buf);
 				buf.flip();
-				final boolean consumedInTime = inputQueue.offer(new Block(buf.asReadOnlyBuffer(), blockNumber++), 1, TimeUnit.SECONDS);
+				final int blocksRead = (int) Math.ceil(bytesRead / (double) (CONTENT_MAC_BLOCK + 32));
+				final boolean consumedInTime = inputQueue.offer(new BlocksData(buf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
 				if (!consumedInTime) {
 					// interrupt read loop and make room for some poisons:
 					inputQueue.clear();
 					break;
 				}
-			} while (n == CONTENT_MAC_BLOCK + 32);
+				blockNumber += numBlocks;
+			} while (bytesRead == numBlocks * (CONTENT_MAC_BLOCK + 32));
 
 			// each worker has to swallow some poison:
 			for (int i = 0; i < numWorkers; i++) {
@@ -678,58 +686,74 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 		final Lock lock = new ReentrantLock();
 		final Condition blockDone = lock.newCondition();
 		final AtomicLong currentBlock = new AtomicLong();
-		final BlockingQueue<Block> inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
+		final BlockingQueue<BlocksData> inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
 		final List<EncryptWorker> workers = new ArrayList<>();
 		final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers);
 		final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
 		for (int i = 0; i < numWorkers; i++) {
 			final EncryptWorker worker = new EncryptWorker(lock, blockDone, currentBlock, inputQueue, encryptedFile) {
-				private final Mac mac = hmacSha256(hMacMasterKey);
 
 				@Override
-				protected void encrypt(Block block, ByteBuffer output) {
+				protected Cipher initCipher(long startBlockNum) {
 					final ByteBuffer nonceAndCounterBuf = ByteBuffer.allocate(AES_BLOCK_LENGTH);
 					nonceAndCounterBuf.put(nonce);
-					nonceAndCounterBuf.putLong(block.blockNumber * CONTENT_MAC_BLOCK / AES_BLOCK_LENGTH);
+					nonceAndCounterBuf.putLong(startBlockNum * CONTENT_MAC_BLOCK / AES_BLOCK_LENGTH);
 					final byte[] nonceAndCounter = nonceAndCounterBuf.array();
-					final Cipher cipher = aesCtrCipher(fileKey, nonceAndCounter, Cipher.ENCRYPT_MODE);
-					try {
-						assert cipher.getOutputSize(block.buffer.limit()) == block.buffer.limit();
-						cipher.update(block.buffer, output);
-					} catch (ShortBufferException e) {
-						throw new IllegalStateException("Output buffer size too short, even though outlen = inlen in CTR mode.", e);
-					}
+					return aesCtrCipher(fileKey, nonceAndCounter, Cipher.ENCRYPT_MODE);
 				}
 
 				@Override
-				protected byte[] mac(long blockNumber, ByteBuffer ciphertext) {
+				protected Mac initMac() {
+					return hmacSha256(hMacMasterKey);
+				}
+
+				@Override
+				protected byte[] calcMac(Mac mac, long blockNum, ByteBuffer ciphertextBuf) {
 					mac.update(iv);
-					mac.update(longToByteArray(blockNumber));
-					mac.update(ciphertext);
+					mac.update(longToByteArray(blockNum));
+					mac.update(ciphertextBuf);
 					return mac.doFinal();
 				}
+
+				@Override
+				protected void encrypt(Cipher cipher, ByteBuffer plaintextBuf, ByteBuffer ciphertextBuf) throws EncryptFailedException {
+					try {
+						assert ciphertextBuf.remaining() >= cipher.getOutputSize(plaintextBuf.remaining());
+						cipher.update(plaintextBuf, ciphertextBuf);
+					} catch (ShortBufferException e) {
+						throw new EncryptFailedException(e);
+					}
+				}
 			};
 			workers.add(worker);
 			completionService.submit(worker);
 		}
 
 		// writing ciphered output and MACs interleaved:
-		int n = 0;
+		int bytesRead = 0;
 		long blockNumber = 0;
 		try {
 			final ReadableByteChannel channel = Channels.newChannel(in);
 			// read as many blocks from file as possible, but wait if queue is full:
+			final int maxNumBlocks = 128;
+			int numBlocks = 0;
 			do {
-				final ByteBuffer buf = ByteBuffer.allocate(CONTENT_MAC_BLOCK);
-				n = channel.read(buf); // IOUtils.read(fileIn, buffer);
-				buf.flip();
-				final boolean consumedInTime = inputQueue.offer(new Block(buf.asReadOnlyBuffer(), blockNumber++), 1, TimeUnit.SECONDS);
+				if (numBlocks < maxNumBlocks) {
+					numBlocks++;
+				}
+				final int inBufSize = numBlocks * CONTENT_MAC_BLOCK;
+				final ByteBuffer inBuf = ByteBuffer.allocate(inBufSize);
+				bytesRead = channel.read(inBuf);
+				inBuf.flip();
+				final int blocksRead = (int) Math.ceil(bytesRead / (double) CONTENT_MAC_BLOCK);
+				final boolean consumedInTime = inputQueue.offer(new BlocksData(inBuf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
 				if (!consumedInTime) {
 					// interrupt read loop and make room for some poisons:
 					inputQueue.clear();
 					break;
 				}
-			} while (n == CONTENT_MAC_BLOCK);
+				blockNumber += numBlocks;
+			} while (bytesRead == numBlocks * CONTENT_MAC_BLOCK);
 
 			// each worker has to swallow some poison:
 			for (int i = 0; i < numWorkers; i++) {

+ 0 - 15
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Block.java

@@ -1,15 +0,0 @@
-package org.cryptomator.crypto.aes256;
-
-import java.nio.ByteBuffer;
-
-class Block {
-
-	final ByteBuffer buffer;
-	final long blockNumber;
-
-	Block(ByteBuffer buffer, long blockNumber) {
-		this.buffer = buffer;
-		this.blockNumber = blockNumber;
-	}
-
-}

+ 22 - 0
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/BlocksData.java

@@ -0,0 +1,22 @@
+package org.cryptomator.crypto.aes256;
+
+import java.nio.ByteBuffer;
+
+class BlocksData {
+
+	public static final int MAX_NUM_BLOCKS = 128;
+
+	final ByteBuffer buffer;
+	final long startBlockNum;
+	final int numBlocks;
+
+	BlocksData(ByteBuffer buffer, long startBlockNum, int numBlocks) {
+		if (numBlocks > MAX_NUM_BLOCKS) {
+			throw new IllegalArgumentException("Too many blocks to process at once: " + numBlocks);
+		}
+		this.buffer = buffer;
+		this.startBlockNum = startBlockNum;
+		this.numBlocks = numBlocks;
+	}
+
+}

+ 10 - 10
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorker.java

@@ -12,14 +12,14 @@ import org.cryptomator.crypto.exceptions.CryptingException;
 
 abstract class CryptoWorker implements Callable<Void> {
 
-	static final Block POISON = new Block(ByteBuffer.allocate(0), -1L);
+	static final BlocksData POISON = new BlocksData(ByteBuffer.allocate(0), -1L, 0);
 
 	final Lock lock;
 	final Condition blockDone;
 	final AtomicLong currentBlock;
-	final BlockingQueue<Block> queue;
+	final BlockingQueue<BlocksData> queue;
 
-	public CryptoWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<Block> queue) {
+	public CryptoWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<BlocksData> queue) {
 		this.lock = lock;
 		this.blockDone = blockDone;
 		this.currentBlock = currentBlock;
@@ -30,22 +30,22 @@ abstract class CryptoWorker implements Callable<Void> {
 	public final Void call() throws IOException {
 		try {
 			while (!Thread.currentThread().isInterrupted()) {
-				final Block block = queue.take();
-				if (block == POISON) {
+				final BlocksData blocksData = queue.take();
+				if (blocksData == POISON) {
 					// put poison back in for other threads:
 					break;
 				}
-				final ByteBuffer processedBytes = this.process(block);
+				final ByteBuffer processedBytes = this.process(blocksData);
 				lock.lock();
 				try {
-					while (currentBlock.get() != block.blockNumber) {
+					while (currentBlock.get() != blocksData.startBlockNum) {
 						blockDone.await();
 					}
-					assert currentBlock.get() == block.blockNumber;
+					assert currentBlock.get() == blocksData.startBlockNum;
 					// yay, its my turn!
 					this.write(processedBytes);
 					// signal worker working on next block:
-					currentBlock.set(block.blockNumber + 1);
+					currentBlock.set(blocksData.startBlockNum + blocksData.numBlocks);
 					blockDone.signalAll();
 				} finally {
 					lock.unlock();
@@ -57,7 +57,7 @@ abstract class CryptoWorker implements Callable<Void> {
 		return null;
 	}
 
-	protected abstract ByteBuffer process(Block block) throws CryptingException;
+	protected abstract ByteBuffer process(BlocksData block) throws CryptingException;
 
 	protected abstract void write(ByteBuffer processedBytes) throws IOException;
 

+ 37 - 13
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/DecryptWorker.java

@@ -8,6 +8,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 
+import javax.crypto.Cipher;
+import javax.crypto.Mac;
+
 import org.cryptomator.crypto.exceptions.CryptingException;
 import org.cryptomator.crypto.exceptions.DecryptFailedException;
 import org.cryptomator.crypto.exceptions.MacAuthenticationFailedException;
@@ -17,35 +20,56 @@ abstract class DecryptWorker extends CryptoWorker implements AesCryptographicCon
 	private final boolean shouldAuthenticate;
 	private final WritableByteChannel out;
 
-	public DecryptWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<Block> queue, boolean shouldAuthenticate, WritableByteChannel out) {
+	public DecryptWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<BlocksData> queue, boolean shouldAuthenticate, WritableByteChannel out) {
 		super(lock, blockDone, currentBlock, queue);
 		this.shouldAuthenticate = shouldAuthenticate;
 		this.out = out;
 	}
 
 	@Override
-	protected ByteBuffer process(Block block) throws CryptingException {
-		if (block.buffer.limit() < 32) {
-			throw new DecryptFailedException("Invalid file content, missing MAC.");
-		}
+	protected ByteBuffer process(BlocksData data) throws CryptingException {
+		final Cipher cipher = initCipher(data.startBlockNum);
+		final Mac mac = initMac();
+
+		final ByteBuffer plaintextBuf = ByteBuffer.allocate(cipher.getOutputSize(CONTENT_MAC_BLOCK) * data.numBlocks);
 
-		// check MAC of current block:
-		if (shouldAuthenticate) {
-			checkMac(block);
+		final ByteBuffer ciphertextBuf = data.buffer.asReadOnlyBuffer();
+		final ByteBuffer macBuf = data.buffer.asReadOnlyBuffer();
+
+		for (long blockNum = data.startBlockNum; blockNum < data.startBlockNum + data.numBlocks; blockNum++) {
+			assert (blockNum - data.startBlockNum) < BlocksData.MAX_NUM_BLOCKS;
+			assert (blockNum - data.startBlockNum) * CONTENT_MAC_BLOCK < Integer.MAX_VALUE;
+			final int pos = (int) (blockNum - data.startBlockNum) * (CONTENT_MAC_BLOCK + mac.getMacLength());
+			ciphertextBuf.limit(Math.min(data.buffer.limit() - mac.getMacLength(), pos + CONTENT_MAC_BLOCK));
+			ciphertextBuf.position(pos);
+			try {
+				macBuf.limit(ciphertextBuf.limit() + mac.getMacLength());
+				macBuf.position(ciphertextBuf.limit());
+			} catch (IllegalArgumentException e) {
+				throw new DecryptFailedException("Invalid file content, missing MAC.");
+			}
+			if (shouldAuthenticate) {
+				checkMac(mac, blockNum, ciphertextBuf, macBuf);
+			}
+			ciphertextBuf.position(pos);
+			decrypt(cipher, ciphertextBuf, plaintextBuf);
 		}
 
-		// decrypt block:
-		return decrypt(block);
+		plaintextBuf.flip();
+		return plaintextBuf;
 	}
 
 	@Override
 	protected void write(ByteBuffer processedBytes) throws IOException {
-		processedBytes.flip();
 		out.write(processedBytes);
 	}
 
-	protected abstract void checkMac(Block block) throws MacAuthenticationFailedException;
+	protected abstract Cipher initCipher(long startBlockNum);
+
+	protected abstract Mac initMac();
+
+	protected abstract void checkMac(Mac mac, long blockNum, ByteBuffer ciphertextBuf, ByteBuffer macBuf) throws MacAuthenticationFailedException;
 
-	protected abstract ByteBuffer decrypt(Block block);
+	protected abstract void decrypt(Cipher cipher, ByteBuffer ciphertextBuf, ByteBuffer plaintextBuf) throws DecryptFailedException;
 
 }

+ 30 - 12
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/EncryptWorker.java

@@ -8,36 +8,54 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 
+import javax.crypto.Cipher;
+import javax.crypto.Mac;
+
 import org.cryptomator.crypto.exceptions.CryptingException;
+import org.cryptomator.crypto.exceptions.EncryptFailedException;
 
 abstract class EncryptWorker extends CryptoWorker implements AesCryptographicConfiguration {
 
 	private final WritableByteChannel out;
 
-	public EncryptWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<Block> queue, WritableByteChannel out) {
+	public EncryptWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<BlocksData> queue, WritableByteChannel out) {
 		super(lock, blockDone, currentBlock, queue);
 		this.out = out;
 	}
 
 	@Override
-	protected ByteBuffer process(Block block) throws CryptingException {
-		final ByteBuffer buf = ByteBuffer.allocateDirect(block.buffer.limit() + 32);
-		encrypt(block, buf);
-		final ByteBuffer ciphertextBuffer = buf.duplicate();
-		ciphertextBuffer.flip();
-		final byte[] mac = mac(block.blockNumber, ciphertextBuffer);
-		buf.put(mac);
-		return buf;
+	protected ByteBuffer process(BlocksData data) throws CryptingException {
+		final Cipher cipher = initCipher(data.startBlockNum);
+		final Mac mac = initMac();
+
+		final ByteBuffer ciphertextBuf = ByteBuffer.allocate((cipher.getOutputSize(CONTENT_MAC_BLOCK) + mac.getMacLength()) * data.numBlocks);
+		final ByteBuffer plaintextBuf = data.buffer.asReadOnlyBuffer();
+
+		for (long blockNum = data.startBlockNum; blockNum < data.startBlockNum + data.numBlocks; blockNum++) {
+			final int pos = (int) (blockNum - data.startBlockNum) * CONTENT_MAC_BLOCK;
+			plaintextBuf.limit(Math.min(data.buffer.limit(), pos + CONTENT_MAC_BLOCK));
+			encrypt(cipher, plaintextBuf, ciphertextBuf);
+			final ByteBuffer toMac = ciphertextBuf.asReadOnlyBuffer();
+			toMac.limit(ciphertextBuf.position());
+			toMac.position((int) (blockNum - data.startBlockNum) * (CONTENT_MAC_BLOCK + mac.getMacLength()));
+			ciphertextBuf.put(calcMac(mac, blockNum, toMac));
+		}
+
+		ciphertextBuf.flip();
+		return ciphertextBuf;
 	}
 
 	@Override
 	protected void write(ByteBuffer processedBytes) throws IOException {
-		processedBytes.flip();
 		out.write(processedBytes);
 	}
 
-	protected abstract byte[] mac(long blockNumber, ByteBuffer ciphertext);
+	protected abstract Cipher initCipher(long startBlockNum);
+
+	protected abstract Mac initMac();
+
+	protected abstract byte[] calcMac(Mac mac, long blockNum, ByteBuffer ciphertextBuf);
 
-	protected abstract void encrypt(Block block, ByteBuffer ciphertext);
+	protected abstract void encrypt(Cipher cipher, ByteBuffer plaintextBuf, ByteBuffer ciphertextBuf) throws EncryptFailedException;
 
 }

+ 4 - 0
main/crypto-api/src/main/java/org/cryptomator/crypto/exceptions/EncryptFailedException.java

@@ -3,6 +3,10 @@ package org.cryptomator.crypto.exceptions;
 public class EncryptFailedException extends CryptingException {
 	private static final long serialVersionUID = -3855673600374897828L;
 
+	public EncryptFailedException(Throwable t) {
+		super("Encryption failed.", t);
+	}
+
 	public EncryptFailedException(String msg) {
 		super(msg);
 	}