From e310f8d500fd7b4ddbaaaad8e7a801cc1da18429 Mon Sep 17 00:00:00 2001 From: jbertram Date: Fri, 27 Feb 2015 14:57:33 -0600 Subject: [PATCH 1/2] BZ-1195210 Backup fails to announce to live server When a live and backup server are both started at or near the same moment there is a small window where the live server's acceptors have been started but the server's state != STARTED. During this window if the backup sends its announcement the announcement will fail and the backup will shutdown. This fix closes this small window by only starting the acceptors until the server is fully started. --- .../BackupReplicationStartFailedMessage.java | 6 + .../core/remoting/server/RemotingService.java | 2 + .../server/impl/RemotingServiceImpl.java | 19 ++- .../core/server/impl/HornetQServerImpl.java | 12 +- .../core/server/impl/LiveOnlyActivation.java | 2 + .../impl/SharedNothingBackupActivation.java | 2 + .../impl/SharedNothingLiveActivation.java | 3 + .../impl/SharedStoreBackupActivation.java | 2 + .../impl/SharedStoreLiveActivation.java | 2 + .../byteman/tests/ReplicationBackupTest.java | 122 ++++++++++++++++++ 10 files changed, 162 insertions(+), 10 deletions(-) create mode 100644 tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ReplicationBackupTest.java diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupReplicationStartFailedMessage.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupReplicationStartFailedMessage.java index c569e458fef..de236b06334 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupReplicationStartFailedMessage.java +++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupReplicationStartFailedMessage.java @@ -110,4 +110,10 @@ public int hashCode() result = 31 * result + (problem != null ? problem.hashCode() : 0); return result; } + + @Override + public String toString() + { + return getParentString() + ", problem=" + problem.name() + "]"; + } } diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/RemotingService.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/RemotingService.java index 3a95e5ac87d..43ddb478d29 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/RemotingService.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/RemotingService.java @@ -51,6 +51,8 @@ public interface RemotingService void start() throws Exception; + void startAcceptors() throws Exception; + boolean isStarted(); /** diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java index f1b2ce9efda..9b79c655d08 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java @@ -326,10 +326,10 @@ public ClassLoader run() } } - for (Acceptor a : acceptors.values()) - { - a.start(); - } + /** + * Don't start the acceptors here. Only start the acceptors at the every end of the start-up process to avoid + * race conditions. See {@link #startAcceptors()}. + */ // This thread checks connections that need to be closed, and also flushes confirmations failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL); @@ -339,6 +339,17 @@ public ClassLoader run() started = true; } + public synchronized void startAcceptors() throws Exception + { + if (isStarted()) + { + for (Acceptor a : acceptors.values()) + { + a.start(); + } + } + } + public synchronized void allowInvmSecurityOverride(HornetQPrincipal principal) { defaultInvmSecurityPrincipal = principal; diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java index ad2538e84dd..fedb8f86cf4 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java @@ -441,7 +441,6 @@ public final synchronized void start() throws Exception } else { - state = SERVER_STATE.STARTED; HornetQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), nodeManager.getNodeId(), identity != null ? identity : ""); } @@ -1829,16 +1828,17 @@ public void run() { throw HornetQMessageBundle.BUNDLE.nodeIdNull(); } - activationLatch.countDown(); // We can only do this after everything is started otherwise we may get nasty races with expired messages postOffice.startExpiryScanner(); } - else - { - activationLatch.countDown(); - } + } + public void completeActivation() throws Exception + { + setState(HornetQServerImpl.SERVER_STATE.STARTED); + getRemotingService().startAcceptors(); + activationLatch.countDown(); callActivationCompleteCallbacks(); } diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java index ceb60d128eb..abcbb0dcd7b 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java @@ -58,6 +58,8 @@ public void run() hornetQServer.initialisePart2(false); + hornetQServer.completeActivation(); + if (hornetQServer.getIdentity() != null) { HornetQServerLogger.LOGGER.serverIsLive(hornetQServer.getIdentity()); diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingBackupActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingBackupActivation.java index ea2a9761e5c..376e97f5df9 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingBackupActivation.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingBackupActivation.java @@ -285,6 +285,8 @@ public void run() } } + + hornetQServer.completeActivation(); } } catch (Exception e) diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingLiveActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingLiveActivation.java index 27ad9d87e00..219af2cbe85 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingLiveActivation.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingLiveActivation.java @@ -99,6 +99,8 @@ public void run() hornetQServer.initialisePart2(false); + hornetQServer.completeActivation(); + if (hornetQServer.getIdentity() != null) { HornetQServerLogger.LOGGER.serverIsLive(hornetQServer.getIdentity()); @@ -137,6 +139,7 @@ public void handlePacket(Packet packet) } catch (HornetQException e) { + HornetQServerLogger.LOGGER.debug("Failed to process backup registration packet", e); channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION)); } } diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreBackupActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreBackupActivation.java index 6c27e12a5a4..fd8f38e687c 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreBackupActivation.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreBackupActivation.java @@ -83,6 +83,8 @@ public void run() hornetQServer.initialisePart2(scalingDown); + hornetQServer.completeActivation(); + if (scalingDown) { HornetQServerLogger.LOGGER.backupServerScaledDown(); diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreLiveActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreLiveActivation.java index 4f8baf27ed4..f7c2b13948d 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreLiveActivation.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreLiveActivation.java @@ -72,6 +72,8 @@ public void run() hornetQServer.initialisePart2(false); + hornetQServer.completeActivation(); + HornetQServerLogger.LOGGER.serverIsLive(); } catch (Exception e) diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ReplicationBackupTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ReplicationBackupTest.java new file mode 100644 index 00000000000..7676fe53b52 --- /dev/null +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ReplicationBackupTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.byteman.tests; + +import java.util.concurrent.CountDownLatch; + +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.JournalType; +import org.hornetq.tests.util.ReplicatedBackupUtils; +import org.hornetq.tests.util.ServiceTestBase; +import org.hornetq.tests.util.TransportConfigurationUtils; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class ReplicationBackupTest extends ServiceTestBase +{ + private static final CountDownLatch ruleFired = new CountDownLatch(1); + private HornetQServer backupServer; + private HornetQServer liveServer; + + /* + * simple test to induce a potential race condition where the server's acceptors are active, but the server's + * state != STARTED + */ + @Test + @BMRules + ( + rules = + { + @BMRule + ( + name = "prevent backup annoucement", + targetClass = "org.hornetq.core.server.impl.SharedNothingLiveActivation", + targetMethod = "run", + targetLocation = "AT EXIT", + action = "org.hornetq.byteman.tests.ReplicationBackupTest.breakIt();" + ) + } + ) + public void testBasicConnection() throws Exception + { + TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0); + TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0); + TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0); + TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0); + + Configuration backupConfig = createDefaultConfig(); + Configuration liveConfig = createDefaultConfig(); + + liveConfig.setJournalType(JournalType.NIO); + backupConfig.setJournalType(JournalType.NIO); + + backupConfig.setBackup(true); + + final String suffix = "_backup"; + backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + suffix); + backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + suffix); + backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + suffix); + backupConfig.setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() + suffix); + + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor); + + liveServer = createServer(liveConfig); + + // start the live server in a new thread so we can start the backup simultaneously to induce a potential race + Thread startThread = new Thread(new Runnable() + { + @Override + public void run() + { + try + { + liveServer.start(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }); + startThread.start(); + + ruleFired.await(); + + backupServer = createServer(backupConfig); + backupServer.start(); + ServiceTestBase.waitForRemoteBackup(null, 3, true, backupServer); + } + + public static void breakIt() + { + ruleFired.countDown(); + try + { + /* before the fix this sleep would put the "live" server into a state where the acceptors were started + * but the server's state != STARTED which would cause the backup to fail to announce + */ + Thread.sleep(2000); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } +} \ No newline at end of file From f18a13bb0d556947275d41727c3b515834cc107f Mon Sep 17 00:00:00 2001 From: jbertram Date: Mon, 2 Mar 2015 16:36:11 -0600 Subject: [PATCH 2/2] Fix a few tests using the wrong destinationType --- .../jms/server/management/JMSServerControl2Test.java | 4 ++-- .../org/hornetq/tests/integration/ra/ResourceAdapterTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java index 1395a8149f2..1992ed2c93d 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java @@ -539,7 +539,7 @@ public void testStartActivationListConnections() throws Exception spec.setPassword("password"); - spec.setDestinationType("Queue"); + spec.setDestinationType("javax.jms.Queue"); spec.setDestination("test"); spec.setMinSession(1); @@ -628,7 +628,7 @@ public void testStartActivationOverrideListConnections() throws Exception spec.setUser("user"); spec.setPassword("password"); - spec.setDestinationType("Queue"); + spec.setDestinationType("javax.jms.Queue"); spec.setDestination("test"); spec.setMinSession(1); diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/ResourceAdapterTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/ResourceAdapterTest.java index a8dddcd925b..e3fad7eded0 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/ResourceAdapterTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/ResourceAdapterTest.java @@ -82,7 +82,7 @@ public void testStartStopActivationManyTimes() throws Exception spec.setUser("user"); spec.setPassword("password"); - spec.setDestinationType("Topic"); + spec.setDestinationType("javax.jms.Topic"); spec.setDestination("test"); spec.setMinSession(1);