Skip to content

Commit

Permalink
BZ-1019378 - Fix and testcase on Bridge over paging (testBridgeWithPa…
Browse files Browse the repository at this point in the history
…ging on BridgeTest was improved to cover this issue
  • Loading branch information
clebertsuconic committed Nov 1, 2013
1 parent 7e1abc0 commit ddbd642
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,6 @@ else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK)
if (tx != null)
{
installPageTransaction(tx, listCtx);
tx.setWaitBeforeCommit(true);
}
else if (syncNonTransactional)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ static enum State

boolean hasTimedOut(long currentTime, int defaultTimeout);

void setWaitBeforeCommit(boolean waitBeforeCommit);

void putProperty(int index, Object property);

Object getProperty(int index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ public class TransactionImpl implements Transaction

private final long id;

/**
* if the appendCommit has to be done only after the current operations are completed
*/
private boolean waitBeforeCommit = false;

private volatile State state = State.ACTIVE;

Expand Down Expand Up @@ -290,60 +286,12 @@ protected void doCommit() throws Exception
if (containsPersistent || xid != null && state == State.PREPARED)
{

if (waitBeforeCommit)
{
// we will wait all the pending operations to finish before we can add this
asyncAppendCommit();
}
else
{
storageManager.commit(id);
}
storageManager.commit(id);

state = State.COMMITTED;
}
}

/**
*
*/
protected void asyncAppendCommit()
{
final OperationContext ctx = storageManager.getContext();
storageManager.afterCompleteOperations(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
{
HornetQServerLogger.LOGGER.error("Error=" + errorCode + ", message=" + errorMessage);
}

public void done()
{
OperationContext originalCtx = storageManager.getContext();
try
{
storageManager.setContext(ctx);
storageManager.commit(id, false);
}
catch (Exception e)
{
onError(HornetQExceptionType.IO_ERROR.getCode(), e.getMessage());
}
finally
{
storageManager.setContext(originalCtx);
}
}

@Override
public String toString()
{
return IOAsyncTask.class.getName() + "(" + TransactionImpl.class.getName() + "-AsyncAppendCommit)";
}
});
storageManager.lineUpContext();
}

public void rollback() throws Exception
{
synchronized (timeoutLock)
Expand Down Expand Up @@ -415,11 +363,6 @@ public void setState(final State state)
this.state = state;
}

public void setWaitBeforeCommit(boolean waitBeforeCommit)
{
this.waitBeforeCommit = waitBeforeCommit;
}

public Xid getXid()
{
return xid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,18 +662,10 @@ protected HornetQServer createClusteredServerWithParams(final boolean isNetty,
final int maxAddressSize,
final Map<String, Object> params) throws Exception
{
if (isNetty)
{
return createServer(realFiles, createDefaultConfig(index, params, NETTY_ACCEPTOR_FACTORY),
pageSize,
maxAddressSize,
new HashMap<String, AddressSettings>());
}
else
{
return createServer(realFiles, createDefaultConfig(index, params, INVM_ACCEPTOR_FACTORY), -1, -1,
new HashMap<String, AddressSettings>());
}
return createServer(realFiles, createDefaultConfig(index, params, (isNetty ? NETTY_ACCEPTOR_FACTORY : INVM_ACCEPTOR_FACTORY)),
pageSize,
maxAddressSize,
new HashMap<String, AddressSettings>());
}

protected ServerLocator createFactory(final boolean isNetty) throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.hornetq.tests.integration.cluster.bridge;

import org.hornetq.utils.ReusableLatch;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
Expand Down Expand Up @@ -80,6 +82,16 @@ protected boolean isNetty()
return false;
}


@Override
@Before
public void setUp() throws Exception
{
StopInterceptor.reset();
super.setUp();
}


private String getConnector()
{
if (isNetty())
Expand Down Expand Up @@ -1334,7 +1346,6 @@ public void testBridgeWithPaging() throws Exception
final int PAGE_MAX = 100 * 1024;

final int PAGE_SIZE = 10 * 1024;
ServerLocator locator = null;
try
{

Expand All @@ -1343,7 +1354,7 @@ public void testBridgeWithPaging() throws Exception

Map<String, Object> server1Params = new HashMap<String, Object>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, PAGE_SIZE, PAGE_MAX, server1Params);

final String testAddress = "testAddress";
final String queueName0 = "queue0";
Expand All @@ -1358,6 +1369,8 @@ public void testBridgeWithPaging() throws Exception

server0.getConfiguration().setConnectorConfigurations(connectors);

server0.getConfiguration().setIDCacheSize(20000);

ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(server1tc.getName());

Expand All @@ -1369,18 +1382,22 @@ public void testBridgeWithPaging() throws Exception
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
1,
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
-1,
false,
0,
1,
staticConnectors,
false,
HornetQDefaultConfiguration.getDefaultClusterUser(),
HornetQDefaultConfiguration.getDefaultClusterPassword());

bridgeConfiguration.setCallTimeout(1000);

bridgeConfiguration.setUseDuplicateDetection(true);

List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
Expand All @@ -1395,6 +1412,12 @@ public void testBridgeWithPaging() throws Exception
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);

List<String> interceptorToStop = new ArrayList<String>();
interceptorToStop.add(StopInterceptor.class.getName());
server1.getConfiguration().setIncomingInterceptorClassNames(interceptorToStop);

StopInterceptor.serverToStop = server0;

server1.start();
server0.start();

Expand All @@ -1403,7 +1426,7 @@ public void testBridgeWithPaging() throws Exception

ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);

ClientSession session0 = sf0.createSession(false, true, true);
ClientSession session0 = sf0.createSession(false, false, true);

ClientSession session1 = sf1.createSession(false, true, true);

Expand All @@ -1413,31 +1436,92 @@ public void testBridgeWithPaging() throws Exception

session1.start();

final int numMessages = 500;
final int numMessages = 6000;

final SimpleString propKey = new SimpleString("testkey");

for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session0.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[512]);

message.putIntProperty(propKey, i);

producer0.send(message);
}

session0.commit();

assertTrue(StopInterceptor.latch.await(1, TimeUnit.HOURS));

StopInterceptor.thread.join(15000);

if (StopInterceptor.thread.isAlive())
{
System.out.println(threadDump("Still alive, stop didn't work!!!"));
fail("Thread that should restart the server still alive");
}


// Restarting the server
server0.start();

HashMap<Integer, AtomicInteger> receivedMsg = new HashMap<Integer, AtomicInteger>();

for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer1.receive(5000);

Assert.assertNotNull(message);
if (message == null)
{
break;
}

Assert.assertEquals(i, message.getObjectProperty(propKey));
Integer msgKey = message.getIntProperty(propKey);

message.acknowledge();
AtomicInteger msgCount = receivedMsg.get(msgKey);

if (msgKey.intValue() != i)
{
System.err.println("Message " + msgCount + " received out of order, expected to be " + i + " it's acceptable but not the ideal!");
}

if (msgCount == null)
{
msgCount = new AtomicInteger();
receivedMsg.put(msgKey, msgCount);
}

msgCount.incrementAndGet();

if (i % 500 == 0) System.out.println("received " + i);
}

Assert.assertNull(consumer1.receiveImmediate());
boolean failed = false;

if (consumer1.receiveImmediate() != null)
{
System.err.println("Unexpected message received");
failed = true;
}

for (int i = 0 ; i < numMessages; i++)
{
AtomicInteger msgCount = receivedMsg.get((Integer)i);
if (msgCount == null)
{
System.err.println("Msg " + i + " wasn't received");
failed = true;
}
else if (msgCount.get() > 1)
{
System.err.println("msg " + i + " was received " + msgCount.get() + " times");
failed = true;
}

}

assertFalse("Test failed", failed);

session0.close();

Expand Down Expand Up @@ -1479,6 +1563,71 @@ public void testBridgeWithPaging() throws Exception
}


// Stops a server after 100 messages received
public static class StopInterceptor implements Interceptor
{
static HornetQServer serverToStop;

static Thread thread;

static final ReusableLatch latch = new ReusableLatch(0);


public static void reset()
{
latch.setCount(1);
serverToStop = null;
count = 0;
thread = null;
}

static int count = 0;
@Override
public synchronized boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{

if (packet instanceof SessionSendMessage && count == 1000)
{
System.out.println("Going to kill the server");
}
if (packet instanceof SessionSendMessage && ++count == 5000)
{
try
{
System.out.println("Stopping server after " + count + " messages");

thread = new Thread("***Server Restarter***")
{

public void run()
{
try
{
System.out.println("Stopping server");
latch.countDown();
serverToStop.stop(false);
}
catch (Exception e)
{
e.printStackTrace();
}
}
};

thread.start();

latch.await();
return true;
}
catch (Exception e)
{
e.printStackTrace();
}
}
return true;
}
}

@Test
public void testBridgeWithLargeMessage() throws Exception
{
Expand Down

0 comments on commit ddbd642

Please sign in to comment.