瀏覽代碼

- mutlithreaded encryption
- moved to bytebuffer (experimental)

Sebastian Stenzel 9 年之前
父節點
當前提交
aac9ead633

+ 11 - 1
main/core/src/main/java/org/cryptomator/webdav/jackrabbit/WebDavServlet.java

@@ -89,6 +89,16 @@ public class WebDavServlet extends AbstractWebdavServlet {
 		this.davResourceFactory = resourceFactory;
 	}
 
+	@Override
+	protected void doPut(WebdavRequest request, WebdavResponse response, DavResource resource) throws IOException, DavException {
+		long t0 = System.nanoTime();
+		super.doPut(request, response, resource);
+		if (LOG.isDebugEnabled()) {
+			long t1 = System.nanoTime();
+			LOG.debug("PUT TIME: " + (t1 - t0) / 1000 / 1000.0 + " ms");
+		}
+	}
+
 	@Override
 	protected void doGet(WebdavRequest request, WebdavResponse response, DavResource resource) throws IOException, DavException {
 		long t0 = System.nanoTime();
@@ -101,7 +111,7 @@ public class WebDavServlet extends AbstractWebdavServlet {
 		}
 		if (LOG.isDebugEnabled()) {
 			long t1 = System.nanoTime();
-			LOG.debug("REQUEST TIME: " + (t1 - t0) / 1000 / 1000.0 + " ms");
+			LOG.debug("GET TIME: " + (t1 - t0) / 1000 / 1000.0 + " ms");
 		}
 	}
 

+ 106 - 30
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java

@@ -12,6 +12,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.security.InvalidAlgorithmParameterException;
@@ -41,6 +42,7 @@ import javax.crypto.IllegalBlockSizeException;
 import javax.crypto.Mac;
 import javax.crypto.NoSuchPaddingException;
 import javax.crypto.SecretKey;
+import javax.crypto.ShortBufferException;
 import javax.crypto.spec.IvParameterSpec;
 import javax.crypto.spec.SecretKeySpec;
 import javax.security.auth.DestroyFailedException;
@@ -56,12 +58,15 @@ import org.cryptomator.crypto.exceptions.UnsupportedKeyLengthException;
 import org.cryptomator.crypto.exceptions.UnsupportedVaultException;
 import org.cryptomator.crypto.exceptions.WrongPasswordException;
 import org.cryptomator.crypto.io.SeekableByteChannelInputStream;
-import org.cryptomator.crypto.io.SeekableByteChannelOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 
+	private static final Logger LOG = LoggerFactory.getLogger(Aes256Cryptor.class);
+
 	/**
 	 * Defined in static initializer. Defaults to 256, but falls back to maximum value possible, if JCE Unlimited Strength Jurisdiction Policy Files isn't installed. Those files can be downloaded
 	 * here: http://www.oracle.com/technetwork/java/javase/downloads/.
@@ -454,26 +459,35 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 		final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers);
 		final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
 		for (int i = 0; i < numWorkers; i++) {
-			final DecryptWorker worker = new DecryptWorker(lock, blockDone, currentBlock, inputQueue, authenticate, paddingRemovingOutputStream) {
+			final DecryptWorker worker = new DecryptWorker(lock, blockDone, currentBlock, inputQueue, authenticate, Channels.newChannel(paddingRemovingOutputStream)) {
 				private final Mac mac = hmacSha256(hMacMasterKey);
 
 				@Override
-				protected byte[] decrypt(Block block) {
+				protected ByteBuffer decrypt(Block block) {
 					final ByteBuffer nonceAndCounterBuf = ByteBuffer.allocate(AES_BLOCK_LENGTH);
 					nonceAndCounterBuf.put(nonce);
 					nonceAndCounterBuf.putLong(block.blockNumber * CONTENT_MAC_BLOCK / AES_BLOCK_LENGTH);
 					final byte[] nonceAndCounter = nonceAndCounterBuf.array();
+					final ByteBuffer input = ByteBuffer.wrap(block.buffer);
+					input.limit(block.buffer.length - mac.getMacLength());
 					final Cipher cipher = aesCtrCipher(fileKey, nonceAndCounter, Cipher.DECRYPT_MODE);
-					return cipher.update(block.buffer, 0, block.numBytes - mac.getMacLength());
+					try {
+						assert cipher.getOutputSize(block.buffer.length) == block.buffer.length;
+						final ByteBuffer output = ByteBuffer.allocate(input.limit());
+						cipher.update(input, output);
+						return output;
+					} catch (ShortBufferException e) {
+						throw new IllegalStateException("Output buffer size too short, even though outlen = inlen in CTR mode.", e);
+					}
 				}
 
 				@Override
 				protected void checkMac(Block block) throws MacAuthenticationFailedException {
 					mac.update(iv);
 					mac.update(longToByteArray(block.blockNumber));
-					mac.update(block.buffer, 0, block.numBytes - mac.getMacLength());
+					mac.update(block.buffer, 0, block.buffer.length - mac.getMacLength());
 					final byte[] calculatedMac = mac.doFinal();
-					final byte[] storedMac = Arrays.copyOfRange(block.buffer, block.numBytes - mac.getMacLength(), block.numBytes);
+					final byte[] storedMac = Arrays.copyOfRange(block.buffer, block.buffer.length - mac.getMacLength(), block.buffer.length);
 					if (!MessageDigest.isEqual(calculatedMac, storedMac)) {
 						throw new MacAuthenticationFailedException("Content MAC authentication failed.");
 					}
@@ -490,11 +504,11 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 		final InputStream in = new SeekableByteChannelInputStream(encryptedFile);
 		final byte[] buffer = new byte[CONTENT_MAC_BLOCK + 32];
 		int n = 0;
-		int blockNumber = 0;
+		long blockNumber = 0;
 		try {
 			// read as many blocks from file as possible, but wait if queue is full:
 			while ((n = IOUtils.read(in, buffer)) > 0) {
-				final boolean consumedInTime = inputQueue.offer(new Block(n, Arrays.copyOf(buffer, n), blockNumber++), 1, TimeUnit.SECONDS);
+				final boolean consumedInTime = inputQueue.offer(new Block(Arrays.copyOf(buffer, n), blockNumber++), 1, TimeUnit.SECONDS);
 				if (!consumedInTime) {
 					// interrupt read loop and make room for some poisons:
 					inputQueue.clear();
@@ -506,8 +520,7 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 				inputQueue.put(CryptoWorker.POISON);
 			}
 		} catch (InterruptedException e) {
-			// TODO
-			e.printStackTrace();
+			LOG.error("Thread interrupted", e);
 		}
 
 		// wait for decryption workers to finish:
@@ -519,10 +532,13 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 			final Throwable cause = e.getCause();
 			if (cause instanceof IOException) {
 				throw (IOException) cause;
+			} else if (cause instanceof RuntimeException) {
+				throw (RuntimeException) cause;
+			} else {
+				LOG.error("Unexpected exception", e);
 			}
 		} catch (InterruptedException e) {
-			// TODO
-			e.printStackTrace();
+			LOG.error("Thread interrupted", e);
 		} finally {
 			// shutdown either after normal decryption or if ANY worker threw an exception:
 			executorService.shutdownNow();
@@ -653,10 +669,6 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 
 		// chosse 8 byte random nonce and 8 byte counter set to zero:
 		final byte[] nonce = randomData(8);
-		final ByteBuffer nonceAndCounterBuf = ByteBuffer.allocate(AES_BLOCK_LENGTH);
-		nonceAndCounterBuf.put(nonce);
-		nonceAndCounterBuf.putLong(0L);
-		final byte[] nonceAndCounter = nonceAndCounterBuf.array();
 
 		// choose a random content key:
 		final byte[] fileKeyBytes = randomData(32);
@@ -672,24 +684,88 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
 
 		// content encryption:
 		final SecretKey fileKey = new SecretKeySpec(fileKeyBytes, AES_KEY_ALGORITHM);
-		final Cipher cipher = this.aesCtrCipher(fileKey, nonceAndCounter, Cipher.ENCRYPT_MODE);
-		final Mac contentMac = this.hmacSha256(hMacMasterKey);
-		@SuppressWarnings("resource")
-		final OutputStream out = new SeekableByteChannelOutputStream(encryptedFile);
+
+		// prepare some crypto workers:
+		final int numWorkers = Runtime.getRuntime().availableProcessors();
+		final Lock lock = new ReentrantLock();
+		final Condition blockDone = lock.newCondition();
+		final AtomicLong currentBlock = new AtomicLong();
+		final BlockingQueue<Block> inputQueue = new LinkedBlockingQueue<>(numWorkers);
+		final List<EncryptWorker> workers = new ArrayList<>();
+		final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers);
+		final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
+		for (int i = 0; i < numWorkers; i++) {
+			final EncryptWorker worker = new EncryptWorker(lock, blockDone, currentBlock, inputQueue, encryptedFile) {
+				private final Mac mac = hmacSha256(hMacMasterKey);
+
+				@Override
+				protected void encrypt(Block block, ByteBuffer output) {
+					final ByteBuffer nonceAndCounterBuf = ByteBuffer.allocate(AES_BLOCK_LENGTH);
+					nonceAndCounterBuf.put(nonce);
+					nonceAndCounterBuf.putLong(block.blockNumber * CONTENT_MAC_BLOCK / AES_BLOCK_LENGTH);
+					final byte[] nonceAndCounter = nonceAndCounterBuf.array();
+					final Cipher cipher = aesCtrCipher(fileKey, nonceAndCounter, Cipher.ENCRYPT_MODE);
+					try {
+						assert cipher.getOutputSize(block.buffer.length) == block.buffer.length;
+						cipher.update(ByteBuffer.wrap(block.buffer), output);
+					} catch (ShortBufferException e) {
+						throw new IllegalStateException("Output buffer size too short, even though outlen = inlen in CTR mode.", e);
+					}
+				}
+
+				@Override
+				protected byte[] mac(long blockNumber, ByteBuffer ciphertext) {
+					mac.update(iv);
+					mac.update(longToByteArray(blockNumber));
+					mac.update(ciphertext);
+					return mac.doFinal();
+				}
+			};
+			workers.add(worker);
+			completionService.submit(worker);
+		}
 
 		// writing ciphered output and MACs interleaved:
 		final byte[] buffer = new byte[CONTENT_MAC_BLOCK];
 		int n = 0;
-		long blockNum = 0;
-		while ((n = IOUtils.read(in, buffer)) > 0) {
-			final byte[] ciphertext = cipher.update(buffer, 0, n);
-			out.write(ciphertext);
-			contentMac.update(iv);
-			contentMac.update(longToByteArray(blockNum));
-			contentMac.update(ciphertext);
-			final byte[] mac = contentMac.doFinal();
-			out.write(mac);
-			blockNum++;
+		long blockNumber = 0;
+		try {
+			// read as many blocks from file as possible, but wait if queue is full:
+			while ((n = IOUtils.read(in, buffer)) > 0) {
+				final boolean consumedInTime = inputQueue.offer(new Block(Arrays.copyOf(buffer, n), blockNumber++), 1, TimeUnit.SECONDS);
+				if (!consumedInTime) {
+					// interrupt read loop and make room for some poisons:
+					inputQueue.clear();
+					break;
+				}
+			}
+			// each worker has to swallow some poison:
+			for (int i = 0; i < numWorkers; i++) {
+				inputQueue.put(CryptoWorker.POISON);
+			}
+		} catch (InterruptedException e) {
+			LOG.error("Thread interrupted", e);
+		}
+
+		// wait for encryption workers to finish:
+		try {
+			for (int i = 0; i < numWorkers; i++) {
+				completionService.take().get();
+			}
+		} catch (ExecutionException e) {
+			final Throwable cause = e.getCause();
+			if (cause instanceof IOException) {
+				throw (IOException) cause;
+			} else if (cause instanceof RuntimeException) {
+				throw (RuntimeException) cause;
+			} else {
+				LOG.error("Unexpected exception", e);
+			}
+		} catch (InterruptedException e) {
+			LOG.error("Thread interrupted", e);
+		} finally {
+			// shutdown either after normal encryption or if ANY worker threw an exception:
+			executorService.shutdownNow();
 		}
 		destroyQuietly(fileKey);
 

+ 1 - 3
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Block.java

@@ -2,12 +2,10 @@ package org.cryptomator.crypto.aes256;
 
 class Block {
 
-	final int numBytes;
 	final byte[] buffer;
 	final long blockNumber;
 
-	Block(int numBytes, byte[] buffer, long blockNumber) {
-		this.numBytes = numBytes;
+	Block(byte[] buffer, long blockNumber) {
 		this.buffer = buffer;
 		this.blockNumber = blockNumber;
 	}

+ 5 - 4
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorker.java

@@ -1,6 +1,7 @@
 package org.cryptomator.crypto.aes256;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
@@ -11,7 +12,7 @@ import org.cryptomator.crypto.exceptions.CryptingException;
 
 abstract class CryptoWorker implements Callable<Void> {
 
-	static final Block POISON = new Block(0, new byte[0], -1L);
+	static final Block POISON = new Block(new byte[0], -1L);
 
 	final Lock lock;
 	final Condition blockDone;
@@ -34,7 +35,7 @@ abstract class CryptoWorker implements Callable<Void> {
 					// put poison back in for other threads:
 					break;
 				}
-				final byte[] processedBytes = this.process(block);
+				final ByteBuffer processedBytes = this.process(block);
 				lock.lock();
 				try {
 					while (currentBlock.get() != block.blockNumber) {
@@ -56,8 +57,8 @@ abstract class CryptoWorker implements Callable<Void> {
 		return null;
 	}
 
-	protected abstract byte[] process(Block block) throws CryptingException;
+	protected abstract ByteBuffer process(Block block) throws CryptingException;
 
-	protected abstract void write(byte[] processedBytes) throws IOException;
+	protected abstract void write(ByteBuffer processedBytes) throws IOException;
 
 }

+ 9 - 7
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/DecryptWorker.java

@@ -1,7 +1,8 @@
 package org.cryptomator.crypto.aes256;
 
 import java.io.IOException;
-import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -14,17 +15,17 @@ import org.cryptomator.crypto.exceptions.MacAuthenticationFailedException;
 abstract class DecryptWorker extends CryptoWorker implements AesCryptographicConfiguration {
 
 	private final boolean shouldAuthenticate;
-	private final OutputStream out;
+	private final WritableByteChannel out;
 
-	public DecryptWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<Block> queue, boolean shouldAuthenticate, OutputStream out) {
+	public DecryptWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<Block> queue, boolean shouldAuthenticate, WritableByteChannel out) {
 		super(lock, blockDone, currentBlock, queue);
 		this.shouldAuthenticate = shouldAuthenticate;
 		this.out = out;
 	}
 
 	@Override
-	protected byte[] process(Block block) throws CryptingException {
-		if (block.numBytes < 32) {
+	protected ByteBuffer process(Block block) throws CryptingException {
+		if (block.buffer.length < 32) {
 			throw new DecryptFailedException("Invalid file content, missing MAC.");
 		}
 
@@ -38,12 +39,13 @@ abstract class DecryptWorker extends CryptoWorker implements AesCryptographicCon
 	}
 
 	@Override
-	protected void write(byte[] processedBytes) throws IOException {
+	protected void write(ByteBuffer processedBytes) throws IOException {
+		processedBytes.flip();
 		out.write(processedBytes);
 	}
 
 	protected abstract void checkMac(Block block) throws MacAuthenticationFailedException;
 
-	protected abstract byte[] decrypt(Block block);
+	protected abstract ByteBuffer decrypt(Block block);
 
 }

+ 43 - 0
main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/EncryptWorker.java

@@ -0,0 +1,43 @@
+package org.cryptomator.crypto.aes256;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import org.cryptomator.crypto.exceptions.CryptingException;
+
+abstract class EncryptWorker extends CryptoWorker implements AesCryptographicConfiguration {
+
+	private final WritableByteChannel out;
+
+	public EncryptWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<Block> queue, WritableByteChannel out) {
+		super(lock, blockDone, currentBlock, queue);
+		this.out = out;
+	}
+
+	@Override
+	protected ByteBuffer process(Block block) throws CryptingException {
+		final ByteBuffer buf = ByteBuffer.allocateDirect(block.buffer.length + 32);
+		encrypt(block, buf);
+		final ByteBuffer ciphertextBuffer = buf.duplicate();
+		ciphertextBuffer.flip();
+		final byte[] mac = mac(block.blockNumber, ciphertextBuffer);
+		buf.put(mac);
+		return buf;
+	}
+
+	@Override
+	protected void write(ByteBuffer processedBytes) throws IOException {
+		processedBytes.flip();
+		out.write(processedBytes);
+	}
+
+	protected abstract byte[] mac(long blockNumber, ByteBuffer ciphertext);
+
+	protected abstract void encrypt(Block block, ByteBuffer ciphertext);
+
+}