Skip to content

Commit

Permalink
CAMEL-10512-add managedOperations for connection status check for bot…
Browse files Browse the repository at this point in the history
…h consumer&producer
  • Loading branch information
onderson authored and davsclaus committed May 8, 2017
1 parent bcd4577 commit 71c6b05
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
Expand All @@ -36,6 +37,7 @@
* into the log files. Logging of PHI can be globally disabled by setting the org.apache.camel.mllp.logPHI system
* property to false.
*/
@ManagedResource(description = "Mllp Endpoint")
@UriEndpoint(firstVersion = "2.17.0", scheme = "mllp", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "hl7")
public class MllpEndpoint extends DefaultEndpoint {
public static final char START_OF_BLOCK = 0x0b; // VT (vertical tab) - decimal 11, octal 013
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.component.mllp.impl.Hl7Util;
import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter;
import org.apache.camel.component.mllp.impl.MllpSocketReader;
Expand All @@ -45,6 +47,7 @@
/**
* The MLLP producer.
*/
@ManagedResource(description = "MllpTcpClient Producer")
public class MllpTcpClientProducer extends DefaultProducer {
MllpEndpoint endpoint;

Expand Down Expand Up @@ -76,6 +79,16 @@ protected void doStop() throws Exception {
super.doStop();
}

@ManagedOperation(description = "Close client socket")
public void closeMllpSocket() {
MllpSocketUtil.close(socket, log, "JMX triggered closing socket");
}

@ManagedOperation(description = "Reset client socket")
public void resetMllpSocket() {
MllpSocketUtil.reset(socket, log, "JMX triggered resetting socket");
}

@Override
public void process(Exchange exchange) throws Exception {
log.trace("process(exchange)");
Expand Down Expand Up @@ -235,11 +248,13 @@ private String processAcknowledgment(byte[] hl7MessageBytes, byte[] hl7Acknowled
}

/**
* Validate the TCP Connection
* Validate the TCP Connection, if closed opens up the socket with
* the value set via endpoint configuration
*
* @return null if the connection is valid, otherwise the Exception encountered checking the connection
* @throws IOException if the connection is not valid, otherwise the Exception is not
* encountered while checking the connection
*/
void checkConnection() throws IOException {
private void checkConnection() throws IOException {
if (null == socket || socket.isClosed() || !socket.isConnected()) {
socket = new Socket();

Expand Down Expand Up @@ -276,6 +291,19 @@ void checkConnection() throws IOException {
mllpSocketWriter = new MllpSocketWriter(socket, false);
}
}
return;
}

@ManagedOperation(description = "Check client connection")
public boolean managedCheckConnection() {
boolean isValid = true;
try {
checkConnection();
} catch (IOException ioEx) {
isValid = false;
log.debug("JMX check connection: {}", ioEx);
}
return isValid;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.component.mllp.impl.Hl7Util;
import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter;
import org.apache.camel.component.mllp.impl.MllpSocketReader;
Expand Down Expand Up @@ -74,6 +76,7 @@
/**
* The MLLP consumer.
*/
@ManagedResource(description = "MllpTcpServer Consumer")
public class MllpTcpServerConsumer extends DefaultConsumer {
public static final int SOCKET_STARTUP_TEST_WAIT = 100;
public static final int SOCKET_STARTUP_TEST_READ_TIMEOUT = 250;
Expand All @@ -95,6 +98,33 @@ public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) {
protected void doStart() throws Exception {
log.debug("doStart() - creating acceptor thread");

startMllpConsumer();

super.doStart();
}

@ManagedOperation(description = "Check server connection")
public boolean managedCheckConnection() {
boolean isValid = true;
try {
InetSocketAddress socketAddress;
if (null == endpoint.getHostname()) {
socketAddress = new InetSocketAddress(endpoint.getPort());
} else {
socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
}
Socket checkSocket = new Socket();
checkSocket.connect(socketAddress, 100);
checkSocket.close();
} catch (Exception e) {
isValid = false;
log.debug("JMX check connection: {}", e);
}
return isValid;
}

@ManagedOperation(description = "Starts serverSocket thread and waits for requests")
public void startMllpConsumer() throws IOException, InterruptedException {
ServerSocket serverSocket = new ServerSocket();
if (null != endpoint.receiveBufferSize) {
serverSocket.setReceiveBufferSize(endpoint.receiveBufferSize);
Expand Down Expand Up @@ -129,14 +159,19 @@ protected void doStart() throws Exception {

serverSocketThread = new ServerSocketThread(serverSocket);
serverSocketThread.start();

super.doStart();
}

@Override
protected void doStop() throws Exception {
log.debug("doStop()");

stopMllpConsumer();

super.doStop();
}

@ManagedOperation(description = "Stops client threads and serverSocket thread")
public void stopMllpConsumer() {
// Close any client sockets that are currently open
for (ClientSocketThread clientSocketThread: clientThreads) {
clientSocketThread.interrupt();
Expand All @@ -158,8 +193,6 @@ protected void doStop() throws Exception {
}

serverSocketThread = null;

super.doStop();
}

/**
Expand Down

0 comments on commit 71c6b05

Please sign in to comment.