Skip to content

Commit

Permalink
Merge pull request hornetq#1912 from andytaylor/master
Browse files Browse the repository at this point in the history
BZ-1152410 - handle rollback called twice
  • Loading branch information
clebertsuconic committed Oct 15, 2014
2 parents 83826d4 + bd8c305 commit 2e55fcf
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,21 @@ private void doRollback(final boolean clientFailed, final boolean lastMessageAsD
toCancel.addAll(consumer.cancelRefs(clientFailed, lastMessageAsDelived, theTx));
}

//we need to check this before we cancel the refs and add them to the tx, any delivering refs will have been delivered
//after the last tx was rolled back so we should handle them separately. if not they
//will end up added to the tx but never ever handled even tho they were removed from the consumers delivering refs.
//we add them to a new tx and roll them back as the calling client will assume that this has happened.
if (theTx.getState() == State.ROLLEDBACK)
{
Transaction newTX = newTransaction();
cancelAndRollback(clientFailed, newTX, wasStarted, toCancel);
throw new IllegalStateException("Transaction has already been rolled back");
}
cancelAndRollback(clientFailed, theTx, wasStarted, toCancel);
}

private void cancelAndRollback(boolean clientFailed, Transaction theTx, boolean wasStarted, List<MessageReference> toCancel) throws Exception
{
for (MessageReference ref : toCancel)
{
ref.getQueue().cancel(theTx, ref);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.hornetq.api.core.HornetQTransactionOutcomeUnknownException;
import org.hornetq.api.core.HornetQTransactionRolledBackException;
import org.hornetq.api.core.HornetQUnBlockedException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
Expand All @@ -27,6 +28,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.HornetQClientMessageBundle;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
Expand All @@ -35,11 +37,13 @@
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.tests.integration.cluster.failover.FailoverTestBase;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.UUIDGenerator;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -61,6 +65,7 @@ public class BMFailoverTest extends FailoverTestBase
public void setUp() throws Exception
{
super.setUp();
stopped = false;
locator = getServerLocator();
}

Expand All @@ -71,6 +76,122 @@ public void tearDown() throws Exception
super.tearDown();
}

private static boolean stopped = false;
public static void stopAndThrow() throws HornetQUnBlockedException
{
if (!stopped)
{
try
{
serverToStop.getServer().stop(true);
}
catch (Exception e)
{
e.printStackTrace();
}
try
{
Thread.sleep(2000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
stopped = true;
throw HornetQClientMessageBundle.BUNDLE.unblockingACall(null);
}
}
@Test
@BMRules
(
rules =
{
@BMRule
(
name = "trace HornetQSessionContext xaEnd",
targetClass = "org.hornetq.core.protocol.core.impl.HornetQSessionContext",
targetMethod = "xaEnd",
targetLocation = "AT EXIT",
action = "org.hornetq.byteman.tests.BMFailoverTest.stopAndThrow()"
)
}
)
//https://bugzilla.redhat.com/show_bug.cgi?id=1152410
public void testFailOnEndAndRetry() throws Exception
{
serverToStop = liveServer;

createSessionFactory();

ClientSession session = createSession(sf, true, false, false);

session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);

ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);

for (int i = 0; i < 100; i++)
{
producer.send(createMessage(session, i, true));
}

ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);

Xid xid = RandomUtil.randomXid();

session.start(xid, XAResource.TMNOFLAGS);
session.start();
// Receive MSGs but don't ack!
for (int i = 0; i < 100; i++)
{
ClientMessage message = consumer.receive(1000);

Assert.assertNotNull(message);

assertMessageBody(i, message);

Assert.assertEquals(i, message.getIntProperty("counter").intValue());
}
try
{
//top level prepare
session.end(xid, XAResource.TMSUCCESS);
}
catch (XAException e)
{
try
{
//top level abort
session.end(xid, XAResource.TMFAIL);
}
catch (XAException e1)
{
try
{
//rollback
session.rollback(xid);
}
catch (XAException e2)
{
}
}
}
xid = RandomUtil.randomXid();
session.start(xid, XAResource.TMNOFLAGS);

for (int i = 0; i < 50; i++)
{
ClientMessage message = consumer.receive(1000);

Assert.assertNotNull(message);

assertMessageBody(i, message);

Assert.assertEquals(i, message.getIntProperty("counter").intValue());
}
session.end(xid, XAResource.TMSUCCESS);
session.commit(xid, true);
}

@Test
@BMRules
(
Expand Down Expand Up @@ -304,6 +425,13 @@ private ClientSession createXASessionAndQueue() throws Exception
return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks));
}

protected ClientSession
createSession(ClientSessionFactory sf1, boolean xa, boolean autoCommitSends, boolean autoCommitAcks) throws Exception
{
return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks));
}


private void createSessionFactory() throws Exception
{
locator.setBlockOnNonDurableSend(true);
Expand Down

0 comments on commit 2e55fcf

Please sign in to comment.