|
@@ -9,6 +9,7 @@ import java.io.Closeable;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.ServerSocket;
|
|
|
+import java.net.Socket;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.ReadableByteChannel;
|
|
|
import java.nio.channels.WritableByteChannel;
|
|
@@ -56,11 +57,9 @@ abstract class InterProcessCommunicator implements InterProcessCommunicationProt
|
|
|
// visible for testing
|
|
|
static InterProcessCommunicator start(Path portFilePath, InterProcessCommunicationProtocol endpoint) throws IOException {
|
|
|
System.setProperty("java.rmi.server.hostname", "localhost");
|
|
|
- // try to connect to existing server:
|
|
|
- int port = readPort(portFilePath);
|
|
|
- LOG.debug("Connecting to running process on TCP port {}...", port);
|
|
|
try {
|
|
|
- ClientCommunicator client = new ClientCommunicator(port);
|
|
|
+ // try to connect to existing server:
|
|
|
+ ClientCommunicator client = new ClientCommunicator(portFilePath);
|
|
|
LOG.trace("Connected to running process.");
|
|
|
return client;
|
|
|
} catch (ConnectException | ConnectIOException | NotBoundException e) {
|
|
@@ -70,8 +69,7 @@ abstract class InterProcessCommunicator implements InterProcessCommunicationProt
|
|
|
|
|
|
// spawn a new server:
|
|
|
LOG.trace("Spawning new server...");
|
|
|
- ServerCommunicator server = new ServerCommunicator(endpoint);
|
|
|
- writePort(portFilePath, server.getPort());
|
|
|
+ ServerCommunicator server = new ServerCommunicator(endpoint, portFilePath);
|
|
|
LOG.debug("Server listening on port {}.", server.getPort());
|
|
|
return server;
|
|
|
}
|
|
@@ -98,12 +96,30 @@ abstract class InterProcessCommunicator implements InterProcessCommunicationProt
|
|
|
|
|
|
private final IpcProtocolRemote remote;
|
|
|
|
|
|
- private ClientCommunicator(int port) throws ConnectException, NotBoundException, RemoteException {
|
|
|
- if (port == 0) {
|
|
|
- throw new ConnectException("Can not connect to port 0.");
|
|
|
+ private ClientCommunicator(Path portFilePath) throws ConnectException, NotBoundException, RemoteException {
|
|
|
+ if (Files.notExists(portFilePath)) {
|
|
|
+ throw new ConnectException("No IPC port file.");
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ int port = ClientCommunicator.readPort(portFilePath);
|
|
|
+ LOG.debug("Connecting to port {}...");
|
|
|
+ Registry registry = LocateRegistry.getRegistry("localhost", port, new ClientSocketFactory());
|
|
|
+ this.remote = (IpcProtocolRemote) registry.lookup(RMI_NAME);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new ConnectException("Error reading IPC port file.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static int readPort(Path portFilePath) throws IOException {
|
|
|
+ ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
|
|
|
+ try (ReadableByteChannel ch = Files.newByteChannel(portFilePath, StandardOpenOption.READ)) {
|
|
|
+ if (ch.read(buf) == Integer.BYTES) {
|
|
|
+ buf.flip();
|
|
|
+ return buf.getInt();
|
|
|
+ } else {
|
|
|
+ throw new IOException("Invalid IPC port file.");
|
|
|
+ }
|
|
|
}
|
|
|
- Registry registry = LocateRegistry.getRegistry("localhost", port);
|
|
|
- this.remote = (IpcProtocolRemote) registry.lookup(RMI_NAME);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -132,8 +148,9 @@ abstract class InterProcessCommunicator implements InterProcessCommunicationProt
|
|
|
private final ServerSocket socket;
|
|
|
private final Registry registry;
|
|
|
private final IpcProtocolRemoteImpl remote;
|
|
|
+ private final Path portFilePath;
|
|
|
|
|
|
- private ServerCommunicator(InterProcessCommunicationProtocol delegate) throws IOException {
|
|
|
+ private ServerCommunicator(InterProcessCommunicationProtocol delegate, Path portFilePath) throws IOException {
|
|
|
this.socket = new ServerSocket(0, Byte.MAX_VALUE, InetAddress.getByName("localhost"));
|
|
|
RMIClientSocketFactory csf = RMISocketFactory.getDefaultSocketFactory();
|
|
|
SingletonServerSocketFactory ssf = new SingletonServerSocketFactory(socket);
|
|
@@ -141,6 +158,20 @@ abstract class InterProcessCommunicator implements InterProcessCommunicationProt
|
|
|
this.remote = new IpcProtocolRemoteImpl(delegate);
|
|
|
UnicastRemoteObject.exportObject(remote, 0);
|
|
|
registry.rebind(RMI_NAME, remote);
|
|
|
+ this.portFilePath = portFilePath;
|
|
|
+ ServerCommunicator.writePort(portFilePath, socket.getLocalPort());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void writePort(Path portFilePath, int port) throws IOException {
|
|
|
+ ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
|
|
|
+ buf.putInt(port);
|
|
|
+ buf.flip();
|
|
|
+ MoreFiles.createParentDirectories(portFilePath);
|
|
|
+ try (WritableByteChannel ch = Files.newByteChannel(portFilePath, StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
|
|
|
+ if (ch.write(buf) != Integer.BYTES) {
|
|
|
+ throw new IOException("Did not write expected number of bytes.");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -163,6 +194,7 @@ abstract class InterProcessCommunicator implements InterProcessCommunicationProt
|
|
|
registry.unbind(RMI_NAME);
|
|
|
UnicastRemoteObject.unexportObject(remote, true);
|
|
|
socket.close();
|
|
|
+ Files.deleteIfExists(portFilePath);
|
|
|
LOG.debug("Server shut down.");
|
|
|
} catch (NotBoundException | IOException e) {
|
|
|
LOG.warn("Failed to close IPC Server.", e);
|
|
@@ -211,31 +243,18 @@ abstract class InterProcessCommunicator implements InterProcessCommunicationProt
|
|
|
|
|
|
}
|
|
|
|
|
|
- private static int readPort(Path path) throws IOException {
|
|
|
- if (Files.notExists(path)) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
|
|
|
- try (ReadableByteChannel ch = Files.newByteChannel(path, StandardOpenOption.READ)) {
|
|
|
- if (ch.read(buf) == Integer.BYTES) {
|
|
|
- buf.flip();
|
|
|
- return buf.getInt();
|
|
|
- } else {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Creates client sockets with short timeouts.
|
|
|
+ */
|
|
|
+ private static class ClientSocketFactory implements RMIClientSocketFactory {
|
|
|
|
|
|
- private static void writePort(Path path, int port) throws IOException {
|
|
|
- ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
|
|
|
- buf.putInt(port);
|
|
|
- buf.flip();
|
|
|
- MoreFiles.createParentDirectories(path);
|
|
|
- try (WritableByteChannel ch = Files.newByteChannel(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
|
|
|
- if (ch.write(buf) != Integer.BYTES) {
|
|
|
- throw new IOException("Did not write expected number of bytes.");
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public Socket createSocket(String host, int port) throws IOException {
|
|
|
+ Socket socket = new Socket(host, port);
|
|
|
+ socket.setSoTimeout(1000); // 1s
|
|
|
+ return socket;
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
}
|