Skip to content

Commit

Permalink
add crc32 check
Browse files Browse the repository at this point in the history
  • Loading branch information
jinmiao committed Jun 21, 2019
1 parent c28b2f4 commit 8ed51bb
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 19 deletions.
13 changes: 12 additions & 1 deletion kcp-base/src/main/java/kcp/ChannelConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class ChannelConfig {
private int rcvwnd = Kcp.IKCP_WND_RCV;
private int mtu = Kcp.IKCP_MTU_DEF;
private int minRto = Kcp.IKCP_RTO_MIN;
//TODO 有bug待修复
private long timeout;
//TODO 有bug还未测试
private boolean stream;
Expand All @@ -22,15 +23,17 @@ public class ChannelConfig {
private int fecParityShardCount;

private boolean ackNoDelay = false;

private boolean fastFlush = true;
//crc32校验
private boolean crc32Check = false;


public boolean isNodelay() {
return nodelay;
}



public void setNodelay(boolean nodelay) {
this.nodelay = nodelay;
}
Expand Down Expand Up @@ -139,4 +142,12 @@ public boolean isFastFlush() {
public void setFastFlush(boolean fastFlush) {
this.fastFlush = fastFlush;
}

public boolean isCrc32Check() {
return crc32Check;
}

public void setCrc32Check(boolean crc32Check) {
this.crc32Check = crc32Check;
}
}
31 changes: 31 additions & 0 deletions kcp-base/src/main/java/kcp/Crc32OutPut.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package kcp;

import io.netty.buffer.ByteBuf;

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/**
* crc32校验
* Created by JinMiao
* 2019-06-21.
*/
public class Crc32OutPut implements KcpOutput {
private KcpOutput output;
private CRC32 crc32 = new CRC32();
private int headerOffset;

protected Crc32OutPut(KcpOutput output,int headerOffset) {
this.output = output;
this.headerOffset = headerOffset;
}
@Override
public void out(ByteBuf data, Kcp kcp) {
ByteBuffer byteBuffer = data.nioBuffer(headerOffset+Ukcp.HEADER_CRC,data.readableBytes()-headerOffset-Ukcp.HEADER_CRC);
crc32.reset();
crc32.update(byteBuffer);
long checksum = crc32.getValue();
data.setInt(headerOffset, (int) checksum);
output.out(data,kcp);
}
}
1 change: 1 addition & 0 deletions kcp-base/src/main/java/kcp/FecOutPut.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.netty.buffer.ByteBuf;

/**
* fec
* Created by JinMiao
* 2018/7/27.
*/
Expand Down
4 changes: 4 additions & 0 deletions kcp-base/src/main/java/kcp/Kcp.java
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,10 @@ public void setReserved(int reserved) {
this.reserved = reserved;
}

public int getReserved() {
return reserved;
}

public int getSndWnd() {
return sndWnd;
}
Expand Down
13 changes: 8 additions & 5 deletions kcp-base/src/main/java/kcp/ServerChannelHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
//System.out.println("新连接"+Thread.currentThread().getName());
IMessageExecutor disruptorSingleExecutor = disruptorExecutorPool.getAutoDisruptorProcessor();
KcpOutput kcpOutput = new KcpOutPutImp();
Ukcp newUkcp = new Ukcp(10,kcpOutput,kcpListener,disruptorSingleExecutor,ctx.executor());

ReedSolomon reedSolomon = null;
if(channelConfig.getFecDataShardCount()!=0&&channelConfig.getFecParityShardCount()!=0){
reedSolomon = ReedSolomon.create(channelConfig.getFecDataShardCount(),channelConfig.getFecParityShardCount());
}

Ukcp newUkcp = new Ukcp(10,kcpOutput,kcpListener,disruptorSingleExecutor,ctx.executor(),channelConfig.isCrc32Check(),reedSolomon);

newUkcp.setNodelay(channelConfig.isNodelay());
newUkcp.setInterval(channelConfig.getInterval());
Expand All @@ -71,10 +77,7 @@ protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
newUkcp.setAckNoDelay(channelConfig.isAckNoDelay());
newUkcp.setFastFlush(channelConfig.isFastFlush());
newUkcp.user(user);
if(channelConfig.getFecDataShardCount()!=0&&channelConfig.getFecParityShardCount()!=0){
ReedSolomon reedSolomon = ReedSolomon.create(channelConfig.getFecDataShardCount(),channelConfig.getFecParityShardCount());
newUkcp.initFec(reedSolomon);
}

disruptorSingleExecutor.execute(() -> newUkcp.getKcpListener().onConnected(newUkcp));
clientMap.put(socketAddress,newUkcp);
newUkcp.read(msg.content());
Expand Down
42 changes: 37 additions & 5 deletions kcp-base/src/main/java/kcp/Ukcp.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import threadPool.thread.IMessageExecutor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
import java.util.zip.CRC32;

/**
* Wrapper for kcp
Expand All @@ -24,6 +26,8 @@ public class Ukcp{

private static final InternalLogger log = InternalLoggerFactory.getInstance(Ukcp.class);

public static final int HEADER_CRC=4,HEADER_NONCESIZE= 16;

private Kcp kcp;

private boolean fastFlush = true;
Expand All @@ -45,6 +49,8 @@ public class Ukcp{

private EventExecutor eventExecutors;

private boolean crc32Check = false;

/**
* 上次收到消息时间
**/
Expand All @@ -55,14 +61,16 @@ public class Ukcp{
**/
private long closeTime = 0;

private CRC32 crc32 = new CRC32();


/**
* Creates a new instance.
*
* @param conv conv of kcp
* @param output output for kcp
*/
public Ukcp(int conv, KcpOutput output, KcpListener kcpListener, IMessageExecutor disruptorSingleExecutor, EventExecutor eventExecutors) {
public Ukcp(int conv, KcpOutput output, KcpListener kcpListener, IMessageExecutor disruptorSingleExecutor, EventExecutor eventExecutors,boolean crc32Check,ReedSolomon reedSolomon) {
Kcp kcp = new Kcp(conv, output);
this.kcp = kcp;
this.active = true;
Expand All @@ -72,21 +80,34 @@ public Ukcp(int conv, KcpOutput output, KcpListener kcpListener, IMessageExecuto
sendList = new MpscArrayQueue<>(2 << 16);
recieveList = new SpscLinkedQueue<>();
this.eventExecutors = eventExecutors;
}

int headerSize = 0;
//init encryption


//init crc32
if(crc32Check){
this.crc32Check = true;
KcpOutput kcpOutput = kcp.getOutput();
kcpOutput = new Crc32OutPut(kcpOutput,headerSize);
kcp.setOutput(kcpOutput);
headerSize+=HEADER_CRC;
}

public void initFec(ReedSolomon reedSolomon) {
//init fec
if (reedSolomon != null) {
KcpOutput kcpOutput = kcp.getOutput();
fecEncode = new FecEncode(0, reedSolomon);
fecEncode = new FecEncode(headerSize, reedSolomon);
fecDecode = new FecDecode(3 * reedSolomon.getTotalShardCount(), reedSolomon);
kcpOutput = new FecOutPut(kcpOutput, fecEncode);
kcp.setOutput(kcpOutput);
kcp.setReserved(Fec.fecHeaderSizePlus2);
headerSize+= Fec.fecHeaderSizePlus2;
}

kcp.setReserved(headerSize);
}


/**
* Receives ByteBufs.
*
Expand All @@ -101,6 +122,16 @@ public void input(ByteBuf data,long current) throws IOException {
Snmp.snmp.InPkts.incrementAndGet();
Snmp.snmp.InBytes.addAndGet(data.readableBytes());

if(crc32Check){
long checksum = data.readUnsignedInt();
ByteBuffer byteBuffer = data.nioBuffer(data.readerIndex(),data.readableBytes());
crc32.reset();
crc32.update(byteBuffer);
if(checksum!=crc32.getValue()){
Snmp.snmp.getInCsumErrors().incrementAndGet();
return;
}
}
if (fecDecode != null) {
FecPacket fecPacket = FecPacket.newFecPacket(data);
if (fecPacket.getFlag() == Fec.typeData) {
Expand Down Expand Up @@ -535,6 +566,7 @@ void release() {
}
}


public long getCloseTime() {
return closeTime;
}
Expand Down
61 changes: 59 additions & 2 deletions kcp-base/src/test/java/TestCrc32.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,75 @@
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/**
* Created by JinMiao
* 2019-04-16.
*/
public class TestCrc32 {
@Test

public static void main(String[] args) throws InterruptedException {

CRC32 crc32 = new CRC32();
byte[] bytes = new byte[1024];
crc32.update(bytes);
System.out.println(crc32.getValue());
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
long now =System.currentTimeMillis();
int i = 0;
byteBuf.writeBytes(new byte[1024]);

ByteBuffer byteBuf1 = byteBuf.nioBuffer();
while (true){
i++;
if(i%10000000==0){
System.out.println(System.currentTimeMillis()-now);
now =System.currentTimeMillis();
}
//crc32.reset();
//crc32.update(bytes);
crc32.reset();
crc32.update(byteBuf1);
}
//System.out.println(crc32.getValue());

//new Thread(() -> {
// ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(20480);
// //byteBuf.writeInt(5);
// byteBuf.writeBytes(new byte[20480]);
// while (true){
// ByteBuffer byteBuf1 = byteBuf.nioBuffer();
// //try {
// // Thread.sleep(100);
// //} catch (InterruptedException e) {
// // e.printStackTrace();
// //}
// }
//}).start();


//Thread.sleep(50000000);
}
//@Test
public void crc32(){
CRC32 crc32 = new CRC32();
byte[] bytes = new byte[1024];
crc32.update(bytes);
System.out.println(crc32.getValue());

//byteBuf1.putInt(0,3);

//((DirectBuffer) byteBuf1).cleaner().clean();
//System.out.println(byteBuf1.getInt(0));
//System.out.println(byteBuf.getInt(0));

//System.out.println();




//for (int i = 0; i < 20000000; i++) {
// crc32.update("abcdfg".getBytes());
// crc32.getValue();
Expand Down
13 changes: 8 additions & 5 deletions kcp-netty/src/main/java/KcpClientRttExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,22 @@ public void init(){
disruptorSingleExecutor.start();
nioEventLoopGroup = new NioEventLoopGroup(1);
EventLoop eventExecutors = nioEventLoopGroup.next();
ukcp = new Ukcp(10,this,this,disruptorSingleExecutor,eventExecutors);


ReedSolomon reedSolomon = ReedSolomon.create(10,3);


ukcp = new Ukcp(10,this,this,disruptorSingleExecutor,eventExecutors,true,reedSolomon);
ukcp.setMtu(1400);
ukcp.setNocwnd(true);
ukcp.setNodelay(true);
ukcp.setRcvWnd(512);
ukcp.setSndWnd(512);
ukcp.setInterval(40);
ukcp.setFastResend(2);
//ukcp.setCloseTime(5000);
//ukcp.setAckNoDelay(true);

ReedSolomon reedSolomon = ReedSolomon.create(10,3);
ukcp.initFec(reedSolomon);

Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioDatagramChannel.class);
bootstrap.group(nioEventLoopGroup);
Expand Down Expand Up @@ -101,7 +104,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

//remote = new InetSocketAddress("10.60.100.191",10002);

remote = new InetSocketAddress("127.0.0.1",10002);
remote = new InetSocketAddress("127.0.0.1",10003);

future = scheduleSrv.scheduleWithFixedDelay(() -> {
ukcp.write(rttMsg(++count));
Expand Down
4 changes: 3 additions & 1 deletion kcp-netty/src/main/java/KcpServerRttExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public void init(){
channelConfig.setAckNoDelay(false);
channelConfig.setInterval(40);
channelConfig.setNocwnd(true);
KcpServer abstractKcpServer = new KcpServer(2,this,channelConfig,10002);
channelConfig.setCrc32Check(true);
//channelConfig.setTimeout(10000);
KcpServer abstractKcpServer = new KcpServer(2,this,channelConfig,10003);
}

@Override
Expand Down

0 comments on commit 8ed51bb

Please sign in to comment.