Skip to content

Commit

Permalink
add websocket server
Browse files Browse the repository at this point in the history
  • Loading branch information
leelance committed Jun 21, 2017
1 parent 19bec60 commit 9ad8ef9
Show file tree
Hide file tree
Showing 12 changed files with 625 additions and 0 deletions.
16 changes: 16 additions & 0 deletions spring-boot-websocket-netty-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,25 @@
<artifactId>netty-all</artifactId>
<version>4.1.12.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.31</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
</dependencies>

<build>
<finalName>spring-boot-websocket-netty-server</finalName>

<plugins>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.lance.net.server;

import java.net.InetSocketAddress;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.lance.net.server.common.ChatServer;

import io.netty.channel.ChannelFuture;

@SpringBootApplication
public class SimplePushApplication implements CommandLineRunner{
@Autowired
private ChatServer chatServer;

public static void main(String[] args) {
SpringApplication.run(SimplePushApplication.class, args);
}

@Bean
public ChatServer chatServer() {
return new ChatServer();
}

@Override
public void run(String... args) throws Exception {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
ChannelFuture future = chatServer.start(address);

Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
chatServer.destroy();
}
});

future.channel().closeFuture().syncUninterruptibly();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.lance.net.server.common;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;

import com.lance.net.server.module.UserInfo;

import io.netty.util.AttributeKey;

public class ChatConstants {
public static final AttributeKey<String> CHANNEL_TOKEN_KEY = AttributeKey.valueOf("netty.channel.token");
/**用来保存当前在线人员*/
public static Map<String, UserInfo> onlines = new ConcurrentHashMap<>();

public static void addOnlines(String sessionId, UserInfo val) {
onlines.putIfAbsent(sessionId, val);
}

public static void removeOnlines(String sessionId) {
if(StringUtils.isNotBlank(sessionId) && onlines.containsKey(sessionId)){
onlines.remove(sessionId);
}
}

private static char[]prefix = {'A','B','C','D','E','F','G','H','J','K','L','M','N','P','Q','R','S','T','U','V','W','X','Y'};
private static int[]imgPrefix = {1,2,3,4,5,6,7,8,9,10,11};

public static String headImg() {
int index = RandomUtils.nextInt(0, imgPrefix.length);
return "/resources/img/head/"+imgPrefix[index]+".jpg";
}

public static String code() {
int index = RandomUtils.nextInt(0, prefix.length);
char prf = prefix[index];
String len = (onlines.size()+1)+"";
if(len.length() < 4) {
len = StringUtils.leftPad(len, 4, '0');
}
return prf+len;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.lance.net.server.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

public class ChatHeartbeatHandler extends ChannelInboundHandlerAdapter{
private Logger logger = LogManager.getLogger();
private final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HB",CharsetUtil.UTF_8));

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
logger.info("====>Heartbeat: greater than {}", 180);
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}else {
super.userEventTriggered(ctx, evt);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.lance.net.server.common;

import java.net.InetSocketAddress;

import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;

@Component
public class ChatServer {
private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workGroup = new NioEventLoopGroup();
private Channel channel;

public ChannelFuture start(InetSocketAddress address) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChatServerInitializer(channelGroup))
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = bootstrap.bind(address).syncUninterruptibly();
channel = future.channel();
return future;
}

public void destroy() {
if(channel != null) {
channel.close();
}

channelGroup.close();
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}

public static void main(String[] args) {
ChatServer server = new ChatServer();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
ChannelFuture future = server.start(address);

Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
server.destroy();
}
});

future.channel().closeFuture().syncUninterruptibly();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.lance.net.server.common;

import java.util.concurrent.TimeUnit;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

public class ChatServerInitializer extends ChannelInitializer<Channel>{
private final ChannelGroup group;

public ChatServerInitializer(ChannelGroup group) {
this.group = group;
}

@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//处理日志
pipeline.addLast(new LoggingHandler(LogLevel.INFO));

//处理心跳
pipeline.addLast(new IdleStateHandler(0, 0, 1800, TimeUnit.SECONDS));
pipeline.addLast(new ChatHeartbeatHandler());

pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler(group));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.lance.net.server.common;

import java.io.RandomAccessFile;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.lance.net.server.module.UserInfo;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private Logger loger = LogManager.getLogger();
private final String webUri;
private final String INDEX = "E:\\oworkspace\\test\\src\\main\\webapp\\index.html";

public HttpRequestHandler(String webUri) {
this.webUri = webUri;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
loger.info("===========> {}, {}", webUri, request.uri());

String uri = StringUtils.substringBefore(request.uri(), "?");
if(webUri.equalsIgnoreCase(uri)) {//获取webSocket参数
QueryStringDecoder query = new QueryStringDecoder(request.uri());
Map<String, List<String>> map = query.parameters();
List<String> tokens = map.get("token");

//根据参数保存当前登录对象, 并把该token加入到channel中
if(tokens != null && !tokens.isEmpty()) {
String token = tokens.get(0);
ChatConstants.addOnlines(token, new UserInfo(token));
ctx.channel().attr(ChatConstants.CHANNEL_TOKEN_KEY).getAndSet(token);
}

request.setUri(uri);
ctx.fireChannelRead(request.retain());
}else {
if(HttpUtil.is100ContinueExpected(request)) {
send100ContinueExpected(ctx);
}

RandomAccessFile file = new RandomAccessFile(INDEX, "r");
HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

boolean keepAlive = HttpUtil.isKeepAlive(request);
if(keepAlive) {
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.write(response);

if(ctx.pipeline().get(SslHandler.class) == null) {
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
}else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}

ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if(!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}

file.close();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

private void send100ContinueExpected(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONFLICT);
ctx.writeAndFlush(response);
}
}
Loading

0 comments on commit 9ad8ef9

Please sign in to comment.