Skip to content

Commit

Permalink
FLINK-16697][metrics][jmx] Disable rebinding
Browse files Browse the repository at this point in the history
  • Loading branch information
coheigea authored and zentol committed Apr 9, 2020
1 parent 61cb1fb commit 48ca137
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,25 @@
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
import javax.management.remote.rmi.RMIConnectorServer;
import javax.management.remote.rmi.RMIJRMPServerImpl;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.rmi.NoSuchObjectException;
import java.rmi.registry.LocateRegistry;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
* {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
Expand Down Expand Up @@ -474,42 +478,31 @@ public long getCount() {
/**
* JMX Server implementation that JMX clients can connect to.
*
* <p>Heavily based on j256 simplejmx project
* <p>Originally based on j256 simplejmx project
*
* <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
*/
private static class JMXServer {
private Registry rmiRegistry;
private JMXConnectorServer connector;
private int port;
private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>();

public void start(int port) throws IOException {
if (rmiRegistry != null && connector != null) {
LOG.debug("JMXServer is already running.");
return;
}
startRmiRegistry(port);
startJmxService(port);
internalStart(port);
this.port = port;
}

/**
* Starts an RMI Registry that allows clients to lookup the JMX IP/port.
*
* @param port rmi port to use
* @throws IOException
*/
private void startRmiRegistry(int port) throws IOException {
rmiRegistry = LocateRegistry.createRegistry(port);
}
private void internalStart(int port) throws IOException {
rmiServerReference.set(null);

// this allows clients to lookup the JMX service
rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference);

/**
* Starts a JMX connector that allows (un)registering MBeans with the MBean server and RMI invocations.
*
* @param port jmx port to use
* @throws IOException
*/
private void startJmxService(int port) throws IOException {
String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
JMXServiceURL url;
try {
Expand All @@ -518,12 +511,20 @@ private void startJmxService(int port) throws IOException {
throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
}

connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null);

connector = new RMIConnectorServer(url, null, rmiServer, ManagementFactory.getPlatformMBeanServer());
connector.start();

// we can't pass the created stub directly to the registry since this would form a cyclic dependency:
// - you can only start the connector after the registry was started
// - you can only create the stub after the connector was started
// - you can only start the registry after the stub was created
rmiServerReference.set(rmiServer.toStub());
}

public void stop() throws IOException {
rmiServerReference.set(null);
if (connector != null) {
try {
connector.stop();
Expand All @@ -541,5 +542,51 @@ public void stop() throws IOException {
}
}
}

/**
* A registry that only exposes a single remote object.
*/
@SuppressWarnings("restriction")
private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
private final String lookupName;
private final AtomicReference<Remote> remoteServerStub;

JmxRegistry(final int port, final String lookupName, final AtomicReference<Remote> remoteServerStub) throws RemoteException {
super(port);
this.lookupName = lookupName;
this.remoteServerStub = remoteServerStub;
}

@Override
public Remote lookup(String s) throws NotBoundException {
if (lookupName.equals(s)) {
final Remote remote = remoteServerStub.get();
if (remote != null) {
return remote;
}
}
throw new NotBoundException("Not bound.");
}

@Override
public void bind(String s, Remote remote) {
// this is called from RMIConnectorServer#start; don't throw a general AccessException
}

@Override
public void unbind(String s) {
// this is called from RMIConnectorServer#stop; don't throw a general AccessException
}

@Override
public void rebind(String s, Remote remote) {
// might as well not throw an exception here given that the others don't
}

@Override
public String[] list() {
return new String[]{lookupName};
}
}
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,7 @@ under the License.
<compilerArgs combine.children="append">
<arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
<arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
<arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
Expand Down

0 comments on commit 48ca137

Please sign in to comment.