Skip to content

Commit

Permalink
https://issues.apache.org/jira/browse/AMQ-6340
Browse files Browse the repository at this point in the history
combine the lists in the correct order for later redispatch.
  • Loading branch information
tabish121 committed Jun 29, 2016
1 parent 03a211e commit 4e23adf
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ protected void performAdditionalConfiguration(BrokerService brokerService) throw

policyEntry.setQueue(">");
policyEntry.setStrictOrderDispatch(true);
policyEntry.setProducerFlowControl(true);
policyEntry.setMemoryLimit(1024 * 1024);

policyEntries.add(policyEntry);

Expand All @@ -85,7 +83,7 @@ public void testMessageOrderAfterRollback() throws Exception {
sendMessages(5);

int counter = 0;
while (counter++ < 10) {
while (counter++ < 20) {
LOG.info("Creating connection using prefetch of: {}", prefetch);

JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=" + prefetch));
Expand All @@ -100,11 +98,11 @@ public void testMessageOrderAfterRollback() throws Exception {
Message message = consumer.receive(5000);
assertNotNull(message);
assertTrue(message instanceof TextMessage);
LOG.info("Read message = {}", ((TextMessage) message).getText());

int sequenceID = message.getIntProperty("sequenceID");
assertEquals(0, sequenceID);

LOG.info("Read message = {}", ((TextMessage) message).getText());
session.rollback();
session.close();
connection.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -639,39 +640,40 @@ public List<MessageReference> remove(ConnectionContext context, Destination dest
}

public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>();
LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
// Here is a potential problem concerning Inflight stat:
// Messages not already committed or rolled back may not be removed from dispatched list at the moment
// Except if each commit or rollback callback action comes before remove of subscriber.
rc.addAll(pending.remove(context, destination));
redispatch.addAll(pending.remove(context, destination));

if (dispatched == null) {
return rc;
return redispatch;
}

// Synchronized to DispatchLock if necessary
if (dispatched == this.dispatched) {
synchronized(dispatchLock) {
updateDestinationStats(rc, destination, dispatched);
addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
}
} else {
updateDestinationStats(rc, destination, dispatched);
addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
}
}
return rc;

return redispatch;
}

private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) {
ArrayList<MessageReference> references = new ArrayList<MessageReference>();
for (MessageReference r : dispatched) {
if (r.getRegionDestination() == destination) {
references.add(r);
getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
}
}
rc.addAll(references);
redispatch.addAll(0, references);
destination.getDestinationStatistics().getInflight().subtract(references.size());
dispatched.removeAll(references);
}
Expand Down

0 comments on commit 4e23adf

Please sign in to comment.