Skip to content

Commit c94fc4f

Browse files
committed
[ROCKETMQ-311] Add a swith for broker fast failure and support pull request queue
1 parent cba3089 commit c94fc4f

File tree

3 files changed

+104
-6
lines changed

3 files changed

+104
-6
lines changed

broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.latency;
1818

19+
import java.util.concurrent.BlockingQueue;
1920
import java.util.concurrent.Executors;
2021
import java.util.concurrent.ScheduledExecutorService;
2122
import java.util.concurrent.TimeUnit;
@@ -27,6 +28,10 @@
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
2930

31+
/**
32+
* BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and
33+
* {@link BrokerController#pullThreadPoolQueue}
34+
*/
3035
public class BrokerFastFailure {
3136
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
3237
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
@@ -52,7 +57,9 @@ public void start() {
5257
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
5358
@Override
5459
public void run() {
55-
cleanExpiredRequest();
60+
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
61+
cleanExpiredRequest();
62+
}
5663
}
5764
}, 1000, 10, TimeUnit.MILLISECONDS);
5865
}
@@ -75,10 +82,18 @@ private void cleanExpiredRequest() {
7582
}
7683
}
7784

85+
cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
86+
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
87+
88+
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
89+
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
90+
}
91+
92+
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
7893
while (true) {
7994
try {
80-
if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
81-
final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
95+
if (!blockingQueue.isEmpty()) {
96+
final Runnable runnable = blockingQueue.peek();
8297
if (null == runnable) {
8398
break;
8499
}
@@ -88,10 +103,10 @@ private void cleanExpiredRequest() {
88103
}
89104

90105
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
91-
if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
92-
if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
106+
if (behind >= maxWaitTimeMillsInQueue) {
107+
if (blockingQueue.remove(runnable)) {
93108
rt.setStopRun(true);
94-
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
109+
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
95110
}
96111
} else {
97112
break;
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.broker.latency;
18+
19+
import java.util.concurrent.BlockingQueue;
20+
import java.util.concurrent.LinkedBlockingQueue;
21+
import java.util.concurrent.TimeUnit;
22+
import org.apache.rocketmq.remoting.netty.RequestTask;
23+
import org.junit.Test;
24+
25+
import static org.assertj.core.api.Assertions.assertThat;
26+
27+
public class BrokerFastFailureTest {
28+
@Test
29+
public void testCleanExpiredRequestInQueue() throws Exception {
30+
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
31+
32+
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
33+
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
34+
assertThat(queue.size()).isZero();
35+
36+
//Normal Runnable
37+
Runnable runnable = new Runnable() {
38+
@Override
39+
public void run() {
40+
41+
}
42+
};
43+
queue.add(runnable);
44+
45+
assertThat(queue.size()).isEqualTo(1);
46+
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
47+
assertThat(queue.size()).isEqualTo(1);
48+
49+
queue.clear();
50+
51+
//With expired request
52+
RequestTask expiredRequest = new RequestTask(runnable, null, null);
53+
queue.add(new FutureTaskExt<>(expiredRequest, null));
54+
TimeUnit.MILLISECONDS.sleep(100);
55+
56+
RequestTask requestTask = new RequestTask(runnable, null, null);
57+
queue.add(new FutureTaskExt<>(requestTask, null));
58+
59+
assertThat(queue.size()).isEqualTo(2);
60+
brokerFastFailure.cleanExpiredRequestInQueue(queue, 100);
61+
assertThat(queue.size()).isEqualTo(1);
62+
assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask);
63+
}
64+
65+
}

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ public class BrokerConfig {
103103
private boolean disableConsumeIfConsumerReadSlowly = false;
104104
private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
105105

106+
private boolean brokerFastFailureEnable = true;
106107
private long waitTimeMillsInSendQueue = 200;
108+
private long waitTimeMillsInPullQueue = 5 * 1000;
107109

108110
private long startAcceptSendRequestTimeStamp = 0L;
109111

@@ -160,6 +162,22 @@ public void setConsumerFallbehindThreshold(final long consumerFallbehindThreshol
160162
this.consumerFallbehindThreshold = consumerFallbehindThreshold;
161163
}
162164

165+
public boolean isBrokerFastFailureEnable() {
166+
return brokerFastFailureEnable;
167+
}
168+
169+
public void setBrokerFastFailureEnable(final boolean brokerFastFailureEnable) {
170+
this.brokerFastFailureEnable = brokerFastFailureEnable;
171+
}
172+
173+
public long getWaitTimeMillsInPullQueue() {
174+
return waitTimeMillsInPullQueue;
175+
}
176+
177+
public void setWaitTimeMillsInPullQueue(final long waitTimeMillsInPullQueue) {
178+
this.waitTimeMillsInPullQueue = waitTimeMillsInPullQueue;
179+
}
180+
163181
public boolean isDisableConsumeIfConsumerReadSlowly() {
164182
return disableConsumeIfConsumerReadSlowly;
165183
}

0 commit comments

Comments
 (0)