Skip to content

Commit

Permalink
Use ExecutorService instead of EventLoopGroup as a working threads pool
Browse files Browse the repository at this point in the history
  • Loading branch information
tuohai666 committed Jul 20, 2018
1 parent fde8819 commit 5c84c44
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.sql.Statement;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;

/**
* SQL Execute engine for JDBC.
Expand All @@ -54,7 +55,7 @@ public abstract class JDBCExecuteEngine implements SQLExecuteEngine {

private final BackendConnection backendConnection = new BackendConnection();

private final EventLoopGroup userGroup = ExecutorContext.getInstance().getUserGroup();
private final ExecutorService executorService = ExecutorContext.getInstance().getExecutorService();

private int columnCount;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private List<Future<Collection<JDBCExecuteResponse>>> asyncExecute(final boolean
for (Entry<String, Collection<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
final Connection connection = getBackendConnection().getConnection(entry.getKey());
final Collection<SQLUnit> sqlUnits = entry.getValue();
result.add(getUserGroup().submit(new Callable<Collection<JDBCExecuteResponse>>() {
result.add(getExecutorService().submit(new Callable<Collection<JDBCExecuteResponse>>() {

@Override
public Collection<JDBCExecuteResponse> call() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private List<Future<JDBCExecuteResponse>> asyncExecute(final boolean isReturnGen
for (SQLExecutionUnit each : sqlExecutionUnits) {
final String dataSourceName = each.getDataSource();
final String actualSQL = each.getSqlUnit().getSql();
result.add(getUserGroup().submit(new Callable<JDBCExecuteResponse>() {
result.add(getExecutorService().submit(new Callable<JDBCExecuteResponse>() {

@Override
public JDBCExecuteResponse call() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public final class ShardingProxy {
private EventLoopGroup workerGroup;

public ShardingProxy() {
ruleRegistry.initShardingMetaData(executorContext.getUserGroup());
ruleRegistry.initShardingMetaData(executorContext.getExecutorService());
}

/**
Expand All @@ -71,7 +71,8 @@ public void start(final int port) throws InterruptedException, MalformedURLExcep
ShardingProxyClient.getInstance().start();
}
ServerBootstrap bootstrap = new ServerBootstrap();
if (executorContext.canUseEpoll()) {
bossGroup = createEventLoopGroup();
if (bossGroup instanceof EpollEventLoopGroup) {
groupsEpoll(bootstrap);
} else {
groupsNio(bootstrap);
Expand All @@ -81,13 +82,21 @@ public void start(final int port) throws InterruptedException, MalformedURLExcep
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
executorContext.getUserGroup().shutdownGracefully();
executorContext.getExecutorService().shutdown();
if (ruleRegistry.isProxyBackendUseNio()) {
ShardingProxyClient.getInstance().stop();
}
}
}

private EventLoopGroup createEventLoopGroup() {
try {
return new EpollEventLoopGroup(1);
} catch (final UnsatisfiedLinkError ex) {
return new NioEventLoopGroup(1);
}
}

private void groupsEpoll(final ServerBootstrap bootstrap) {
bossGroup = new EpollEventLoopGroup(1);
workerGroup = new EpollEventLoopGroup(ruleRegistry.getMaxWorkingThreads());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public final class ExecutorGroup {
* @return executor service
*/
public ExecutorService getExecutorService() {
return TransactionType.XA.equals(RuleRegistry.getInstance().getTransactionType()) ? ChannelThreadExecutorGroup.getInstance().get(channelId) : ExecutorContext.getInstance().getUserGroup();
return TransactionType.XA.equals(RuleRegistry.getInstance().getTransactionType()) ? ChannelThreadExecutorGroup.getInstance().get(channelId) : ExecutorContext.getInstance().getExecutorService();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

package io.shardingsphere.proxy.util;

import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.shardingsphere.proxy.config.RuleRegistry;
import lombok.Getter;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Executor context.
*
Expand All @@ -33,19 +37,7 @@ public final class ExecutorContext {
private static final ExecutorContext INSTANCE = new ExecutorContext();

@Getter
private final EventLoopGroup userGroup;

private ExecutorContext() {
userGroup = createEventLoopGroup(RuleRegistry.getInstance().getMaxWorkingThreads());
}

private EventLoopGroup createEventLoopGroup(final int maxWorkingThreads) {
try {
return new EpollEventLoopGroup(maxWorkingThreads);
} catch (final UnsatisfiedLinkError ignore) {
return new NioEventLoopGroup(maxWorkingThreads);
}
}
private final ExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(RuleRegistry.getInstance().getMaxWorkingThreads()));;

/**
* Get executor context instance.
Expand All @@ -55,13 +47,4 @@ private EventLoopGroup createEventLoopGroup(final int maxWorkingThreads) {
public static ExecutorContext getInstance() {
return INSTANCE;
}

/**
* Judge can use epoll as IO solution or not.
*
* @return can use epoll as IO solution or not
*/
public boolean canUseEpoll() {
return userGroup instanceof EpollEventLoopGroup;
}
}

0 comments on commit 5c84c44

Please sign in to comment.