瀏覽代碼

Fixed NioFileSystemIntegrationTests on windows

* Streams returned from NioFolder#children, files and folders are now
closed automatically after a terminal operation
* Not closing them lead to a bug on windows causing directories to be
not deleted after a successful Files.delete invocation
Markus Kreusch 9 年之前
父節點
當前提交
9c844e626a

+ 173 - 0
main/commons/src/main/java/org/cryptomator/common/AutoClosingStream.java

@@ -0,0 +1,173 @@
+package org.cryptomator.common;
+
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+/**
+ * <p>
+ * A Stream which is automatically closed on after execution of a terminal operation.
+ * <p>
+ * The stream keeps its auto closing behavior on invocations of intermediate operations as long as no conversion to an {@link IntStream}, {@link LongStream} or {@link DoubleStream} occurs.
+ * 
+ * @author Markus Kreusch
+ */
+public final class AutoClosingStream<T> extends DelegatingStream<T> {
+
+	public static <T> Stream<T> from(Stream<T> delegate) {
+		return new AutoClosingStream<>(delegate);
+	}
+
+	@SuppressWarnings("unchecked")
+	private AutoClosingStream(Stream<T> delegate) {
+		super(delegate, AutoClosingStream::new);
+	}
+
+	public void forEach(Consumer<? super T> action) {
+		try {
+			super.forEach(action);
+		} finally {
+			close();
+		}
+	}
+
+	public void forEachOrdered(Consumer<? super T> action) {
+		try {
+			super.forEachOrdered(action);
+		} finally {
+			close();
+		}
+	}
+
+	public Object[] toArray() {
+		try {
+			return super.toArray();
+		} finally {
+			close();
+		}
+	}
+
+	public <A> A[] toArray(IntFunction<A[]> generator) {
+		try {
+			return super.toArray(generator);
+		} finally {
+			close();
+		}
+	}
+
+	public T reduce(T identity, BinaryOperator<T> accumulator) {
+		try {
+			return super.reduce(identity, accumulator);
+		} finally {
+			close();
+		}
+	}
+
+	public Optional<T> reduce(BinaryOperator<T> accumulator) {
+		try {
+			return super.reduce(accumulator);
+		} finally {
+			close();
+		}
+	}
+
+	public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
+		try {
+			return super.reduce(identity, accumulator, combiner);
+		} finally {
+			close();
+		}
+	}
+
+	public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
+		try {
+			return super.collect(supplier, accumulator, combiner);
+		} finally {
+			close();
+		}
+	}
+
+	public <R, A> R collect(Collector<? super T, A, R> collector) {
+		try {
+			return super.collect(collector);
+		} finally {
+			close();
+		}
+	}
+
+	public Optional<T> min(Comparator<? super T> comparator) {
+		try {
+			return super.min(comparator);
+		} finally {
+			close();
+		}
+	}
+
+	public Optional<T> max(Comparator<? super T> comparator) {
+		try {
+			return super.max(comparator);
+		} finally {
+			close();
+		}
+	}
+
+	public long count() {
+		try {
+			return super.count();
+		} finally {
+			close();
+		}
+	}
+
+	public boolean anyMatch(Predicate<? super T> predicate) {
+		try {
+			return super.anyMatch(predicate);
+		} finally {
+			close();
+		}
+	}
+
+	public boolean allMatch(Predicate<? super T> predicate) {
+		try {
+			return super.allMatch(predicate);
+		} finally {
+			close();
+		}
+	}
+
+	public boolean noneMatch(Predicate<? super T> predicate) {
+		try {
+			return super.noneMatch(predicate);
+		} finally {
+			close();
+		}
+	}
+
+	public Optional<T> findFirst() {
+		try {
+			return super.findFirst();
+		} finally {
+			close();
+		}
+	}
+
+	public Optional<T> findAny() {
+		try {
+			return super.findAny();
+		} finally {
+			close();
+		}
+	}
+
+}

+ 208 - 0
main/commons/src/main/java/org/cryptomator/common/DelegatingStream.java

@@ -0,0 +1,208 @@
+package org.cryptomator.common;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.function.ToDoubleFunction;
+import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collector;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+abstract class DelegatingStream<T> implements Stream<T> {
+
+	private final Stream<T> delegate;
+	private final StreamWrapper wrapper;
+
+	protected DelegatingStream(Stream<T> delegate, StreamWrapper wrapper) {
+		this.delegate = delegate;
+		this.wrapper = wrapper;
+	}
+
+	private <S> Stream<S> wrapped(Stream<S> other) {
+		if (getClass().isInstance(other)) {
+			return other;
+		} else {
+			return wrapper.wrap(other);
+		}
+	}
+
+	public Iterator<T> iterator() {
+		return delegate.iterator();
+	}
+
+	public Spliterator<T> spliterator() {
+		return delegate.spliterator();
+	}
+
+	public boolean isParallel() {
+		return delegate.isParallel();
+	}
+
+	public Stream<T> sequential() {
+		return wrapped(delegate.sequential());
+	}
+
+	public Stream<T> parallel() {
+		return wrapped(delegate.parallel());
+	}
+
+	public Stream<T> unordered() {
+		return wrapped(delegate.unordered());
+	}
+
+	public Stream<T> onClose(Runnable closeHandler) {
+		return wrapped(delegate.onClose(closeHandler));
+	}
+
+	public void close() {
+		delegate.close();
+	}
+
+	public Stream<T> filter(Predicate<? super T> predicate) {
+		return wrapped(delegate.filter(predicate));
+	}
+
+	public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
+		return wrapped(delegate.map(mapper));
+	}
+
+	public IntStream mapToInt(ToIntFunction<? super T> mapper) {
+		return delegate.mapToInt(mapper);
+	}
+
+	public LongStream mapToLong(ToLongFunction<? super T> mapper) {
+		return delegate.mapToLong(mapper);
+	}
+
+	public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
+		return delegate.mapToDouble(mapper);
+	}
+
+	public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
+		return wrapped(delegate.flatMap(mapper));
+	}
+
+	public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
+		return delegate.flatMapToInt(mapper);
+	}
+
+	public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
+		return delegate.flatMapToLong(mapper);
+	}
+
+	public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
+		return delegate.flatMapToDouble(mapper);
+	}
+
+	public Stream<T> distinct() {
+		return wrapped(delegate.distinct());
+	}
+
+	public Stream<T> sorted() {
+		return wrapped(delegate.sorted());
+	}
+
+	public Stream<T> sorted(Comparator<? super T> comparator) {
+		return wrapped(delegate.sorted(comparator));
+	}
+
+	public Stream<T> peek(Consumer<? super T> action) {
+		return wrapped(delegate.peek(action));
+	}
+
+	public Stream<T> limit(long maxSize) {
+		return wrapped(delegate.limit(maxSize));
+	}
+
+	public Stream<T> skip(long n) {
+		return wrapped(delegate.skip(n));
+	}
+
+	public void forEach(Consumer<? super T> action) {
+		delegate.forEach(action);
+	}
+
+	public void forEachOrdered(Consumer<? super T> action) {
+		delegate.forEachOrdered(action);
+	}
+
+	public Object[] toArray() {
+		return delegate.toArray();
+	}
+
+	public <A> A[] toArray(IntFunction<A[]> generator) {
+		return delegate.toArray(generator);
+	}
+
+	public T reduce(T identity, BinaryOperator<T> accumulator) {
+		return delegate.reduce(identity, accumulator);
+	}
+
+	public Optional<T> reduce(BinaryOperator<T> accumulator) {
+		return delegate.reduce(accumulator);
+	}
+
+	public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
+		return delegate.reduce(identity, accumulator, combiner);
+	}
+
+	public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
+		return delegate.collect(supplier, accumulator, combiner);
+	}
+
+	public <R, A> R collect(Collector<? super T, A, R> collector) {
+		return delegate.collect(collector);
+	}
+
+	public Optional<T> min(Comparator<? super T> comparator) {
+		return delegate.min(comparator);
+	}
+
+	public Optional<T> max(Comparator<? super T> comparator) {
+		return delegate.max(comparator);
+	}
+
+	public long count() {
+		return delegate.count();
+	}
+
+	public boolean anyMatch(Predicate<? super T> predicate) {
+		return delegate.anyMatch(predicate);
+	}
+
+	public boolean allMatch(Predicate<? super T> predicate) {
+		return delegate.allMatch(predicate);
+	}
+
+	public boolean noneMatch(Predicate<? super T> predicate) {
+		return delegate.noneMatch(predicate);
+	}
+
+	public Optional<T> findFirst() {
+		return delegate.findFirst();
+	}
+
+	public Optional<T> findAny() {
+		return delegate.findAny();
+	}
+
+	public interface StreamWrapper {
+
+		<S> Stream<S> wrap(Stream<S> other);
+
+	}
+
+}

+ 59 - 0
main/commons/src/test/java/org/cryptomator/common/AutoClosingStreamTest.java

@@ -0,0 +1,59 @@
+package org.cryptomator.common;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+@SuppressWarnings("unchecked")
+public class AutoClosingStreamTest {
+
+	private Stream<Object> delegate;
+	private Stream<Object> inTest;
+
+	@Before
+	public void setUp() {
+		delegate = mock(Stream.class);
+		inTest = AutoClosingStream.from(delegate);
+	}
+
+	@Test
+	public void testSequentialReturnsNewAutoClosingStream() {
+		Stream<Object> newDelegate = mock(Stream.class);
+		when(delegate.sequential()).thenReturn(newDelegate);
+
+		Stream<Object> result = inTest.sequential();
+
+		assertThat(result, is(instanceOf(AutoClosingStream.class)));
+		verifyDelegate(result, newDelegate);
+	}
+
+	@Test
+	public void testForEachDelegatesToAndClosesDelegate() {
+		Consumer<Object> consumer = mock(Consumer.class);
+
+		inTest.forEach(consumer);
+
+		InOrder inOrder = inOrder(delegate);
+		inOrder.verify(delegate).forEach(consumer);
+		inOrder.verify(delegate).close();
+	}
+
+	private void verifyDelegate(Stream<Object> result, Stream<Object> newDelegate) {
+		result.close();
+		verify(newDelegate).close();
+	}
+
+	// TODO Markus Kreusch test additional methods
+
+}

+ 3 - 1
main/filesystem-nio/src/main/java/org/cryptomator/filesystem/nio/NioFolder.java

@@ -9,6 +9,7 @@ import java.time.Instant;
 import java.util.Optional;
 import java.util.stream.Stream;
 
+import org.cryptomator.common.AutoClosingStream;
 import org.cryptomator.common.WeakValuedCache;
 import org.cryptomator.filesystem.File;
 import org.cryptomator.filesystem.Folder;
@@ -27,7 +28,7 @@ class NioFolder extends NioNode implements Folder {
 	@Override
 	public Stream<? extends Node> children() throws UncheckedIOException {
 		try {
-			return nioAccess.list(path).map(this::childPathToNode);
+			return AutoClosingStream.from(nioAccess.list(path).map(this::childPathToNode));
 		} catch (IOException e) {
 			throw new UncheckedIOException(e);
 		}
@@ -130,6 +131,7 @@ class NioFolder extends NioNode implements Folder {
 		if (!exists()) {
 			return;
 		}
+
 		folders().forEach(Folder::delete);
 		files().forEach(NioFolder::deleteFile);
 		try {

+ 10 - 0
main/filesystem-nio/src/test/java/org/cryptomator/filesystem/nio/NioFolderTest.java

@@ -5,6 +5,7 @@ import static java.util.stream.Collectors.toList;
 import static org.cryptomator.common.test.matcher.ContainsMatcher.contains;
 import static org.cryptomator.filesystem.nio.ReflectiveClassMatchers.aClassThatDoesDeclareMethod;
 import static org.cryptomator.filesystem.nio.ReflectiveClassMatchers.aClassThatDoesNotDeclareMethod;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.theInstance;
 import static org.junit.Assert.assertThat;
@@ -23,6 +24,7 @@ import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
+import org.cryptomator.common.AutoClosingStream;
 import org.cryptomator.filesystem.File;
 import org.cryptomator.filesystem.FileSystem;
 import org.cryptomator.filesystem.Folder;
@@ -75,6 +77,14 @@ public class NioFolderTest {
 
 	public class ChildrenTests {
 
+		@Test
+		public void testChildrenReturnsAnAutoClosingStream() throws IOException {
+			Stream<Path> childrenPaths = Stream.<Path>builder().build();
+			when(nioAccess.list(path)).thenReturn(childrenPaths);
+
+			assertThat(inTest.children(), is(instanceOf(AutoClosingStream.class)));
+		}
+
 		@Test
 		public void testChildrenConvertsPathWhichIsADirectoryToAnNioFolderUsingTheInstanceFactory() throws IOException {
 			Path childFolderPath = mock(Path.class);