Skip to content

Commit

Permalink
https://issues.apache.org/jira/browse/AMQ-6383
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/AMQ-6373

Adding a new flag that can be specified on a network bridge to allow
forcing of subscriptions to be durable.  Cleaned up some unit tests.
  • Loading branch information
cshannon committed Aug 1, 2016
1 parent 71bb54f commit e73ab34
Show file tree
Hide file tree
Showing 10 changed files with 614 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBr

@Override
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
if (addToAlreadyInterestedConsumers(info)) {
if (addToAlreadyInterestedConsumers(info, false)) {
return null; // don't want this subscription added
}
//add our original id to ourselves
Expand All @@ -55,7 +55,7 @@ protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws
return doCreateDemandSubscription(info);
}

protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) {
// search through existing subscriptions and see if we have a match
if (info.isNetworkSubscription()) {
return false;
Expand All @@ -71,6 +71,10 @@ protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
// add the interest in the subscription
if (!info.isDurable()) {
ds.add(info.getConsumerId());
if (isForcedDurable) {
forcedDurableRemoteId.add(info.getConsumerId());
ds.addForcedDurableConsumer(info.getConsumerId());
}
} else {
ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -99,6 +101,7 @@
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -131,6 +134,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -549,6 +553,21 @@ protected void startRemoteBridge() throws Exception {
// set our properties
Properties props = new Properties();
IntrospectionSupport.getProperties(configuration, props, null);

String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";

if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
props.put(dynamicallyIncludedDestinationsKey,
StringToListOfActiveMQDestinationConverter.
convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
}
if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
props.put(staticallyIncludedDestinationsKey,
StringToListOfActiveMQDestinationConverter.
convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
}

props.remove("networkTTL");
String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str);
Expand Down Expand Up @@ -858,6 +877,17 @@ public void run() {
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);

if (forcedDurableRemoteId.remove(id)) {
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
boolean removed = ds.removeForcedDurableConsumer(id);
if (removed) {
cleanupDurableSub(ds, i);
}
}
}

} else if (data.getClass() == RemoveSubscriptionInfo.class) {
RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
Expand All @@ -873,7 +903,8 @@ public void run() {

private void cleanupDurableSub(final DemandSubscription ds,
Iterator<DemandSubscription> i) throws IOException {
if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()) {
if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
&& ds.getForcedDurableConsumersSize() == 0) {

// deactivate subscriber
RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
Expand Down Expand Up @@ -1196,7 +1227,14 @@ protected boolean isPermissableDestination(ActiveMQDestination destination, bool

dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) {
return matchesDynamicallyIncludedDestinations(destination);
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}

return false;
}

return true;
Expand All @@ -1216,6 +1254,19 @@ private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination desti
return false;
}

protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return dest;
}
}
}

return null;
}

/**
* Subscriptions for these destinations are always created
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class DemandSubscription {
private final AtomicInteger dispatched = new AtomicInteger(0);
private final AtomicBoolean activeWaiter = new AtomicBoolean();
private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
private SubscriptionInfo localDurableSubscriber;

private NetworkBridgeFilter networkBridgeFilter;
Expand Down Expand Up @@ -106,6 +107,18 @@ public ConsumerInfo getRemoteInfo() {
return remoteInfo;
}

public boolean addForcedDurableConsumer(ConsumerId id) {
return forcedDurableConsumers.add(id);
}

public boolean removeForcedDurableConsumer(ConsumerId id) {
return forcedDurableConsumers.remove(id);
}

public int getForcedDurableConsumersSize() {
return forcedDurableConsumers.size();
}

public void waitForCompletion() {
if (dispatched.get() > 0) {
LOG.debug("Waiting for completion for sub: {}, dispatched: {}", localInfo.getConsumerId(), this.dispatched.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.apache.activemq.network;

import java.io.IOException;
import java.util.Map;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
Expand All @@ -26,6 +28,7 @@
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.TypeConversionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -92,13 +95,16 @@ protected void setupStaticDestinations() {

@Override
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
if (addToAlreadyInterestedConsumers(info)) {
boolean isForcedDurable = isForcedDurable(info);

if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
return null; // don't want this subscription added
}
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;

if (info.isDurable()) {
if(info.isDurable() || isForcedDurable) {
// set the subscriber name to something reproducible
info.setSubscriptionName(getSubscriberName(info.getDestination()));
// and override the consumerId with something unique so that it won't
Expand All @@ -107,7 +113,46 @@ protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws
consumerIdGenerator.getNextSequenceId()));
}
info.setSelector(null);
return doCreateDemandSubscription(info);
DemandSubscription demandSubscription = doCreateDemandSubscription(info);
if (forcedDurableId != null) {
demandSubscription.addForcedDurableConsumer(forcedDurableId);
forcedDurableRemoteId.add(forcedDurableId);
}
return demandSubscription;
}


private boolean isForcedDurable(ConsumerInfo info) {
if (info.isDurable()) {
return false;
}

ActiveMQDestination destination = info.getDestination();
if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
destination.isQueue()) {
return false;
}

ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
if (matching != null) {
return isDestForcedDurable(matching);
}
matching = findMatchingDestination(staticallyIncludedDestinations, destination);
if (matching != null) {
return isDestForcedDurable(matching);
}
return false;
}

private boolean isDestForcedDurable(ActiveMQDestination destination) {
final Map<String, String> options = destination.getOptions();

boolean isForceDurable = false;
if (options != null) {
isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
}

return isForceDurable;
}

protected String getSubscriberName(ActiveMQDestination dest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.activemq.util;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -59,6 +61,10 @@ public static List<ActiveMQDestination> convertToActiveMQDestination(Object valu
}

public static String convertFromActiveMQDestination(Object value) {
return convertFromActiveMQDestination(value, false);
}

public static String convertFromActiveMQDestination(Object value, boolean includeOptions) {
if (value == null) {
return null;
}
Expand All @@ -70,7 +76,17 @@ public static String convertFromActiveMQDestination(Object value) {
Object e = list.get(i);
if (e instanceof ActiveMQDestination) {
ActiveMQDestination destination = (ActiveMQDestination) e;
sb.append(destination);
if (includeOptions && destination.getOptions() != null) {
try {
//Reapply the options as URI parameters
sb.append(destination.toString() + URISupport.applyParameters(
new URI(""), destination.getOptions()));
} catch (URISyntaxException e1) {
sb.append(destination);
}
} else {
sb.append(destination);
}
if (i < list.size() - 1) {
sb.append(", ");
}
Expand Down
Loading

0 comments on commit e73ab34

Please sign in to comment.