|
@@ -7,61 +7,51 @@ import org.cryptomator.cryptofs.event.ConflictResolutionFailedEvent;
|
|
|
import org.cryptomator.cryptofs.event.ConflictResolvedEvent;
|
|
|
import org.cryptomator.cryptofs.event.DecryptionFailedEvent;
|
|
|
import org.cryptomator.cryptofs.event.FilesystemEvent;
|
|
|
+import org.jetbrains.annotations.NotNull;
|
|
|
|
|
|
import javax.inject.Inject;
|
|
|
import javax.inject.Singleton;
|
|
|
-import javafx.application.Platform;
|
|
|
-import javafx.collections.FXCollections;
|
|
|
-import javafx.collections.MapChangeListener;
|
|
|
-import javafx.collections.ObservableMap;
|
|
|
import java.nio.file.Path;
|
|
|
-import java.util.List;
|
|
|
-import java.util.TreeSet;
|
|
|
+import java.time.Instant;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.ConcurrentMap;
|
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
+/**
|
|
|
+ * Angenommen:
|
|
|
+ * Datenstruktur die
|
|
|
+ * 1. Thread-Safe ist
|
|
|
+ * ??
|
|
|
+ *
|
|
|
+ *
|
|
|
+ *
|
|
|
+ *
|
|
|
+ * 1. Wenn ein Set verwendet wird, dann können wir nach Timestamp sortieren, aber wir können einen Eintrag nur durch entfernen und hinzufügen updaten
|
|
|
+ * 2. Wenn eine Map verwendet wird, dann können wir Einträge updaten. Aber
|
|
|
+ *
|
|
|
+ */
|
|
|
+//TODO: Rename to aggregator
|
|
|
+//TODO: lru cache
|
|
|
@Singleton
|
|
|
public class FileSystemEventRegistry {
|
|
|
|
|
|
private static final int MAX_MAP_SIZE = 400;
|
|
|
|
|
|
- public record Key(Vault vault, Path idPath, Class<? extends FilesystemEvent> c) {}
|
|
|
+ public record Key(Vault vault, Path idPath, Class<? extends FilesystemEvent> c) {};
|
|
|
|
|
|
public record Value(FilesystemEvent mostRecentEvent, int count) {}
|
|
|
|
|
|
- /**
|
|
|
- * Queue of elements to be inserted into the map
|
|
|
- */
|
|
|
- private final ConcurrentMap<Key, Value> queue;
|
|
|
- /**
|
|
|
- * Least-recently-used cache.
|
|
|
- */
|
|
|
- private final TreeSet<Key> lruCache;
|
|
|
- /**
|
|
|
- * Actual map
|
|
|
- */
|
|
|
- private final ObservableMap<Key, Value> map;
|
|
|
-
|
|
|
- private final ScheduledExecutorService scheduler;
|
|
|
-
|
|
|
- private final AtomicBoolean queueHasElements;
|
|
|
+ private final ConcurrentHashMap<Key, Value> map;
|
|
|
+ private final AtomicBoolean hasUpdates;
|
|
|
|
|
|
@Inject
|
|
|
public FileSystemEventRegistry(ScheduledExecutorService scheduledExecutorService) {
|
|
|
- this.queue = new ConcurrentHashMap<>();
|
|
|
- this.lruCache = new TreeSet<>(this::compareKeys);
|
|
|
- this.map = FXCollections.observableHashMap();
|
|
|
- this.scheduler = scheduledExecutorService;
|
|
|
- this.queueHasElements = new AtomicBoolean(false);
|
|
|
- scheduler.scheduleWithFixedDelay(() -> {
|
|
|
- if (queueHasElements.get()) {
|
|
|
- flush();
|
|
|
- }
|
|
|
- }, 1000, 1000, TimeUnit.MILLISECONDS);
|
|
|
+ this.map = new ConcurrentHashMap<>();
|
|
|
+ this.hasUpdates = new AtomicBoolean(false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -70,9 +60,9 @@ public class FileSystemEventRegistry {
|
|
|
* @param v Vault where the event occurred
|
|
|
* @param e Actual {@link FilesystemEvent}
|
|
|
*/
|
|
|
- public synchronized void enque(Vault v, FilesystemEvent e) {
|
|
|
+ public synchronized void enqueue(Vault v, FilesystemEvent e) {
|
|
|
var key = computeKey(v, e);
|
|
|
- queue.compute(key, (k, val) -> {
|
|
|
+ map.compute(key, (k, val) -> {
|
|
|
if (val == null) {
|
|
|
return new Value(e, 1);
|
|
|
} else {
|
|
@@ -80,45 +70,21 @@ public class FileSystemEventRegistry {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- queueHasElements.set(true);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Lists all entries in this map as {@link FileSystemEventBucket}. The list is sorted ascending by the timestamp of event occurral (and more if it is the same timestamp).
|
|
|
- * Must be executed on the JavaFX application thread
|
|
|
- *
|
|
|
- * @return a list of vault events, mainly sorted ascending by the event timestamp
|
|
|
- * @implNote Method is not synchronized, because it is only executed if executed by JavaFX application thread
|
|
|
- */
|
|
|
- public List<FileSystemEventBucket> listAll() {
|
|
|
- if (!Platform.isFxApplicationThread()) {
|
|
|
- throw new IllegalStateException("Listing map entries must be performed on JavaFX application thread");
|
|
|
- }
|
|
|
- return lruCache.stream().map(key -> {
|
|
|
- var value = map.get(key);
|
|
|
- return new FileSystemEventBucket(key.vault(), value.mostRecentEvent(), value.count());
|
|
|
- }).toList();
|
|
|
+ hasUpdates.set(true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Removes an event from the map.
|
|
|
* <p>
|
|
|
* To identify the event, a similar event (in the sense of map key) is given.
|
|
|
- * Must be executed on the JavaFX application thread
|
|
|
*
|
|
|
- * @param v Vault where the event occurred
|
|
|
- * @param similar A similar {@link FilesystemEvent} (same class, same idPath)
|
|
|
* @return the removed {@link Value}
|
|
|
* @implNote Method is not synchronized, because it is only executed if executed by JavaFX application thread
|
|
|
*/
|
|
|
- public Value remove(Vault v, FilesystemEvent similar) {
|
|
|
- if (!Platform.isFxApplicationThread()) {
|
|
|
- throw new IllegalStateException("Map removal must be performed on JavaFX application thread");
|
|
|
- }
|
|
|
- var key = computeKey(v, similar);
|
|
|
- lruCache.remove(key);
|
|
|
- return map.remove(key);
|
|
|
+ public Value remove(Key key) {
|
|
|
+ var result = map.remove(key);
|
|
|
+ hasUpdates.set(true);
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -129,93 +95,8 @@ public class FileSystemEventRegistry {
|
|
|
* @implNote Method is not synchronized, because it is only executed if executed by JavaFX application thread
|
|
|
*/
|
|
|
public void clear() {
|
|
|
- if (!Platform.isFxApplicationThread()) {
|
|
|
- throw new IllegalStateException("Map removal must be performed on JavaFX application thread");
|
|
|
- }
|
|
|
- lruCache.clear();
|
|
|
map.clear();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Flushes all changes from the queue into the map
|
|
|
- */
|
|
|
- private synchronized void flush() {
|
|
|
- //Lock queue
|
|
|
- var latch = new CountDownLatch(1);
|
|
|
- Platform.runLater(() -> {
|
|
|
- queue.forEach(this::updateMap);
|
|
|
- queue.clear();
|
|
|
- latch.countDown();
|
|
|
- });
|
|
|
- try {
|
|
|
- latch.await();
|
|
|
- queueHasElements.set(false);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Updates a single map entry
|
|
|
- *
|
|
|
- * @param k Key of the entry to update
|
|
|
- * @param v Value of the entry to update
|
|
|
- * @implNote Method is not synchronized, because it is only called on the (one-and-only) JavaFX application thread
|
|
|
- */
|
|
|
- private void updateMap(Key k, Value v) {
|
|
|
- var entry = map.get(k);
|
|
|
- if (entry == null) {
|
|
|
- if (map.size() == MAX_MAP_SIZE) {
|
|
|
- var toRemove = lruCache.first();
|
|
|
- lruCache.remove(toRemove);
|
|
|
- map.remove(toRemove);
|
|
|
- }
|
|
|
- map.put(k, v);
|
|
|
- lruCache.add(k);
|
|
|
- } else {
|
|
|
- lruCache.remove(k);
|
|
|
- map.put(k, new Value(v.mostRecentEvent, entry.count + v.count));
|
|
|
- lruCache.add(k); //correct, because cache-sorting uses the map in comparsionMethod
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* Observability */
|
|
|
-
|
|
|
- public void addListener(MapChangeListener<? super Key, ? super Value> mapChangeListener) {
|
|
|
- map.addListener(mapChangeListener);
|
|
|
- }
|
|
|
-
|
|
|
- public void removeListener(MapChangeListener<? super Key, ? super Value> mapChangeListener) {
|
|
|
- map.removeListener(mapChangeListener);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /* Internal stuff */
|
|
|
-
|
|
|
- /**
|
|
|
- * Comparsion method for the lru cache. During comparsion the map is accessed.
|
|
|
- * First the entries are compared by the event timestamp, then vaultId, then identifying path and lastly by class name.
|
|
|
- *
|
|
|
- * @param left a {@link Key} object
|
|
|
- * @param right another {@link Key} object, compared to {@code left}
|
|
|
- * @return a negative integer, zero, or a positive integer as the first argument is less than, equal to, or greater than the second.
|
|
|
- */
|
|
|
- private int compareKeys(Key left, Key right) {
|
|
|
- var t1 = map.get(left).mostRecentEvent.getTimestamp();
|
|
|
- var t2 = map.get(right).mostRecentEvent.getTimestamp();
|
|
|
- var timeComparsion = t1.compareTo(t2);
|
|
|
- if (timeComparsion != 0) {
|
|
|
- return timeComparsion;
|
|
|
- }
|
|
|
- var vaultIdComparsion = left.vault.getId().compareTo(right.vault.getId());
|
|
|
- if (vaultIdComparsion != 0) {
|
|
|
- return vaultIdComparsion;
|
|
|
- }
|
|
|
- var pathComparsion = left.idPath.compareTo(right.idPath);
|
|
|
- if (pathComparsion != 0) {
|
|
|
- return pathComparsion;
|
|
|
- }
|
|
|
- return left.c.getName().compareTo(right.c.getName());
|
|
|
+ hasUpdates.set(true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -236,4 +117,20 @@ public class FileSystemEventRegistry {
|
|
|
return new Key(v, p, event.getClass());
|
|
|
}
|
|
|
|
|
|
+ public boolean hasUpdates() {
|
|
|
+ return hasUpdates.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Clones the map entries into a set.
|
|
|
+ * <p>
|
|
|
+ * The set is first cleared, then all map entries are added in one bulk operation. Sets the updates status of the event registry.
|
|
|
+ *
|
|
|
+ * @param targetCollection
|
|
|
+ */
|
|
|
+ public void cloneTo(Collection<Map.Entry<Key, Value>> targetCollection) {
|
|
|
+ targetCollection.clear();
|
|
|
+ targetCollection.addAll(map.entrySet());
|
|
|
+ hasUpdates.set(false);
|
|
|
+ }
|
|
|
}
|