Skip to content

Commit

Permalink
Add some additional tests for dynamic sender / receiver links
Browse files Browse the repository at this point in the history
  • Loading branch information
tabish121 committed Jun 16, 2016
1 parent 9ac5f83 commit 27d9555
Showing 1 changed file with 107 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
Expand Down Expand Up @@ -248,6 +250,111 @@ protected void doTestDynamicReceiverLifetimeBoundToLinkQueue(boolean topic) thro
connection.close();
}

@Test(timeout = 60000)
public void TestCreateDynamicQueueSenderAndPublish() throws Exception {
doTestCreateDynamicSenderAndPublish(false);
}

@Test(timeout = 60000)
public void TestCreateDynamicTopicSenderAndPublish() throws Exception {
doTestCreateDynamicSenderAndPublish(true);
}

protected void doTestCreateDynamicSenderAndPublish(boolean topic) throws Exception {
Target target = createDynamicTarget(topic);

final BrokerViewMBean brokerView = getProxyToBroker();

AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();

AmqpSender sender = session.createSender(target);
assertNotNull(sender);

if (topic) {
assertEquals(1, brokerView.getTemporaryTopics().length);
} else {
assertEquals(1, brokerView.getTemporaryQueues().length);
}

// Get the new address
String address = sender.getSender().getRemoteTarget().getAddress();
LOG.info("New dynamic sender address -> {}", address);

// Create a message and send to a receive that is listening on the newly
// created dynamic link address.
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg-1");
message.setText("Test-Message");

AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(1);

sender.send(message);

AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have read a message", received);
received.accept();

receiver.close();
sender.close();

connection.close();
}

@Test(timeout = 60000)
public void testCreateDynamicReceiverToTopicAndSend() throws Exception {
doTestCreateDynamicSender(true);
}

@Test(timeout = 60000)
public void testCreateDynamicReceiverToQueueAndSend() throws Exception {
doTestCreateDynamicSender(false);
}

protected void doTestCreateDynamicReceiverAndSend(boolean topic) throws Exception {
Source source = createDynamicSource(topic);

final BrokerViewMBean brokerView = getProxyToBroker();

AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();

AmqpReceiver receiver = session.createReceiver(source);
assertNotNull(receiver);

if (topic) {
assertEquals(1, brokerView.getTemporaryTopics().length);
} else {
assertEquals(1, brokerView.getTemporaryQueues().length);
}

// Get the new address
String address = receiver.getReceiver().getRemoteSource().getAddress();
LOG.info("New dynamic receiver address -> {}", address);

// Create a message and send to a receive that is listening on the newly
// created dynamic link address.
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg-1");
message.setText("Test-Message");

AmqpSender sender = session.createSender(address);
sender.send(message);

receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have read a message", received);
received.accept();

sender.close();
receiver.close();

connection.close();
}

protected Source createDynamicSource(boolean topic) {

Source source = new Source();
Expand Down

0 comments on commit 27d9555

Please sign in to comment.