Skip to content

Commit

Permalink
Fix test failure in CI
Browse files Browse the repository at this point in the history
Has race condition on the ArrayList it uses to track subs and 
Fix the unreliable sleep used to track locked messages in subs
Ensure Broker is shut down on test completion.
  • Loading branch information
tabish121 committed Jun 16, 2016
1 parent 5ba8679 commit 9ac5f83
Showing 1 changed file with 70 additions and 77 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,4 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Expand All @@ -32,14 +16,19 @@
*/
package org.apache.activemq.broker.region;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;

Expand All @@ -61,28 +50,31 @@
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import junit.framework.TestCase;

public class SubscriptionAddRemoveQueueTest extends TestCase {

Queue queue;

ConsumerInfo info = new ConsumerInfo();
List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
ConnectionContext context = new ConnectionContext();
ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
ProducerInfo producerInfo = new ProducerInfo();
ProducerState producerState = new ProducerState(producerInfo);
ActiveMQDestination destination = new ActiveMQQueue("TEST");
int numSubscriptions = 1000;
boolean working = true;
int senders = 20;


@Override
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SubscriptionAddRemoveQueueTest {

private BrokerService brokerService;
private Queue queue;
private ConsumerInfo info = new ConsumerInfo();
private List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
private ConnectionContext context = new ConnectionContext();
private ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
private ProducerInfo producerInfo = new ProducerInfo();
private ProducerState producerState = new ProducerState(producerInfo);
private ActiveMQDestination destination = new ActiveMQQueue("TEST");
private int numSubscriptions = 1000;
private boolean working = true;
private int senders = 20;

@Before
public void setUp() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService = new BrokerService();
brokerService.start();

DestinationStatistics parentStats = new DestinationStatistics();
parentStats.setEnabled(true);

Expand All @@ -99,6 +91,15 @@ public void setUp() throws Exception {
queue.initialize();
}

@After
public void tearDown() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
}
}

@Test(timeout = 120000)
public void testNoDispatchToRemovedConsumers() throws Exception {
final AtomicInteger producerId = new AtomicInteger();
Runnable sender = new Runnable() {
Expand Down Expand Up @@ -134,21 +135,34 @@ public void run() {
}
};

for (int i=0;i<numSubscriptions; i++) {
for (int i = 0; i < numSubscriptions; i++) {
SimpleImmediateDispatchSubscription sub = new SimpleImmediateDispatchSubscription();
subs.add(sub);
queue.addSubscription(context, sub);
}

assertEquals("there are X subscriptions", numSubscriptions, queue.getDestinationStatistics().getConsumers().getCount());
ExecutorService executor = Executors.newCachedThreadPool();
for (int i=0; i<senders ; i++) {
for (int i = 0; i < senders; i++) {
executor.submit(sender);
}

Thread.sleep(1000);
for (SimpleImmediateDispatchSubscription sub : subs) {
assertTrue("There are some locked messages in the subscription", hasSomeLocks(sub.dispatched));
}
assertTrue("All subs should have some locks", Wait.waitFor(new Wait.Condition() {

@Override
public boolean isSatisified() throws Exception {
boolean allHaveLocks = true;

for (SimpleImmediateDispatchSubscription sub : subs) {
if (!hasSomeLocks(sub.dispatched)) {
allHaveLocks = false;
break;
}
}

return allHaveLocks;
}
}));

Future<?> result = executor.submit(subRemover);
result.get();
Expand All @@ -158,12 +172,11 @@ public void run() {
for (SimpleImmediateDispatchSubscription sub : subs) {
assertTrue("There are no locked messages in any removed subscriptions", !hasSomeLocks(sub.dispatched));
}

}

private boolean hasSomeLocks(List<MessageReference> dispatched) {
boolean hasLock = false;
for (MessageReference mr: dispatched) {
for (MessageReference mr : dispatched) {
QueueMessageReference qmr = (QueueMessageReference) mr;
if (qmr.getLockOwner() != null) {
hasLock = true;
Expand All @@ -173,21 +186,19 @@ private boolean hasSomeLocks(List<MessageReference> dispatched) {
return hasLock;
}

public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
private class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {

private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
List<MessageReference> dispatched =
Collections.synchronizedList(new ArrayList<MessageReference>());
List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();

@Override
public void acknowledge(ConnectionContext context, MessageAck ack)
throws Exception {
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
}

@Override
public void add(MessageReference node) throws Exception {
// immediate dispatch
QueueMessageReference qmr = (QueueMessageReference)node;
QueueMessageReference qmr = (QueueMessageReference) node;
qmr.lock(this);
dispatched.add(qmr);
}
Expand Down Expand Up @@ -234,8 +245,7 @@ public void resetConsumedCount() {
}

@Override
public void add(ConnectionContext context, Destination destination)
throws Exception {
public void add(ConnectionContext context, Destination destination) throws Exception {
}

@Override
Expand Down Expand Up @@ -331,13 +341,8 @@ public boolean isRecoveryRequired() {
return false;
}

public boolean isSlave() {
return false;
}

@Override
public boolean matches(MessageReference node,
MessageEvaluationContext context) throws IOException {
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
return true;
}

Expand All @@ -347,13 +352,11 @@ public boolean matches(ActiveMQDestination destination) {
}

@Override
public void processMessageDispatchNotification(
MessageDispatchNotification mdn) throws Exception {
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
}

@Override
public Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
return null;
}

Expand All @@ -363,8 +366,7 @@ public boolean isWildcard() {
}

@Override
public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
return new ArrayList<MessageReference>(dispatched);
}

Expand All @@ -373,17 +375,15 @@ public void setObjectName(ObjectName objectName) {
}

@Override
public void setSelector(String selector)
throws InvalidSelectorException, UnsupportedOperationException {
public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
}

@Override
public void updateConsumerPrefetch(int newPrefetch) {
}

@Override
public boolean addRecoveredMessage(ConnectionContext context,
MessageReference message) throws Exception {
public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
return false;
}

Expand All @@ -402,12 +402,6 @@ public boolean isLockExclusive() {
return false;
}

public void addDestination(Destination destination) {
}

public void removeDestination(Destination destination) {
}

@Override
public int countBeforeFull() {
return 10;
Expand All @@ -422,6 +416,5 @@ public SubscriptionStatistics getSubscriptionStatistics() {
public long getInFlightMessageSize() {
return subscriptionStatistics.getInflightMessageSize().getTotalSize();
}

}
}

0 comments on commit 9ac5f83

Please sign in to comment.