Skip to content

Commit

Permalink
Merge pull request hornetq#2000 from jbertram/master
Browse files Browse the repository at this point in the history
BZ-1195210 + a few test fixes
  • Loading branch information
clebertsuconic committed Mar 4, 2015
2 parents 6e3f6d9 + f18a13b commit 52e2003
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,10 @@ public int hashCode()
result = 31 * result + (problem != null ? problem.hashCode() : 0);
return result;
}

@Override
public String toString()
{
return getParentString() + ", problem=" + problem.name() + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface RemotingService

void start() throws Exception;

void startAcceptors() throws Exception;

boolean isStarted();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,10 @@ public ClassLoader run()
}
}

for (Acceptor a : acceptors.values())
{
a.start();
}
/**
* Don't start the acceptors here. Only start the acceptors at the every end of the start-up process to avoid
* race conditions. See {@link #startAcceptors()}.
*/

// This thread checks connections that need to be closed, and also flushes confirmations
failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
Expand All @@ -339,6 +339,17 @@ public ClassLoader run()
started = true;
}

public synchronized void startAcceptors() throws Exception
{
if (isStarted())
{
for (Acceptor a : acceptors.values())
{
a.start();
}
}
}

public synchronized void allowInvmSecurityOverride(HornetQPrincipal principal)
{
defaultInvmSecurityPrincipal = principal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ public final synchronized void start() throws Exception
}
else
{
state = SERVER_STATE.STARTED;
HornetQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), nodeManager.getNodeId(),
identity != null ? identity : "");
}
Expand Down Expand Up @@ -1829,16 +1828,17 @@ public void run()
{
throw HornetQMessageBundle.BUNDLE.nodeIdNull();
}
activationLatch.countDown();

// We can only do this after everything is started otherwise we may get nasty races with expired messages
postOffice.startExpiryScanner();
}
else
{
activationLatch.countDown();
}
}

public void completeActivation() throws Exception
{
setState(HornetQServerImpl.SERVER_STATE.STARTED);
getRemotingService().startAcceptors();
activationLatch.countDown();
callActivationCompleteCallbacks();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public void run()

hornetQServer.initialisePart2(false);

hornetQServer.completeActivation();

if (hornetQServer.getIdentity() != null)
{
HornetQServerLogger.LOGGER.serverIsLive(hornetQServer.getIdentity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ public void run()
}

}

hornetQServer.completeActivation();
}
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public void run()

hornetQServer.initialisePart2(false);

hornetQServer.completeActivation();

if (hornetQServer.getIdentity() != null)
{
HornetQServerLogger.LOGGER.serverIsLive(hornetQServer.getIdentity());
Expand Down Expand Up @@ -137,6 +139,7 @@ public void handlePacket(Packet packet)
}
catch (HornetQException e)
{
HornetQServerLogger.LOGGER.debug("Failed to process backup registration packet", e);
channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public void run()

hornetQServer.initialisePart2(scalingDown);

hornetQServer.completeActivation();

if (scalingDown)
{
HornetQServerLogger.LOGGER.backupServerScaledDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public void run()

hornetQServer.initialisePart2(false);

hornetQServer.completeActivation();

HornetQServerLogger.LOGGER.serverIsLive();
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.hornetq.byteman.tests;

import java.util.concurrent.CountDownLatch;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
import org.hornetq.tests.util.ReplicatedBackupUtils;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.TransportConfigurationUtils;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(BMUnitRunner.class)
public class ReplicationBackupTest extends ServiceTestBase
{
private static final CountDownLatch ruleFired = new CountDownLatch(1);
private HornetQServer backupServer;
private HornetQServer liveServer;

/*
* simple test to induce a potential race condition where the server's acceptors are active, but the server's
* state != STARTED
*/
@Test
@BMRules
(
rules =
{
@BMRule
(
name = "prevent backup annoucement",
targetClass = "org.hornetq.core.server.impl.SharedNothingLiveActivation",
targetMethod = "run",
targetLocation = "AT EXIT",
action = "org.hornetq.byteman.tests.ReplicationBackupTest.breakIt();"
)
}
)
public void testBasicConnection() throws Exception
{
TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);

Configuration backupConfig = createDefaultConfig();
Configuration liveConfig = createDefaultConfig();

liveConfig.setJournalType(JournalType.NIO);
backupConfig.setJournalType(JournalType.NIO);

backupConfig.setBackup(true);

final String suffix = "_backup";
backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + suffix);
backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + suffix);
backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + suffix);
backupConfig.setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() + suffix);

ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);

liveServer = createServer(liveConfig);

// start the live server in a new thread so we can start the backup simultaneously to induce a potential race
Thread startThread = new Thread(new Runnable()
{
@Override
public void run()
{
try
{
liveServer.start();
}
catch (Exception e)
{
e.printStackTrace();
}
}
});
startThread.start();

ruleFired.await();

backupServer = createServer(backupConfig);
backupServer.start();
ServiceTestBase.waitForRemoteBackup(null, 3, true, backupServer);
}

public static void breakIt()
{
ruleFired.countDown();
try
{
/* before the fix this sleep would put the "live" server into a state where the acceptors were started
* but the server's state != STARTED which would cause the backup to fail to announce
*/
Thread.sleep(2000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ public void testStartActivationListConnections() throws Exception

spec.setPassword("password");

spec.setDestinationType("Queue");
spec.setDestinationType("javax.jms.Queue");
spec.setDestination("test");

spec.setMinSession(1);
Expand Down Expand Up @@ -628,7 +628,7 @@ public void testStartActivationOverrideListConnections() throws Exception
spec.setUser("user");
spec.setPassword("password");

spec.setDestinationType("Queue");
spec.setDestinationType("javax.jms.Queue");
spec.setDestination("test");

spec.setMinSession(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testStartStopActivationManyTimes() throws Exception
spec.setUser("user");
spec.setPassword("password");

spec.setDestinationType("Topic");
spec.setDestinationType("javax.jms.Topic");
spec.setDestination("test");

spec.setMinSession(1);
Expand Down

0 comments on commit 52e2003

Please sign in to comment.