Skip to content

Commit

Permalink
Support kqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
jinmiao committed Dec 10, 2020
1 parent 8a43760 commit 72c841f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 14 deletions.
19 changes: 16 additions & 3 deletions kcp-base/src/main/java/kcp/KcpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import threadPool.IMessageExecutorPool;
Expand Down Expand Up @@ -50,18 +53,28 @@ public void init(KcpListener kcpListener, ChannelConfig channelConfig, int... po


boolean epoll = Epoll.isAvailable();
boolean kqueue = KQueue.isAvailable();
this.iMessageExecutorPool = channelConfig.getiMessageExecutorPool();
bootstrap = new Bootstrap();
int cpuNum = Runtime.getRuntime().availableProcessors();
int bindTimes = 1;
if (epoll) {
if (epoll||kqueue) {
//ADD SO_REUSEPORT ? https://www.jianshu.com/p/61df929aa98b
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
bindTimes = cpuNum;
}
Class<? extends Channel> channelClass = null;
if(epoll){
group = new EpollEventLoopGroup(cpuNum);
channelClass = EpollDatagramChannel.class;
}else if(kqueue){
group = new KQueueEventLoopGroup(cpuNum);
channelClass = KQueueDatagramChannel.class;
}else{
group = new NioEventLoopGroup(ports.length);
channelClass = NioDatagramChannel.class;
}

group = epoll ? new EpollEventLoopGroup(cpuNum) : new NioEventLoopGroup(ports.length);
Class<? extends Channel> channelClass = epoll ? EpollDatagramChannel.class : NioDatagramChannel.class;
bootstrap.channel(channelClass);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<Channel>() {
Expand Down
48 changes: 37 additions & 11 deletions kcp-base/src/test/java/main/MpscaBenchmark.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main;

import org.jctools.queues.MpscLinkedQueue;
import io.netty.util.internal.shaded.org.jctools.queues.atomic.MpscAtomicArrayQueue;
import org.jctools.queues.*;
import org.jctools.queues.atomic.MpscChunkedAtomicArrayQueue;
import org.jctools.queues.atomic.MpscLinkedAtomicQueue;
import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Control;

Expand All @@ -10,13 +13,25 @@

/**
*
* Benchmark (implementation) Mode Cnt Score Error Units
* MpscNoCapacityBenchmark.MyGroup PARAM_Linked thrpt 3 53389377.536 ± 22345602.442 ops/s
* MpscNoCapacityBenchmark.MyGroup:read PARAM_Linked thrpt 3 26694715.226 ± 11172841.874 ops/s
* MpscNoCapacityBenchmark.MyGroup:write PARAM_Linked thrpt 3 26694662.311 ± 11172761.077 ops/s
* MpscNoCapacityBenchmark.MyGroup PARAM_LINKED_ATOMICQUEUE thrpt 3 37142210.446 ± 49326137.693 ops/s
* MpscNoCapacityBenchmark.MyGroup:read PARAM_LINKED_ATOMICQUEUE thrpt 3 18571104.104 ± 24661172.819 ops/s
* MpscNoCapacityBenchmark.MyGroup:write PARAM_LINKED_ATOMICQUEUE thrpt 3 18571106.342 ± 24664964.875 ops/s
Benchmark (implementation) Mode Cnt Score Error Units
MpscaBenchmark.MyGroup PARAM_Linked thrpt 3 51081130.039 ± 22670119.965 ops/s
MpscaBenchmark.MyGroup:read PARAM_Linked thrpt 3 25540597.167 ± 11334748.828 ops/s
MpscaBenchmark.MyGroup:write PARAM_Linked thrpt 3 25540532.871 ± 11335371.539 ops/s
MpscaBenchmark.MyGroup PARAM_LINKED_ATOMICQUEUE thrpt 3 50800677.821 ± 22446778.474 ops/s
MpscaBenchmark.MyGroup:read PARAM_LINKED_ATOMICQUEUE thrpt 3 25400339.501 ± 11223775.305 ops/s
MpscaBenchmark.MyGroup:write PARAM_LINKED_ATOMICQUEUE thrpt 3 25400338.320 ± 11223003.327 ops/s
MpscaBenchmark.MyGroup PARAM_MpscUnboundedArrayQueue thrpt 3 8523884.340 ± 2478333.168 ops/s
MpscaBenchmark.MyGroup:read PARAM_MpscUnboundedArrayQueue thrpt 3 4261945.984 ± 1239077.075 ops/s
MpscaBenchmark.MyGroup:write PARAM_MpscUnboundedArrayQueue thrpt 3 4261938.356 ± 1239256.093 ops/s
MpscaBenchmark.MyGroup PARAM_MpscUnboundedAtomicArrayQueue thrpt 3 7750201.020 ± 734250.824 ops/s
MpscaBenchmark.MyGroup:read PARAM_MpscUnboundedAtomicArrayQueue thrpt 3 3875105.906 ± 367144.551 ops/s
MpscaBenchmark.MyGroup:write PARAM_MpscUnboundedAtomicArrayQueue thrpt 3 3875095.114 ± 367106.310 ops/s
MpscaBenchmark.MyGroup PARAM_MpscArrayQueue thrpt 3 13748739.474 ± 2507214.536 ops/s
MpscaBenchmark.MyGroup:read PARAM_MpscArrayQueue thrpt 3 6874362.473 ± 1253480.134 ops/s
MpscaBenchmark.MyGroup:write PARAM_MpscArrayQueue thrpt 3 6874377.001 ± 1253734.422 ops/s
MpscaBenchmark.MyGroup PARAM_MpscAtomicArrayQueue thrpt 3 13016730.228 ± 259225.449 ops/s
MpscaBenchmark.MyGroup:read PARAM_MpscAtomicArrayQueue thrpt 3 6508362.564 ± 129594.326 ops/s
MpscaBenchmark.MyGroup:write PARAM_MpscAtomicArrayQueue thrpt 3 6508367.664 ± 129631.132 ops/s
* Created by JinMiao
* 2020/6/24.
*/
Expand All @@ -28,15 +43,14 @@
@State(Scope.Group)
public class MpscaBenchmark {

public static final String PARAM_Linked = "PARAM_Linked";
public static final String PARAM_Linked = "PARAM_Linked",PARAM_LINKED_ATOMICQUEUE = "PARAM_LINKED_ATOMICQUEUE",PARAM_MpscUnboundedArrayQueue="PARAM_MpscUnboundedArrayQueue",PARAM_MpscUnboundedAtomicArrayQueue="PARAM_MpscUnboundedAtomicArrayQueue",PARAM_MpscArrayQueue="PARAM_MpscArrayQueue",PARAM_MpscAtomicArrayQueue="PARAM_MpscAtomicArrayQueue";

public static final String PARAM_LINKED_ATOMICQUEUE = "PARAM_LINKED_ATOMICQUEUE";

public static final int PRODUCER_THREADS_NUMBER = 4;

public static final String GROUP_NAME = "MyGroup";

@Param({PARAM_Linked,PARAM_LINKED_ATOMICQUEUE})
@Param({PARAM_Linked,PARAM_LINKED_ATOMICQUEUE,PARAM_MpscUnboundedArrayQueue,PARAM_MpscUnboundedAtomicArrayQueue,PARAM_MpscArrayQueue,PARAM_MpscAtomicArrayQueue})
public volatile String implementation;

public volatile Queue<Long> queue;
Expand All @@ -50,6 +64,18 @@ public void setUp() {
case PARAM_LINKED_ATOMICQUEUE:
queue = new MpscLinkedAtomicQueue<>();
break;
case PARAM_MpscUnboundedArrayQueue:
queue = new MpscUnboundedArrayQueue<>(1024);
break;
case PARAM_MpscUnboundedAtomicArrayQueue:
queue = new MpscUnboundedAtomicArrayQueue<>(1024);
break;
case PARAM_MpscAtomicArrayQueue:
queue = new MpscAtomicArrayQueue<>(1024);
break;
case PARAM_MpscArrayQueue:
queue = new MpscArrayQueue<>(1024);
break;
default:
throw new UnsupportedOperationException("Unsupported implementation " + implementation);
}
Expand Down

0 comments on commit 72c841f

Please sign in to comment.