Skip to content

Commit e3f4251

Browse files
committed
[ROCKETMQ-119] Add ThreadUtils and shutdown PullMessageService properly
1 parent 53b98d0 commit e3f4251

File tree

2 files changed

+181
-0
lines changed

2 files changed

+181
-0
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
2525
import org.apache.rocketmq.client.log.ClientLogger;
2626
import org.apache.rocketmq.common.ServiceThread;
27+
import org.apache.rocketmq.common.utils.ThreadUtils;
2728
import org.slf4j.Logger;
2829

2930
public class PullMessageService extends ServiceThread {
@@ -97,6 +98,12 @@ public void run() {
9798
log.info(this.getServiceName() + " service end");
9899
}
99100

101+
@Override
102+
public void shutdown(boolean interrupt) {
103+
super.shutdown(interrupt);
104+
ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
105+
}
106+
100107
@Override
101108
public String getServiceName() {
102109
return PullMessageService.class.getSimpleName();
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.common.utils;
19+
20+
import java.util.concurrent.BlockingQueue;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.ThreadFactory;
25+
import java.util.concurrent.ThreadPoolExecutor;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
import org.apache.rocketmq.common.constant.LoggerName;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
public final class ThreadUtils {
33+
private static final Logger log = LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME);
34+
35+
public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
36+
TimeUnit unit, BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) {
37+
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
38+
}
39+
40+
public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
41+
return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
42+
}
43+
44+
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
45+
return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon));
46+
}
47+
48+
public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
49+
boolean isDaemon) {
50+
return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon));
51+
}
52+
53+
public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) {
54+
return newGenericThreadFactory("Remoting-" + processName, isDaemon);
55+
}
56+
57+
public static ThreadFactory newGenericThreadFactory(String processName) {
58+
return newGenericThreadFactory(processName, false);
59+
}
60+
61+
public static ThreadFactory newGenericThreadFactory(String processName, int threads) {
62+
return newGenericThreadFactory(processName, threads, false);
63+
}
64+
65+
public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
66+
return new ThreadFactory() {
67+
private AtomicInteger threadIndex = new AtomicInteger(0);
68+
69+
@Override
70+
public Thread newThread(Runnable r) {
71+
Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
72+
thread.setDaemon(isDaemon);
73+
return thread;
74+
}
75+
};
76+
}
77+
78+
public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
79+
final boolean isDaemon) {
80+
return new ThreadFactory() {
81+
private AtomicInteger threadIndex = new AtomicInteger(0);
82+
83+
@Override
84+
public Thread newThread(Runnable r) {
85+
Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet()));
86+
thread.setDaemon(isDaemon);
87+
return thread;
88+
}
89+
};
90+
}
91+
92+
/**
93+
* Create a new thread
94+
*
95+
* @param name The name of the thread
96+
* @param runnable The work for the thread to do
97+
* @param daemon Should the thread block JVM stop?
98+
* @return The unstarted thread
99+
*/
100+
public static Thread newThread(String name, Runnable runnable, boolean daemon) {
101+
Thread thread = new Thread(runnable, name);
102+
thread.setDaemon(daemon);
103+
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
104+
public void uncaughtException(Thread t, Throwable e) {
105+
log.error("Uncaught exception in thread '" + t.getName() + "':", e);
106+
}
107+
});
108+
return thread;
109+
}
110+
111+
/**
112+
* Shutdown passed thread using isAlive and join.
113+
*
114+
* @param t Thread to stop
115+
*/
116+
public static void shutdownGracefully(final Thread t) {
117+
shutdownGracefully(t, 0);
118+
}
119+
120+
/**
121+
* Shutdown passed thread using isAlive and join.
122+
*
123+
* @param millis Pass 0 if we're to wait forever.
124+
* @param t Thread to stop
125+
*/
126+
public static void shutdownGracefully(final Thread t, final long millis) {
127+
if (t == null)
128+
return;
129+
while (t.isAlive()) {
130+
try {
131+
t.interrupt();
132+
t.join(millis);
133+
} catch (InterruptedException e) {
134+
Thread.currentThread().interrupt();
135+
}
136+
}
137+
}
138+
139+
/**
140+
* An implementation of the graceful stop sequence recommended by
141+
* {@link ExecutorService}.
142+
*
143+
* @param executor executor
144+
* @param timeout timeout
145+
* @param timeUnit timeUnit
146+
*/
147+
public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
148+
// Disable new tasks from being submitted.
149+
executor.shutdown();
150+
try {
151+
// Wait a while for existing tasks to terminate.
152+
if (!executor.awaitTermination(timeout, timeUnit)) {
153+
executor.shutdownNow();
154+
// Wait a while for tasks to respond to being cancelled.
155+
if (!executor.awaitTermination(timeout, timeUnit)) {
156+
log.warn(String.format("%s didn't terminate!", executor));
157+
}
158+
}
159+
} catch (InterruptedException ie) {
160+
// (Re-)Cancel if current thread also interrupted.
161+
executor.shutdownNow();
162+
// Preserve interrupt status.
163+
Thread.currentThread().interrupt();
164+
}
165+
}
166+
167+
/**
168+
* A constructor to stop this class being constructed.
169+
*/
170+
private ThreadUtils() {
171+
// Unused
172+
173+
}
174+
}

0 commit comments

Comments
 (0)