Skip to content

Commit

Permalink
HORNETQ-1467 / BZ-1207707 large messages sending over bridge when non…
Browse files Browse the repository at this point in the history
… persistent

https://issues.jboss.org/browse/HORNETQ-1467
https://bugzilla.redhat.com/show_bug.cgi?id=1207707

NullStorageLargeServerMessage's method copy is not implemented. For that reason a LargeMessage would fail if copied (operation that would happen through the Bridge or Expiration)
  • Loading branch information
clebertsuconic committed Mar 31, 2015
1 parent 52e2003 commit 521b5f5
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;

/**
Expand All @@ -29,6 +30,11 @@ public NullStorageLargeServerMessage()
super();
}

public NullStorageLargeServerMessage(ServerMessageImpl other)
{
super(other);
}

@Override
public void releaseResources()
{
Expand Down Expand Up @@ -79,7 +85,13 @@ public synchronized int getEncodeSize()
@Override
public String toString()
{
return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
return "NullStorageLargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
}

public ServerMessage copy()
{
// This is a simple copy, used only to avoid changing original properties
return new NullStorageLargeServerMessage(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ protected ConfigurationImpl createBasicConfig() throws Exception
* @return
* @throws Exception
*/
protected final ConfigurationImpl createBasicConfig(final int serverID)
protected ConfigurationImpl createBasicConfig(final int serverID)
{
ConfigurationImpl configuration = new ConfigurationImpl()
.setSecurityEnabled(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@
*/
package org.hornetq.tests.integration.jms.cluster;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.HornetQNotConnectedException;
import org.hornetq.api.core.TransportConfiguration;
Expand All @@ -29,18 +41,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* A BindingsClusterTest
*/
Expand Down Expand Up @@ -70,7 +70,7 @@ public void setUp() throws Exception
}

@Override
protected boolean enablePersistence()
protected boolean isPersistent()
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,70 @@

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import java.util.Arrays;
import java.util.Collection;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.tests.util.JMSClusteredTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
* A TextMessageOverBridgeTest
*
* @author clebertsuconic
*/
@RunWith(value = Parameterized.class)
public class LargeMessageOverBridgeTest extends JMSClusteredTestBase
{

private final boolean persistent;

@Override
protected boolean isPersistent()
{
return persistent;
}

@Parameterized.Parameters(name = "persistent={0}")
public static Collection getParameters()
{
return Arrays.asList(new Object[][]{
{true},
{false}
});
}


@Override
protected final ConfigurationImpl createBasicConfig(final int serverID)
{
ConfigurationImpl configuration = super.createBasicConfig(serverID);
configuration.setJournalFileSize(1024 * 1024);
return configuration;
}


public LargeMessageOverBridgeTest(boolean persistent)
{
this.persistent = persistent;
}


/**
* This was causing a text message to ber eventually converted into large message when sent over the bridge
*
Expand Down Expand Up @@ -79,6 +121,56 @@ public void testSendHalfLargeTextMessage() throws Exception

}

/**
* This was causing a text message to ber eventually converted into large message when sent over the bridge
*
* @throws Exception
*/
@Test
public void testSendMapMessageOverCluster() throws Exception
{
createQueue("Q1");

Queue queue = (Queue)context1.lookup("queue/Q1");
Connection conn1 = cf1.createConnection();
Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod1 = session1.createProducer(queue);

Connection conn2 = cf2.createConnection();
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons2 = session2.createConsumer(queue);
conn2.start();

StringBuffer buffer = new StringBuffer();

for (int i = 0; i < 3810002; i++)
{
buffer.append('a');
}

final int NUMBER_OF_MESSAGES = 1;

for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
{
MapMessage msg = session1.createMapMessage();
msg.setString("str", buffer.toString());
msg.setIntProperty("i", i);
prod1.send(msg);
}

for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
{
MapMessage msg = (MapMessage)cons2.receive(5000);
assertEquals(buffer.toString(), msg.getString("str"));
}

assertNull(cons2.receiveNoWait());

conn1.close();
conn2.close();

}


protected Configuration createConfigServer2()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private void setupServer2() throws Exception
JMSConfigurationImpl jmsconfig = new JMSConfigurationImpl();

mBeanServer2 = MBeanServerFactory.createMBeanServer();
server2 = HornetQServers.newHornetQServer(conf2, mBeanServer2, enablePersistence());
server2 = HornetQServers.newHornetQServer(conf2, mBeanServer2, isPersistent());
jmsServer2 = new JMSServerManagerImpl(server2, jmsconfig);
context2 = new InVMNamingContext();
jmsServer2.setContext(context2);
Expand Down Expand Up @@ -195,13 +195,13 @@ private void setupServer1() throws Exception
JMSConfigurationImpl jmsconfig = new JMSConfigurationImpl();

mBeanServer1 = MBeanServerFactory.createMBeanServer();
server1 = HornetQServers.newHornetQServer(conf1, mBeanServer1, enablePersistence());
server1 = HornetQServers.newHornetQServer(conf1, mBeanServer1, isPersistent());
jmsServer1 = new JMSServerManagerImpl(server1, jmsconfig);
context1 = new InVMNamingContext();
jmsServer1.setContext(context1);
}

protected boolean enablePersistence()
protected boolean isPersistent()
{
return false;
}
Expand Down

0 comments on commit 521b5f5

Please sign in to comment.