Skip to content

Commit

Permalink
NIFI-10543 Support broker failover in MQTT processors (apache#6447)
Browse files Browse the repository at this point in the history
NIFI-10543 Support broker failover in MQTT processors

Signed-off-by: Peter Turcsanyi <[email protected]>
  • Loading branch information
nandorsoma authored Oct 6, 2022
1 parent a721e03 commit 2405b0e
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public void onUnscheduled(final ProcessContext context) {
public void onStopped(final ProcessContext context) {
if (mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
logger.info("Finishing processing leftover messages");
ProcessSession session = processSessionFactory.createSession();
final ProcessSession session = processSessionFactory.createSession();
if (context.getProperty(RECORD_READER).isSet()) {
transferQueueRecord(context, session);
} else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
Expand Down Expand Up @@ -396,17 +396,12 @@ private void initializeClient(ProcessContext context) {
// NOTE: This method is called when isConnected returns false which can happen when the client is null, or when it is
// non-null but not connected, so we need to handle each case and only create a new client when it is null
try {
if (mqttClient == null) {
mqttClient = createMqttClient();
mqttClient.setCallback(this);
}

if (!mqttClient.isConnected()) {
mqttClient.connect();
mqttClient.subscribe(topicPrefix + topicFilter, qos);
}
mqttClient = createMqttClient();
mqttClient.setCallback(this);
mqttClient.connect();
mqttClient.subscribe(topicPrefix + topicFilter, qos);
} catch (Exception e) {
logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{clientProperties.getBroker()}, e);
logger.error("Connection failed to {}. Yielding processor", clientProperties.getRawBrokerUris(), e);
mqttClient = null; // prevent stucked processor when subscribe fails
context.yield();
}
Expand All @@ -430,7 +425,7 @@ private void transferQueueDemarcator(final ProcessContext context, final Process
final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);

FlowFile messageFlowfile = session.create();
session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, clientProperties.getRawBrokerUris());

messageFlowfile = session.append(messageFlowfile, out -> {
int i = 0;
Expand Down Expand Up @@ -461,7 +456,7 @@ private FlowFile createFlowFileAndPopulateAttributes(ProcessSession session, Rec
FlowFile messageFlowfile = session.create();

final Map<String, String> attrs = new HashMap<>();
attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getRawBrokerUris());
attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
Expand All @@ -476,7 +471,7 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

final FlowFile flowFile = session.create();
session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, clientProperties.getRawBrokerUris());

final Map<String, String> attributes = new HashMap<>();
final AtomicInteger recordCount = new AtomicInteger();
Expand Down Expand Up @@ -594,7 +589,7 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess
logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", numberOfMessages);
}

throw new ProcessException("Could not process data received from the MQTT broker(s): " + clientProperties.getBroker(), e);
throw new ProcessException("Could not process data received from the MQTT broker(s): " + clientProperties.getRawBrokerUris(), e);
} finally {
closeWriter(writer);
}
Expand All @@ -610,7 +605,7 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess

final int count = recordCount.get();
session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false);
getLogger().info("Successfully processed {} records for {}", count, flowFile);
logger.info("Successfully processed {} records for {}", count, flowFile);
}

private void closeWriter(final RecordSetWriter writer) {
Expand All @@ -624,8 +619,7 @@ private void closeWriter(final RecordSetWriter writer) {
}

private String getTransitUri(String... appends) {
String broker = clientProperties.getBrokerUri().toString();
StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ? broker : broker + "/");
final StringBuilder stringBuilder = new StringBuilder(clientProperties.getProvenanceFormattedBrokerUris()).append("/");
for (String append : appends) {
stringBuilder.append(append);
}
Expand All @@ -634,14 +628,14 @@ private String getTransitUri(String... appends) {

@Override
public void connectionLost(Throwable cause) {
logger.error("Connection to {} lost", clientProperties.getBroker(), cause);
logger.error("Connection to {} lost", clientProperties.getRawBrokerUris(), cause);
}

@Override
public void messageArrived(ReceivedMqttMessage message) {
if (logger.isDebugEnabled()) {
byte[] payload = message.getPayload();
String text = new String(payload, StandardCharsets.UTF_8);
final String text = new String(payload, StandardCharsets.UTF_8);
if (StringUtils.isAsciiPrintable(text)) {
logger.debug("Message arrived from topic {}. Payload: {}", message.getTopic(), text);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private void processRecordSet(ProcessContext context, ProcessSession session, fi
provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, processedRecords.get());
}

session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), provenanceEventDetails, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.getProvenanceReporter().send(flowfile, clientProperties.getRawBrokerUris(), provenanceEventDetails, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(successFlowFile, REL_SUCCESS);
} catch (Exception e) {
logger.error("An error happened during publishing records. Routing to failure.", e);
Expand All @@ -273,7 +273,7 @@ private void processRecordSet(ProcessContext context, ProcessSession session, fi
if (processedRecords.get() > 0) {
session.getProvenanceReporter().send(
failedFlowFile,
clientProperties.getBroker(),
clientProperties.getRawBrokerUris(),
String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords.get()),
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
Expand All @@ -289,7 +289,7 @@ private void processStandardFlowFile(ProcessContext context, ProcessSession sess

final StopWatch stopWatch = new StopWatch(true);
publishMessage(context, flowfile, topic, messageContent);
session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.getProvenanceReporter().send(flowfile, clientProperties.getRawBrokerUris(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowfile, REL_SUCCESS);
} catch (Exception e) {
logger.error("An error happened during publishing a message. Routing to failure.", e);
Expand All @@ -309,23 +309,18 @@ private void initializeClient(ProcessContext context) {
// NOTE: This method is called when isConnected returns false which can happen when the client is null, or when it is
// non-null but not connected, so we need to handle each case and only create a new client when it is null
try {
if (mqttClient == null) {
mqttClient = createMqttClient();
mqttClient.setCallback(this);
}

if (!mqttClient.isConnected()) {
mqttClient.connect();
}
mqttClient = createMqttClient();
mqttClient.setCallback(this);
mqttClient.connect();
} catch (Exception e) {
logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", clientProperties.getBroker(), e);
logger.error("Connection failed to {}. Yielding processor", clientProperties.getRawBrokerUris(), e);
context.yield();
}
}

@Override
public void connectionLost(Throwable cause) {
logger.error("Connection to {} lost", clientProperties.getBroker(), cause);
logger.error("Connection to {} lost", clientProperties.getRawBrokerUris(), cause);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.MqttProtocolScheme;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.TlsException;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand All @@ -50,8 +52,8 @@ public class HiveMqV5ClientAdapter implements MqttClient {

private MqttCallback callback;

public HiveMqV5ClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
this.mqtt5BlockingClient = createClient(clientProperties, logger);
public HiveMqV5ClientAdapter(URI brokerUri, MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
this.mqtt5BlockingClient = createClient(brokerUri, clientProperties, logger);
this.clientProperties = clientProperties;
this.logger = logger;
}
Expand Down Expand Up @@ -143,7 +145,7 @@ public void subscribe(String topicFilter, int qos) {
// Setting "listener" callback is only possible with async client, though sending subscribe message
// should happen in a blocking way to make sure the processor is blocked until ack is not arrived.
try {
Mqtt5SubAck ack = futureAck.get(clientProperties.getConnectionTimeout(), TimeUnit.SECONDS);
final Mqtt5SubAck ack = futureAck.get(clientProperties.getConnectionTimeout(), TimeUnit.SECONDS);
logger.debug("Received mqtt5 subscribe ack: {}", ack);
} catch (Exception e) {
throw new MqttException("An error has occurred during sending subscribe message to broker", e);
Expand All @@ -155,24 +157,25 @@ public void setCallback(MqttCallback callback) {
this.callback = callback;
}

private static Mqtt5BlockingClient createClient(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
private static Mqtt5BlockingClient createClient(URI brokerUri, MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
logger.debug("Creating Mqtt v5 client");

Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder()
final Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder()
.identifier(clientProperties.getClientId())
.serverHost(clientProperties.getBrokerUri().getHost());
.serverHost(brokerUri.getHost());

int port = clientProperties.getBrokerUri().getPort();
final int port = brokerUri.getPort();
if (port != -1) {
mqtt5ClientBuilder.serverPort(port);
}

final MqttProtocolScheme scheme = MqttProtocolScheme.valueOf(brokerUri.getScheme().toUpperCase());
// default is tcp
if (WS.equals(clientProperties.getScheme()) || WSS.equals(clientProperties.getScheme())) {
if (WS.equals(scheme) || WSS.equals(scheme)) {
mqtt5ClientBuilder.webSocketConfig().applyWebSocketConfig();
}

if (SSL.equals(clientProperties.getScheme())) {
if (SSL.equals(scheme) || WSS.equals(scheme)) {
if (clientProperties.getTlsConfiguration().getTruststorePath() != null) {
mqtt5ClientBuilder
.sslConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.net.URI;
import java.util.Properties;

public class PahoMqttClientAdapter implements MqttClient {
Expand All @@ -40,8 +41,8 @@ public class PahoMqttClientAdapter implements MqttClient {
private final MqttClientProperties clientProperties;
private final ComponentLog logger;

public PahoMqttClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) {
this.client = createClient(clientProperties, logger);
public PahoMqttClientAdapter(URI brokerUri, MqttClientProperties clientProperties, ComponentLog logger) {
this.client = createClient(brokerUri, clientProperties, logger);
this.clientProperties = clientProperties;
this.logger = logger;
}
Expand Down Expand Up @@ -178,11 +179,11 @@ public static Properties transformSSLContextService(TlsConfiguration tlsConfigur
return properties;
}

private static org.eclipse.paho.client.mqttv3.MqttClient createClient(MqttClientProperties clientProperties, ComponentLog logger) {
private static org.eclipse.paho.client.mqttv3.MqttClient createClient(URI brokerUri, MqttClientProperties clientProperties, ComponentLog logger) {
logger.debug("Creating Mqtt v3 client");

try {
return new org.eclipse.paho.client.mqttv3.MqttClient(clientProperties.getBroker(), clientProperties.getClientId(), new MemoryPersistence());
return new org.eclipse.paho.client.mqttv3.MqttClient(brokerUri.toString(), clientProperties.getClientId(), new MemoryPersistence());
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
throw new MqttException("An error has occurred during creating adapter for MQTT v3 client", e);
}
Expand Down
Loading

0 comments on commit 2405b0e

Please sign in to comment.