Skip to content

Commit

Permalink
NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attrib…
Browse files Browse the repository at this point in the history
…utes in ConsumeJMS

Remove unused assertEquals import

Move destination from default to send/receive to support EL better
  • Loading branch information
jfrazee authored and olegz committed Sep 20, 2016
1 parent feaa4c9 commit c238676
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ private void buildTargetResource(ProcessContext context) {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
jmsTemplate.setDefaultDestinationName(this.destinationName);
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));

// set of properties that may be good candidates for exposure via configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
*/
@Override
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException {
final JMSResponse response = this.targetResource.consume();
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
final JMSResponse response = this.targetResource.consume(destinationName);
if (response != null){
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
Expand All @@ -88,7 +89,9 @@ public void process(final OutputStream out) throws IOException {
}
});
Map<String, Object> jmsHeaders = response.getMessageHeaders();
flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, flowFile, processSession);
flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
processSession.transfer(flowFile, REL_SUCCESS);
} else {
Expand All @@ -115,10 +118,10 @@ public Set<Relationship> getRelationships() {
/**
*
*/
private FlowFile updateFlowFileAttributesWithJmsHeaders(Map<String, Object> jmsHeaders, FlowFile flowFile, ProcessSession processSession) {
private FlowFile updateFlowFileAttributesWithMap(Map<String, Object> map, FlowFile flowFile, ProcessSession processSession) {
Map<String, String> attributes = new HashMap<String, String>();
for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) {
attributes.put(headersEntry.getKey(), String.valueOf(headersEntry.getValue()));
for (Entry<String, Object> entry : map.entrySet()) {
attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
}
attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName);
flowFile = processSession.putAllAttributes(flowFile, attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ final class JMSConsumer extends JMSWorker {
/**
*
*/
public JMSResponse consume() {
Message message = this.jmsTemplate.receive();
public JMSResponse consume(final String destinationName) {
Message message = this.jmsTemplate.receive(destinationName);
if (message != null) {
byte[] messageBody = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ final class JMSPublisher extends JMSWorker {
*
* @param messageBytes byte array representing contents of the message
*/
void publish(byte[] messageBytes) {
this.publish(messageBytes, null);
void publish(final String destinationName, byte[] messageBytes) {
this.publish(destinationName, messageBytes, null);
}

/**
Expand All @@ -74,16 +74,16 @@ void publish(byte[] messageBytes) {
* @param flowFileAttributes
* Map representing {@link FlowFile} attributes.
*/
void publish(final byte[] messageBytes, final Map<String, String> flowFileAttributes) {
this.jmsTemplate.send(new MessageCreator() {
void publish(final String destinationName, final byte[] messageBytes, final Map<String, String> flowFileAttributes) {
this.jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageBytes);
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
// set message headers and properties
for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-")) {// '-' is illegal char in JMS prop names
if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' are illegal char in JMS prop names
message.setStringProperty(entry.getKey(), entry.getValue());
} else if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ protected void rendezvousWithJms(ProcessContext context, ProcessSession processS
FlowFile flowFile = processSession.get();
if (flowFile != null) {
try {
this.targetResource.publish(this.extractMessageBody(flowFile, processSession),
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
this.targetResource.publish(destinationName, this.extractMessageBody(flowFile, processSession),
flowFile.getAttributes());
processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,12 @@ public void validateServiceIsLocatableViaServiceLoader() {
assertTrue(consumeJmsPresent);
}

static JmsTemplate buildJmsTemplateForDestination(String destinationName, boolean pubSub) {
static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"vm://localhost?broker.persistent=false");
CachingConnectionFactory cf = new CachingConnectionFactory(connectionFactory);

JmsTemplate jmsTemplate = new JmsTemplate(cf);
jmsTemplate.setDefaultDestinationName(destinationName);
jmsTemplate.setPubSubDomain(pubSub);
return jmsTemplate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.nifi.jms.processors;

import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.util.MockFlowFile;
Expand All @@ -26,7 +29,6 @@
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -35,9 +37,13 @@ public class ConsumeJMSTest {

@Test
public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("cooQueue", false);
final String destinationName = "cooQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
JMSPublisher sender = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
sender.publish("Hey dude!".getBytes());
final Map<String, String> senderAttributes = new HashMap<>();
senderAttributes.put("filename", "message.txt");
senderAttributes.put("attribute_from_sender", "some value");
sender.publish(destinationName, "Hey dude!".getBytes(), senderAttributes);
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
Expand All @@ -46,13 +52,18 @@ public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
runner.enableControllerService(cs);

runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(ConsumeJMS.DESTINATION, "cooQueue");
runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
runner.run(1, false);
//
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);
assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION));
successFF.assertAttributeExists(JmsHeaders.DESTINATION);
successFF.assertAttributeEquals(JmsHeaders.DESTINATION, destinationName);
successFF.assertAttributeExists("filename");
successFF.assertAttributeEquals("filename", "message.txt");
successFF.assertAttributeExists("attribute_from_sender");
successFF.assertAttributeEquals("attribute_from_sender", "some value");
successFF.assertContentEquals("Hey dude!".getBytes());
String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
assertNotNull(sourceDestination);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ public class JMSPublisherConsumerTest {

@Test
public void validateByesConvertedToBytesMessageOnSend() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
publisher.publish("hellomq".getBytes());
publisher.publish(destinationName, "hellomq".getBytes());

Message receivedMessage = jmsTemplate.receive();
Message receivedMessage = jmsTemplate.receive(destinationName);
assertTrue(receivedMessage instanceof BytesMessage);
byte[] bytes = new byte[7];
((BytesMessage) receivedMessage).readBytes(bytes);
Expand All @@ -58,15 +59,16 @@ public void validateByesConvertedToBytesMessageOnSend() throws Exception {

@Test
public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("foo", "foo");
flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
publisher.publish("hellomq".getBytes(), flowFileAttributes);
publisher.publish(destinationName, "hellomq".getBytes(), flowFileAttributes);

Message receivedMessage = jmsTemplate.receive();
Message receivedMessage = jmsTemplate.receive(destinationName);
assertTrue(receivedMessage instanceof BytesMessage);
assertEquals("foo", receivedMessage.getStringProperty("foo"));
assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
Expand All @@ -83,9 +85,10 @@ public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() thro
*/
@Test(expected = IllegalStateException.class)
public void validateFailOnUnsupportedMessageType() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

jmsTemplate.send(new MessageCreator() {
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage();
Expand All @@ -94,17 +97,18 @@ public Message createMessage(Session session) throws JMSException {

JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
try {
consumer.consume();
consumer.consume(destinationName);
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}

@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

jmsTemplate.send(new MessageCreator() {
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("hello from the other side");
Expand All @@ -116,9 +120,7 @@ public Message createMessage(Session session) throws JMSException {
});

JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
assertEquals("JMSConsumer[destination:testQueue; pub-sub:false;]", consumer.toString());

JMSResponse response = consumer.consume();
JMSResponse response = consumer.consume(destinationName);
assertEquals("hello from the other side", new String(response.getMessageBody()));
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
assertEquals("foo", response.getMessageProperties().get("foo"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class PublishJMSTest {
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

final String destinationName = "fooQueue";
PublishJMS pubProc = new PublishJMS();
TestRunner runner = TestRunners.newTestRunner(pubProc);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
Expand All @@ -53,7 +54,43 @@ public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
runner.enableControllerService(cs);

runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, "fooQueue");
runner.setProperty(PublishJMS.DESTINATION, destinationName);

Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false);

final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);

JmsTemplate jmst = new JmsTemplate(cf);
BytesMessage message = (BytesMessage) jmst.receive(destinationName);

byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
assertEquals("Hey dude!", new String(messageBytes));
assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName());
assertEquals("foo", message.getStringProperty("foo"));
}

@Test
public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

final String destinationNameExpression = "${foo}Queue";
final String destinationName = "fooQueue";
PublishJMS pubProc = new PublishJMS();
TestRunner runner = TestRunners.newTestRunner(pubProc);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);

runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);

runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, destinationNameExpression);

Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
Expand All @@ -65,8 +102,7 @@ public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
assertNotNull(successFF);

JmsTemplate jmst = new JmsTemplate(cf);
jmst.setDefaultDestinationName("fooQueue");
BytesMessage message = (BytesMessage) jmst.receive();
BytesMessage message = (BytesMessage) jmst.receive(destinationName);

byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
assertEquals("Hey dude!", new String(messageBytes));
Expand Down

0 comments on commit c238676

Please sign in to comment.