Skip to content

Commit

Permalink
https://issues.apache.org/jira/browse/AMQ-6361
Browse files Browse the repository at this point in the history
Honor the expired ack from the client as the authority on expiration
  • Loading branch information
tabish121 committed Jul 15, 2016
1 parent a35e8dc commit cd5ea6c
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,19 +318,15 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a
final MessageReference node = iter.next();
Destination nodeDest = (Destination) node.getRegionDestination();
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
if (node.isExpired()) {
if (broker.isExpired(node)) {
Destination regionDestination = nodeDest;
regionDestination.messageExpired(context, this, node);
}
iter.remove();
nodeDest.getDestinationStatistics().getInflight().decrement();
}
Destination regionDestination = nodeDest;
regionDestination.messageExpired(context, this, node);
iter.remove();
nodeDest.getDestinationStatistics().getInflight().decrement();

if (ack.getLastMessageId().equals(messageId)) {
if (usePrefetchExtension && getPrefetchSize() != 0) {
// allow batch to exceed prefetch
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.jmx.QueueView;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MessageExpiryTimeDifferenceTest {

public static final String QUEUE_NAME = "timeout.test";
private ActiveMQConnection connection;
private BrokerService broker;
private String connectionUri;

@Before
public void setUp() throws Exception {
createBroker();

connection = createConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
}
}

if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}

/**
* if the client clock is slightly ahead of the brokers clock a message
* could be expired on the client. When the expiry is sent to the broker it
* checks if the message is also considered expired on the broker side.
*
* If the broker clock is behind the message could be considered not expired
* on the broker and not removed from the broker's dispatched list. This
* leaves the broker reporting a message inflight from the broker's
* perspective even though the message has been expired on the
* consumer(client) side
*
* The BrokerFlight is used to manipulate the expiration timestamp on the
* message when it is sent and ack'd from the consumer to simulate a time
* difference between broker and client in the unit test. This is rather
* invasive but it it difficult to test this deterministically in a unit
* test.
*/
@Test(timeout = 30000)
public void testInflightCountAfterExpiry() throws Exception {

connection.start();

// push message to queue
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("timeout.test");
MessageProducer producer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage(QUEUE_NAME);

producer.send(textMessage);
session.close();

// try to consume message
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(queue);
final CountDownLatch messageReceived = new CountDownLatch(1);

// call consume in a separate thread
ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.submit(new Runnable() {

@Override
public void run() {
Message message = null;
try {
message = consumer.receive(1000);
} catch (JMSException e) {
fail();
}

// message should be null as it should have expired and the
// consumer.receive(timeout) should return null.
assertNull(message);
messageReceived.countDown();
}
});

messageReceived.await(20, TimeUnit.SECONDS);

QueueView queueView = getQueueView(broker, QUEUE_NAME);
assertEquals("Should be No inflight messages", 0, queueView.getInFlightCount());

}

private void createBroker() throws Exception {
broker = new BrokerService();
broker.setUseJmx(true);
broker.setPersistent(false);
broker.setAdvisorySupport(false);
broker.addConnector("tcp://localhost:0");

// add a plugin to ensure the expiry happens on the client side the
// acknowledge() reset the expiration time to 30 seconds in the future.
//
// this simulates a scenario where the client clock is *0 seconds ahead
// of the broker's clock.
broker.setPlugins(new BrokerPlugin[] { new BrokerPlugin() {

@Override
public Broker installPlugin(Broker broker) throws Exception {
return new BrokerFilter(broker) {

private AtomicInteger counter = new AtomicInteger();
private org.apache.activemq.command.Message dispatchedMessage;

@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
if (counter.get() == 0 && messageDispatch.getDestination().getPhysicalName().contains("timeout.test")) {
// Set the expiration to now
dispatchedMessage = messageDispatch.getMessage();
dispatchedMessage.setExpiration(System.currentTimeMillis() - 100);

counter.incrementAndGet();
}
super.preProcessDispatch(messageDispatch);
}

@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
// set the expiration in the future, to simulate broker's clock is
// 30 seconds behind client clock
if (ack.isExpiredAck()) {
dispatchedMessage.setExpiration(System.currentTimeMillis() + 300000);
}
super.acknowledge(consumerExchange, ack);
}
};
}
} });

broker.start();
broker.waitUntilStarted();

connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}

protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(connectionUri);
}

protected ActiveMQConnection createConnection() throws Exception {
return (ActiveMQConnection) createConnectionFactory().createConnection();
}

private QueueView getQueueView(BrokerService broker, String queueName) throws Exception {
Map<ObjectName, DestinationView> queueViews = broker.getAdminView().getBroker().getQueueViews();

for (ObjectName key : queueViews.keySet()) {
DestinationView destinationView = queueViews.get(key);

if (destinationView instanceof QueueView) {
QueueView queueView = (QueueView) destinationView;

if (queueView.getName().equals(queueName)) {
return queueView;
}

}
}
return null;
}
}

0 comments on commit cd5ea6c

Please sign in to comment.