diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 5288dc90e3a..a05ba8c3ca9 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -103,6 +103,7 @@ import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.MarshallingSupport; +import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter; import org.apache.activemq.util.SubscriptionKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java index 10df1f92e60..3c0b85b7179 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -46,7 +46,7 @@ public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBr @Override protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { - if (addToAlreadyInterestedConsumers(info)) { + if (addToAlreadyInterestedConsumers(info, false)) { return null; // don't want this subscription added } //add our original id to ourselves @@ -55,7 +55,7 @@ protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws return doCreateDemandSubscription(info); } - protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) { + protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) { // search through existing subscriptions and see if we have a match if (info.isNetworkSubscription()) { return false; @@ -71,6 +71,10 @@ protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) { // add the interest in the subscription if (!info.isDurable()) { ds.add(info.getConsumerId()); + if (isForcedDurable) { + forcedDurableRemoteId.add(info.getConsumerId()); + ds.addForcedDurableConsumer(info.getConsumerId()); + } } else { ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 865e60e012d..8a3a56a59c6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -21,9 +21,11 @@ import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -99,6 +101,7 @@ import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; +import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,6 +134,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected ActiveMQDestination[] durableDestinations; protected final ConcurrentMap subscriptionMapByLocalId = new ConcurrentHashMap<>(); protected final ConcurrentMap subscriptionMapByRemoteId = new ConcurrentHashMap<>(); + protected final Set forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap()); protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; protected final CountDownLatch startedLatch = new CountDownLatch(2); protected final CountDownLatch localStartedLatch = new CountDownLatch(1); @@ -549,6 +553,21 @@ protected void startRemoteBridge() throws Exception { // set our properties Properties props = new Properties(); IntrospectionSupport.getProperties(configuration, props, null); + + String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations"; + String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations"; + + if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) { + props.put(dynamicallyIncludedDestinationsKey, + StringToListOfActiveMQDestinationConverter. + convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true)); + } + if (!configuration.getStaticallyIncludedDestinations().isEmpty()) { + props.put(staticallyIncludedDestinationsKey, + StringToListOfActiveMQDestinationConverter. + convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true)); + } + props.remove("networkTTL"); String str = MarshallingSupport.propertiesToString(props); brokerInfo.setNetworkProperties(str); @@ -858,6 +877,17 @@ public void run() { } else if (data.getClass() == RemoveInfo.class) { ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); removeDemandSubscription(id); + + if (forcedDurableRemoteId.remove(id)) { + for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { + DemandSubscription ds = i.next(); + boolean removed = ds.removeForcedDurableConsumer(id); + if (removed) { + cleanupDurableSub(ds, i); + } + } + } + } else if (data.getClass() == RemoveSubscriptionInfo.class) { RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); @@ -873,7 +903,8 @@ public void run() { private void cleanupDurableSub(final DemandSubscription ds, Iterator i) throws IOException { - if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()) { + if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty() + && ds.getForcedDurableConsumersSize() == 0) { // deactivate subscriber RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); @@ -1196,7 +1227,14 @@ protected boolean isPermissableDestination(ActiveMQDestination destination, bool dests = dynamicallyIncludedDestinations; if (dests != null && dests.length > 0) { - return matchesDynamicallyIncludedDestinations(destination); + for (ActiveMQDestination dest : dests) { + DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); + if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { + return true; + } + } + + return false; } return true; @@ -1216,6 +1254,19 @@ private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination desti return false; } + protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) { + if (dests != null && dests.length > 0) { + for (ActiveMQDestination dest : dests) { + DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); + if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { + return dest; + } + } + } + + return null; + } + /** * Subscriptions for these destinations are always created */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java index b53646ae7c6..371df0ae49f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java @@ -41,6 +41,7 @@ public class DemandSubscription { private final AtomicInteger dispatched = new AtomicInteger(0); private final AtomicBoolean activeWaiter = new AtomicBoolean(); private final Set durableRemoteSubs = new CopyOnWriteArraySet(); + private final Set forcedDurableConsumers = new CopyOnWriteArraySet(); private SubscriptionInfo localDurableSubscriber; private NetworkBridgeFilter networkBridgeFilter; @@ -106,6 +107,18 @@ public ConsumerInfo getRemoteInfo() { return remoteInfo; } + public boolean addForcedDurableConsumer(ConsumerId id) { + return forcedDurableConsumers.add(id); + } + + public boolean removeForcedDurableConsumer(ConsumerId id) { + return forcedDurableConsumers.remove(id); + } + + public int getForcedDurableConsumersSize() { + return forcedDurableConsumers.size(); + } + public void waitForCompletion() { if (dispatched.get() > 0) { LOG.debug("Waiting for completion for sub: {}, dispatched: {}", localInfo.getConsumerId(), this.dispatched.get()); diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index 237e2726b91..e699272c13c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -17,7 +17,9 @@ package org.apache.activemq.network; import java.io.IOException; +import java.util.Map; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; @@ -26,6 +28,7 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.transport.Transport; +import org.apache.activemq.util.TypeConversionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,13 +95,16 @@ protected void setupStaticDestinations() { @Override protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { - if (addToAlreadyInterestedConsumers(info)) { + boolean isForcedDurable = isForcedDurable(info); + + if (addToAlreadyInterestedConsumers(info, isForcedDurable)) { return null; // don't want this subscription added } //add our original id to ourselves info.addNetworkConsumerId(info.getConsumerId()); + ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null; - if (info.isDurable()) { + if(info.isDurable() || isForcedDurable) { // set the subscriber name to something reproducible info.setSubscriptionName(getSubscriberName(info.getDestination())); // and override the consumerId with something unique so that it won't @@ -107,7 +113,46 @@ protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws consumerIdGenerator.getNextSequenceId())); } info.setSelector(null); - return doCreateDemandSubscription(info); + DemandSubscription demandSubscription = doCreateDemandSubscription(info); + if (forcedDurableId != null) { + demandSubscription.addForcedDurableConsumer(forcedDurableId); + forcedDurableRemoteId.add(forcedDurableId); + } + return demandSubscription; + } + + + private boolean isForcedDurable(ConsumerInfo info) { + if (info.isDurable()) { + return false; + } + + ActiveMQDestination destination = info.getDestination(); + if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() || + destination.isQueue()) { + return false; + } + + ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination); + if (matching != null) { + return isDestForcedDurable(matching); + } + matching = findMatchingDestination(staticallyIncludedDestinations, destination); + if (matching != null) { + return isDestForcedDurable(matching); + } + return false; + } + + private boolean isDestForcedDurable(ActiveMQDestination destination) { + final Map options = destination.getOptions(); + + boolean isForceDurable = false; + if (options != null) { + isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class); + } + + return isForceDurable; } protected String getSubscriberName(ActiveMQDestination dest) { diff --git a/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java b/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java index 7f58f207840..130afa78b64 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/StringToListOfActiveMQDestinationConverter.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.util; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; @@ -59,6 +61,10 @@ public static List convertToActiveMQDestination(Object valu } public static String convertFromActiveMQDestination(Object value) { + return convertFromActiveMQDestination(value, false); + } + + public static String convertFromActiveMQDestination(Object value, boolean includeOptions) { if (value == null) { return null; } @@ -70,7 +76,17 @@ public static String convertFromActiveMQDestination(Object value) { Object e = list.get(i); if (e instanceof ActiveMQDestination) { ActiveMQDestination destination = (ActiveMQDestination) e; - sb.append(destination); + if (includeOptions && destination.getOptions() != null) { + try { + //Reapply the options as URI parameters + sb.append(destination.toString() + URISupport.applyParameters( + new URI(""), destination.getOptions())); + } catch (URISyntaxException e1) { + sb.append(destination); + } + } else { + sb.append(destination); + } if (i < list.size() - 1) { sb.append(", "); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index 1390d05fd5d..3c4a2a06887 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.network; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.net.URI; import java.util.Arrays; @@ -30,12 +28,10 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.CommandTypes; -import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait.Condition; import org.junit.After; @@ -98,12 +94,12 @@ public void testRemoveSubscriptionPropagate() throws Exception { sub1.close(); assertSubscriptionsCount(broker1, topic, 1); - assertNCSubscriptionsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); removeSubscription(broker1, topic, subName); assertSubscriptionsCount(broker1, topic, 0); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); } @@ -114,17 +110,17 @@ public void testRemoveSubscriptionPropegateAfterRestart() throws Exception { sub1.close(); assertSubscriptionsCount(broker1, topic, 1); - assertNCSubscriptionsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); restartBrokers(true); assertSubscriptionsCount(broker1, topic, 1); - assertNCSubscriptionsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); removeSubscription(broker1, topic, subName); assertSubscriptionsCount(broker1, topic, 0); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); } @@ -135,7 +131,7 @@ public void testRemoveSubscriptionWithBridgeOffline() throws Exception { sub1.close(); assertSubscriptionsCount(broker1, topic, 1); - assertNCSubscriptionsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); doTearDown(); restartBroker(broker1, false); @@ -146,9 +142,9 @@ public void testRemoveSubscriptionWithBridgeOffline() throws Exception { //Test that on successful reconnection of the bridge that //the NC sub will be removed restartBroker(broker2, true); - assertNCSubscriptionsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); restartBroker(broker1, true); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); } @@ -160,7 +156,7 @@ public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Ex sub1.close(); assertSubscriptionsCount(broker1, topic, 1); - assertNCSubscriptionsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); doTearDown(); restartBroker(broker1, false); @@ -176,13 +172,13 @@ public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Ex //before sync, the old NC should exist restartBroker(broker2, true); - assertNCSubscriptionsCount(broker2, topic, 1); - assertNCSubscriptionsCount(broker2, topic2, 0); + assertNCDurableSubsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic2, 0); //After sync, remove old NC and create one for topic 2 restartBroker(broker1, true); - assertNCSubscriptionsCount(broker2, topic, 0); - assertNCSubscriptionsCount(broker2, topic2, 1); + assertNCDurableSubsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic2, 1); } @@ -193,7 +189,7 @@ public void testAddSubscriptionsWithBridgeOffline() throws Exception { final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName); assertSubscriptionsCount(broker1, topic, 0); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); doTearDown(); restartBroker(broker1, false); @@ -207,9 +203,9 @@ public void testAddSubscriptionsWithBridgeOffline() throws Exception { assertSubscriptionsCount(broker1, topic2, 1); restartBrokers(true); - assertNCSubscriptionsCount(broker2, topic, 1); - assertNCSubscriptionsCount(broker2, topic2, 1); - assertNCSubscriptionsCount(broker2, excludeTopic, 0); + assertNCDurableSubsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic2, 1); + assertNCDurableSubsCount(broker2, excludeTopic, 0); } @@ -223,7 +219,7 @@ public void testAddSubscriptionsWithBridgeOfflineOpenWire11() throws Exception { final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); assertSubscriptionsCount(broker1, topic, 0); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); doTearDown(); restartBroker(broker1, false); @@ -235,7 +231,7 @@ public void testAddSubscriptionsWithBridgeOfflineOpenWire11() throws Exception { //Since we are using an old version of openwire, the NC should //not be added restartBrokers(true); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); } @@ -246,7 +242,7 @@ public void testAddOfflineSubscriptionWithBridgeOfflineDynamicTrue() throws Exce final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); assertSubscriptionsCount(broker1, topic, 0); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); doTearDown(); restartBroker(broker1, false); @@ -256,7 +252,7 @@ public void testAddOfflineSubscriptionWithBridgeOfflineDynamicTrue() throws Exce assertSubscriptionsCount(broker1, topic, 1); restartBrokers(true); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); } @Test @@ -266,7 +262,7 @@ public void testAddOnlineSubscriptionWithBridgeOfflineDynamicTrue() throws Excep final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); assertSubscriptionsCount(broker1, topic, 0); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); doTearDown(); restartBroker(broker1, false); @@ -276,10 +272,10 @@ public void testAddOnlineSubscriptionWithBridgeOfflineDynamicTrue() throws Excep assertSubscriptionsCount(broker1, topic, 1); restartBrokers(true); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); //bring online again session1.createDurableSubscriber(topic, subName); - assertNCSubscriptionsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); } @@ -290,7 +286,7 @@ public void testAddAndRemoveSubscriptionsWithBridgeOffline() throws Exception { session1.createDurableSubscriber(topic, subName).close(); assertSubscriptionsCount(broker1, topic, 1); - assertNCSubscriptionsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); doTearDown(); restartBroker(broker1, false); @@ -301,8 +297,8 @@ public void testAddAndRemoveSubscriptionsWithBridgeOffline() throws Exception { assertSubscriptionsCount(broker1, topic, 1); restartBrokers(true); - assertNCSubscriptionsCount(broker2, topic, 1); - assertNCSubscriptionsCount(broker2, excludeTopic, 0); + assertNCDurableSubsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, excludeTopic, 0); } @@ -314,7 +310,7 @@ public void testAddOnlineSubscriptionsWithBridgeOffline() throws Exception { final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName); assertSubscriptionsCount(broker1, topic, 0); - assertNCSubscriptionsCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); doTearDown(); restartBrokers(false); @@ -342,32 +338,9 @@ public void testAddOnlineSubscriptionsWithBridgeOffline() throws Exception { session1.createDurableSubscriber(excludeTopic, "sub-exclude"); Thread.sleep(1000); - assertNCSubscriptionsCount(broker2, topic, 1); - assertNCSubscriptionsCount(broker2, excludeTopic, 0); - - } - - protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic, - final String subName) throws Exception { - final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); - info.setClientId(clientId); - info.setSubscriptionName(subName); + assertNCDurableSubsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, excludeTopic, 0); - final ConnectionContext context = new ConnectionContext(); - context.setBroker(brokerService.getBroker()); - context.setClientId(clientId); - - brokerService.getBroker().removeSubscription(context, info); - } - - protected void assertSubscriptionsCount(final BrokerService brokerService, - final ActiveMQTopic dest, final int count) throws Exception { - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return count == getSubscriptions(brokerService, dest).size(); - } - }, 10000, 500)); } protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { @@ -387,7 +360,7 @@ protected void restartBrokers(boolean startNetworkConnector) throws Exception { protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir, File remoteDataDir) throws Exception { included = new ActiveMQTopic(testTopicName); - doSetUpRemoteBroker(deleteAllMessages, remoteDataDir); + doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0); doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); //Give time for advisories to propagate Thread.sleep(1000); @@ -399,8 +372,13 @@ protected void restartLocalBroker(boolean startNetworkConnector) throws Exceptio } protected void restartRemoteBroker() throws Exception { + int port = 0; + if (remoteBroker != null) { + List transportConnectors = remoteBroker.getTransportConnectors(); + port = transportConnectors.get(0).getConnectUri().getPort(); + } stopRemoteBroker(); - doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile()); + doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile(), port); } protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, @@ -438,8 +416,8 @@ public boolean isSatisified() throws Exception { } } - protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception { - remoteBroker = createRemoteBroker(dataDir); + protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception { + remoteBroker = createRemoteBroker(dataDir, port); remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); remoteBroker.start(); remoteBroker.waitUntilStarted(); @@ -494,7 +472,7 @@ protected NetworkConnector configureLocalNetworkConnector() throws Exception { protected AdvisoryBroker remoteAdvisoryBroker; - protected BrokerService createRemoteBroker(File dataDir) throws Exception { + protected BrokerService createRemoteBroker(File dataDir, int port) throws Exception { BrokerService brokerService = new BrokerService(); brokerService.setBrokerName("remoteBroker"); brokerService.setUseJmx(false); @@ -502,7 +480,7 @@ protected BrokerService createRemoteBroker(File dataDir) throws Exception { remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); - brokerService.addConnector("tcp://localhost:0?wireFormat.version=" + remoteBrokerWireFormatVersion); + brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.version=" + remoteBrokerWireFormatVersion); return brokerService; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java index 29b726aaaf0..0b388cc93db 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java @@ -30,8 +30,11 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.RemoveSubscriptionInfo; @@ -96,24 +99,24 @@ protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionConte } protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception { - Wait.waitFor(new Wait.Condition() { + assertTrue(Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { //should only be 1 for the composite destination creation return count == destinationStatistics.getConsumers().getCount(); } - }); + })); } protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception { - Wait.waitFor(new Wait.Condition() { + assertTrue(Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { return count == destinationStatistics.getDequeues().getCount() && count == destinationStatistics.getDispatched().getCount() && count == destinationStatistics.getForwards().getCount(); } - }); + })); } protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) { @@ -126,16 +129,32 @@ protected interface ConsumerCreator { MessageConsumer createConsumer() throws JMSException; } - protected void assertNCSubscriptionsCount(final BrokerService brokerService, + protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest, final int count) throws Exception { assertTrue(Wait.waitFor(new Condition() { @Override public boolean isSatisified() throws Exception { - return count == getNCSubscriptions(brokerService, dest).size(); + return count == getNCDurableSubs(brokerService, dest).size(); } }, 10000, 500)); } + protected void assertConsumersCount(final BrokerService brokerService, + final ActiveMQTopic dest, final int count) throws Exception { + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return count == getConsumers(brokerService, dest).size(); + } + }, 10000, 500)); + } + + protected List getConsumers(final BrokerService brokerService, + final ActiveMQTopic dest) throws Exception { + Topic destination = (Topic) brokerService.getDestination(dest); + return destination.getConsumers(); + } + protected List getSubscriptions(final BrokerService brokerService, final ActiveMQTopic dest) throws Exception { List subs = new ArrayList<>(); @@ -151,10 +170,17 @@ protected List getSubscriptions(final BrokerService br return subs; } - protected List getNCSubscriptions(final BrokerService brokerService, + protected List getNCDurableSubs(final BrokerService brokerService, final ActiveMQTopic dest) throws Exception { List subs = new ArrayList<>(); - Topic destination = (Topic) brokerService.getDestination(dest); + Destination d = brokerService.getDestination(dest); + Topic destination = null; + if (d instanceof DestinationFilter){ + destination = ((DestinationFilter) d).getAdaptor(Topic.class); + } else { + destination = (Topic) d; + } + for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) { if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) { DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key); @@ -166,4 +192,28 @@ protected List getNCSubscriptions(final BrokerService return subs; } + protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic, + final String subName) throws Exception { + final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); + info.setClientId(clientId); + info.setSubscriptionName(subName); + + final ConnectionContext context = new ConnectionContext(); + context.setBroker(brokerService.getBroker()); + context.setClientId(clientId); + + brokerService.getBroker().removeSubscription(context, info); + } + + protected void assertSubscriptionsCount(final BrokerService brokerService, + final ActiveMQTopic dest, final int count) throws Exception { + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return count == getSubscriptions(brokerService, dest).size(); + } + }, 10000, 500)); + } + + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java new file mode 100644 index 00000000000..770ba6ea375 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +@RunWith(Parameterized.class) +public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(ForceDurableNetworkBridgeTest.class); + + protected String testTopicName2 = "include.nonforced.bar"; + protected String staticTopic = "include.static.bar"; + protected String staticTopic2 = "include.static.nonforced.bar"; + public static enum FLOW {FORWARD, REVERSE}; + private BrokerService broker1; + private BrokerService broker2; + private Session session1; + private final FLOW flow; + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + {FLOW.FORWARD}, + {FLOW.REVERSE} + }); + } + + public ForceDurableNetworkBridgeTest(final FLOW flow) { + this.flow = flow; + } + + @Before + public void setUp() throws Exception { + doSetUp(true, tempFolder.newFolder(), tempFolder.newFolder()); + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + @Test + public void testForceDurableSubscriptionStatic() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(staticTopic); + + assertNCDurableSubsCount(broker2, topic, 1); + assertConsumersCount(broker2, topic, 1); + + //Static so consumers stick around + assertNCDurableSubsCount(broker2, topic, 1); + assertConsumersCount(broker2, topic, 1); + } + + @Test + public void testConsumerNotForceDurableSubscriptionStatic() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(staticTopic2); + + assertConsumersCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 0); + } + + @Test + public void testConsumerNotForceDurableSubscription() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2); + MessageConsumer sub1 = session1.createConsumer(topic); + + assertConsumersCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 0); + sub1.close(); + + assertNCDurableSubsCount(broker2, topic, 0); + assertConsumersCount(broker2, topic, 0); + } + + @Test + public void testConsumerNotForceDurableWithAnotherDurable() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2); + TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName); + session1.createConsumer(topic); + + //1 consumer because of conduit + //1 durable sub + assertConsumersCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + + //Remove the sub + durSub.close(); + Thread.sleep(1000); + removeSubscription(broker1, topic, subName); + + //The durable should be gone even though there is a consumer left + //since we are not forcing durable subs + assertNCDurableSubsCount(broker2, topic, 0); + //consumers count ends up being 0 here, even though there is a non-durable consumer left, + //because the durable sub is destroyed and it is a conduit subscription + //this is another good reason to want to enable forcing of durables + assertConsumersCount(broker2, topic, 0); + } + + @Test + public void testForceDurableSubscription() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); + MessageConsumer sub1 = session1.createConsumer(topic); + + assertNCDurableSubsCount(broker2, topic, 1); + assertConsumersCount(broker2, topic, 1); + sub1.close(); + + assertNCDurableSubsCount(broker2, topic, 0); + assertConsumersCount(broker2, topic, 0); + } + + @Test + public void testForceDurableMultiSubscriptions() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); + MessageConsumer sub1 = session1.createConsumer(topic); + MessageConsumer sub2 = session1.createConsumer(topic); + MessageConsumer sub3 = session1.createConsumer(topic); + + assertNCDurableSubsCount(broker2, topic, 1); + assertConsumersCount(broker2, topic, 1); + sub1.close(); + sub2.close(); + + assertNCDurableSubsCount(broker2, topic, 1); + assertConsumersCount(broker2, topic, 1); + + sub3.close(); + + assertNCDurableSubsCount(broker2, topic, 0); + assertConsumersCount(broker2, topic, 0); + } + + @Test + public void testForceDurableSubWithDurableCreatedFirst() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); + TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName); + durSub.close(); + assertNCDurableSubsCount(broker2, topic, 1); + + MessageConsumer sub1 = session1.createConsumer(topic); + Thread.sleep(1000); + assertNCDurableSubsCount(broker2, topic, 1); + sub1.close(); + + Thread.sleep(1000); + assertNCDurableSubsCount(broker2, topic, 1); + + removeSubscription(broker1, topic, subName); + assertNCDurableSubsCount(broker2, topic, 0); + } + + @Test + public void testForceDurableSubWithNonDurableCreatedFirst() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); + MessageConsumer sub1 = session1.createConsumer(topic); + assertNCDurableSubsCount(broker2, topic, 1); + + TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName); + durSub.close(); + Thread.sleep(1000); + assertNCDurableSubsCount(broker2, topic, 1); + + removeSubscription(broker1, topic, subName); + Thread.sleep(1000); + assertConsumersCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + + sub1.close(); + assertNCDurableSubsCount(broker2, topic, 0); + } + + @Test + public void testDurableSticksAroundOnConsumerClose() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); + //Create the normal consumer first + MessageConsumer sub1 = session1.createConsumer(topic); + assertNCDurableSubsCount(broker2, topic, 1); + + TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName); + durSub.close(); + sub1.close(); + Thread.sleep(1000); + //Both consumer and durable are closed but the durable should stick around + assertConsumersCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + + removeSubscription(broker1, topic, subName); + assertConsumersCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); + } + + protected void restartBrokers() throws Exception { + doTearDown(); + doSetUp(false, localBroker.getDataDirectoryFile(), remoteBroker.getDataDirectoryFile()); + } + + protected void doSetUp(boolean deleteAllMessages, File localDataDir, + File remoteDataDir) throws Exception { + included = new ActiveMQTopic(testTopicName); + doSetUpRemoteBroker(deleteAllMessages, remoteDataDir); + doSetUpLocalBroker(deleteAllMessages, localDataDir); + //Give time for advisories to propagate + Thread.sleep(1000); + } + + protected void doSetUpLocalBroker(boolean deleteAllMessages, File dataDir) throws Exception { + localBroker = createLocalBroker(dataDir); + localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.start(); + localBroker.waitUntilStarted(); + URI localURI = localBroker.getVmConnectorURI(); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); + fac.setAlwaysSyncSend(true); + fac.setDispatchAsync(false); + localConnection = fac.createConnection(); + localConnection.setClientID("clientId"); + localConnection.start(); + + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1; + } + }, 10000, 500); + + localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (flow.equals(FLOW.FORWARD)) { + broker1 = localBroker; + session1 = localSession; + } else { + broker2 = localBroker; + } + } + + protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception { + remoteBroker = createRemoteBroker(dataDir); + remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + URI remoteURI = remoteBroker.getVmConnectorURI(); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI); + remoteConnection = fac.createConnection(); + remoteConnection.setClientID("clientId"); + remoteConnection.start(); + remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (flow.equals(FLOW.FORWARD)) { + broker2 = remoteBroker; + } else { + broker1 = remoteBroker; + session1 = remoteSession; + } + } + + protected BrokerService createLocalBroker(File dataDir) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setMonitorConnectionSplits(true); + brokerService.setDataDirectoryFile(dataDir); + brokerService.setBrokerName("localBroker"); + brokerService.addNetworkConnector(configureLocalNetworkConnector()); + brokerService.addConnector("tcp://localhost:0"); + brokerService.setDestinations(new ActiveMQDestination[] { + new ActiveMQTopic(testTopicName), + new ActiveMQTopic(testTopicName2), + new ActiveMQTopic(excludeTopicName)}); + + return brokerService; + } + + protected NetworkConnector configureLocalNetworkConnector() throws Exception { + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI = transportConnectors.get(0).getConnectUri(); + String uri = "static:(" + remoteURI + ")"; + NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)); + connector.setName("networkConnector"); + connector.setDynamicOnly(false); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setDuplex(true); + connector.setStaticBridge(false); + connector.setStaticallyIncludedDestinations(Lists.newArrayList( + new ActiveMQTopic(staticTopic + "?forceDurable=true"), + new ActiveMQTopic(staticTopic2))); + connector.setDynamicallyIncludedDestinations( + Lists.newArrayList( + new ActiveMQTopic("include.test.>?forceDurable=true"), + new ActiveMQTopic(testTopicName2))); + connector.setExcludedDestinations( + Lists.newArrayList(new ActiveMQTopic(excludeTopicName))); + return connector; + } + + + protected BrokerService createRemoteBroker(File dataDir) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("remoteBroker"); + brokerService.setUseJmx(false); + brokerService.setDataDirectoryFile(dataDir); + brokerService.addConnector("tcp://localhost:0"); + brokerService.setDestinations(new ActiveMQDestination[] { + new ActiveMQTopic(testTopicName), + new ActiveMQTopic(testTopicName2), + new ActiveMQTopic(excludeTopicName)}); + + return brokerService; + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java index f4749299577..782f53f36e8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -25,6 +25,7 @@ import java.net.URI; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -39,6 +40,7 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.virtual.CompositeQueue; @@ -240,9 +242,18 @@ public void testVirtualTopicWithConsumerGoOffline() throws Exception { */ @Test(timeout = 60 * 1000) public void testDynamicFlow() throws Exception { + testDynamicFlow(false); + } + + @Test(timeout = 60 * 1000) + public void testDynamicFlowForceDurable() throws Exception { + testDynamicFlow(true); + } + + protected void testDynamicFlow(boolean forceDurable) throws Exception { Assume.assumeTrue(isUseVirtualDestSubsOnCreation); - doSetUp(true, null); + doSetUp(true, null, true, forceDurable); MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); @@ -262,6 +273,7 @@ public void testDynamicFlow() throws Exception { new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); waitForConsumerCount(destinationStatistics, 1); + assertNCDurableSubsCount(localBroker, included, forceDurable ? 1 : 0); includedProducer.send(test); waitForDispatchFromLocalBroker(destinationStatistics, 1); @@ -272,7 +284,6 @@ public void testDynamicFlow() throws Exception { assertAdvisoryBrokerCounts(1,1,1); } - /** * Test that dynamic flow works for virtual destinations when a second composite * topic is included that forwards to the same queue, but is excluded from @@ -1006,7 +1017,7 @@ public void testReplay() throws Exception { CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge")); - doSetUp(true, new VirtualDestination[] {compositeTopic}, false); + doSetUp(true, new VirtualDestination[] {compositeTopic}, false, false); MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); @@ -1034,7 +1045,7 @@ public void testReplayWithConsumer() throws Exception { CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge")); - doSetUp(true, new VirtualDestination[] {compositeTopic}, false); + doSetUp(true, new VirtualDestination[] {compositeTopic}, false, false); MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); @@ -1291,16 +1302,16 @@ public void tearDown() throws Exception { protected void doSetUp(boolean deleteAllMessages, VirtualDestination[] remoteVirtualDests) throws Exception { - doSetUp(deleteAllMessages, remoteVirtualDests, true); + doSetUp(deleteAllMessages, remoteVirtualDests, true, false); } protected void doSetUp(boolean deleteAllMessages, - VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector) throws Exception { + VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector, boolean forceDurable) throws Exception { remoteBroker = createRemoteBroker(isUseVirtualDestSubsOnCreation, remoteVirtualDests); remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); remoteBroker.start(); remoteBroker.waitUntilStarted(); - localBroker = createLocalBroker(startNetworkConnector); + localBroker = createLocalBroker(startNetworkConnector, forceDurable); localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); localBroker.start(); localBroker.waitUntilStarted(); @@ -1324,13 +1335,16 @@ protected void doSetUp(boolean deleteAllMessages, protected NetworkConnector connector; - protected BrokerService createLocalBroker(boolean startNetworkConnector) throws Exception { + protected BrokerService createLocalBroker(boolean startNetworkConnector, boolean forceDurable) throws Exception { BrokerService brokerService = new BrokerService(); brokerService.setMonitorConnectionSplits(true); brokerService.setDataDirectoryFile(tempFolder.newFolder()); brokerService.setBrokerName("localBroker"); - connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)")); + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI = transportConnectors.get(0).getConnectUri(); + String uri = "static:(" + remoteURI + ")"; + connector = new DiscoveryNetworkConnector(new URI(uri)); connector.setName("networkConnector"); connector.setDynamicOnly(false); connector.setDecreaseNetworkConsumerPriority(false); @@ -1338,7 +1352,7 @@ protected BrokerService createLocalBroker(boolean startNetworkConnector) throws connector.setDuplex(isDuplex); connector.setUseVirtualDestSubs(true); connector.setDynamicallyIncludedDestinations(Lists.newArrayList(new ActiveMQQueue(testQueueName), - new ActiveMQTopic(testTopicName), new ActiveMQTopic("VirtualTopic.>"))); + new ActiveMQTopic(testTopicName + (forceDurable ? "?forceDurable=true" : "")), new ActiveMQTopic("VirtualTopic.>"))); connector.setExcludedDestinations(Lists.newArrayList(new ActiveMQQueue("exclude.test.foo"), new ActiveMQTopic("exclude.test.bar"))); @@ -1346,7 +1360,7 @@ protected BrokerService createLocalBroker(boolean startNetworkConnector) throws brokerService.addNetworkConnector(connector); } - brokerService.addConnector("tcp://localhost:61616"); + brokerService.addConnector("tcp://localhost:0"); return brokerService; } @@ -1374,7 +1388,7 @@ protected BrokerService createRemoteBroker(boolean isUsevirtualDestinationSubscr remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); - brokerService.addConnector("tcp://localhost:61617"); + brokerService.addConnector("tcp://localhost:0"); return brokerService; }