Skip to content

Commit

Permalink
Merge pull request lets-blade#309 from ydq/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
hellokaton authored Dec 10, 2018
2 parents f3d0c02 + a40e146 commit a2e2116
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 50 deletions.
14 changes: 2 additions & 12 deletions src/main/java/com/blade/server/netty/HttpServerInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerExpectContinueHandler;
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
import io.netty.handler.codec.http.cors.CorsHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.ssl.SslContext;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -35,19 +32,13 @@ public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

public static volatile String date = DateKit.gmtDate(LocalDateTime.now());

private static WebSocketHandler WEB_SOCKET_HANDLER;

public HttpServerInitializer(SslContext sslCtx, Blade blade, ScheduledExecutorService service) {
this.sslCtx = sslCtx;
this.blade = blade;
this.useGZIP = blade.environment().getBoolean(Const.ENV_KEY_GZIP_ENABLE, false);
this.isWebSocket = StringKit.isNotEmpty(blade.webSocketPath());

if (isWebSocket) {
WEB_SOCKET_HANDLER = new WebSocketHandler(blade);
}

httpServerHandler = new HttpServerHandler();
this.httpServerHandler = new HttpServerHandler();

service.scheduleWithFixedDelay(() -> date = DateKit.gmtDate(LocalDateTime.now()), 1000, 1000, TimeUnit.MILLISECONDS);
}
Expand All @@ -68,8 +59,7 @@ protected void initChannel(SocketChannel ch) {
}

if (isWebSocket) {
pipeline.addLast(new WebSocketServerProtocolHandler(blade.webSocketPath(), null, true));
pipeline.addLast(WEB_SOCKET_HANDLER);
pipeline.addLast(new WebSocketHandler(blade));
}
pipeline.addLast(new MergeRequestHandler());
pipeline.addLast(httpServerHandler);
Expand Down
99 changes: 64 additions & 35 deletions src/main/java/com/blade/server/netty/WebSocketHandler.java
Original file line number Diff line number Diff line change
@@ -1,66 +1,95 @@
package com.blade.server.netty;

import com.blade.Blade;
import com.blade.mvc.handler.ExceptionHandler;
import com.blade.mvc.websocket.WebSocketContext;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;

/**
* Http Server Handler
*
* @author biezhi
* 2017/5/31
* @author biezhi,darren
* 2017/5/31,
*/
@Slf4j
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {

private WebSocketServerHandshaker handshaker;
private Blade blade;

private com.blade.mvc.handler.WebSocketHandler webSocketHandler;
private ExceptionHandler exceptionHandler;

WebSocketHandler(Blade blade) {
this.webSocketHandler = blade.webSocketHandler();
public WebSocketHandler(Blade blade) {
this.blade = blade;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
WebSocketContext webSocketContext = new WebSocketContext(ctx);
webSocketHandler.onConnect(webSocketContext);
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
handleHttpRequest(ctx, (HttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
} else {
ctx.fireChannelRead(msg);
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
WebSocketContext webSocketContext = new WebSocketContext(ctx);
webSocketHandler.onDisConnect(webSocketContext);
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
WebSocketContext webSocketContext = new WebSocketContext(ctx);
if (frame instanceof TextWebSocketFrame) {
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).text();
webSocketContext.setReqText(request);
webSocketHandler.onText(webSocketContext);
private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
if (isWebSocketPath(req)) {
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(req.uri(), null, true);
this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) {
//Return that we need cannot not support the web socket version
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
this.handshaker.handshake(ctx.channel(), req);
this.blade.webSocketHandler().onConnect(new WebSocketContext(ctx));
}
} else {
String message = "unsupported frame type: " + frame.getClass().getName();
throw new UnsupportedOperationException(message);
ctx.fireChannelRead(req);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (null != exceptionHandler) {
exceptionHandler.handle((Exception) cause);
} else {
log.error("Blade Invoke Error", cause);
/**
* Only supported TextWebSocketFrame
*
* @param ctx
* @param frame
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
this.blade.webSocketHandler().onDisConnect(new WebSocketContext(ctx));
this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
ctx.close();
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}

if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("unsupported frame type: " + frame.getClass().getName());
}
WebSocketContext wsCtx = new WebSocketContext(ctx);
wsCtx.setReqText(((TextWebSocketFrame) frame).text());
this.blade.webSocketHandler().onText(wsCtx);
}


private boolean isWebSocketPath(HttpRequest req){
return req != null
&& Objects.equals(req.uri(),blade.webSocketPath())
&& req.decoderResult().isSuccess()
&& "websocket".equals(req.headers().get("Upgrade"));
}

}
6 changes: 3 additions & 3 deletions src/test/java/netty_hello/WebSocketDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
public class WebSocketDemo {

public static void main(String[] args) {
Blade.me()
.get("/hello", ctx -> {})
.webSocket("/webscoket", new WebSocketHandler() {
Blade.of()
.get("/hello", ctx -> ctx.text("get route"))
.webSocket("/websocket", new WebSocketHandler() {
@Override
public void onConnect(WebSocketContext ctx) {
System.out.println("客户端连接上了: " + ctx.getSession());
Expand Down

0 comments on commit a2e2116

Please sign in to comment.