Skip to content

Commit

Permalink
Merge pull request lets-blade#321 from ydq/dev
Browse files Browse the repository at this point in the history
websocket bug修复和调整
  • Loading branch information
hellokaton authored Dec 29, 2018
2 parents d712f94 + f90bee1 commit 51434e2
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.blade.mvc.annotation.OnMessage;
import com.blade.mvc.annotation.OnOpen;
import com.blade.mvc.websocket.WebSocketContext;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.extern.slf4j.Slf4j;

import java.lang.annotation.Annotation;
Expand All @@ -27,7 +28,7 @@ public final class WebSocketHandlerWrapper implements WebSocketHandler {

private final Map<String,Class<?>> handlers = new HashMap<>(4);
private final Map<String, Map<Class<? extends Annotation>, Method>> methodCache = new HashMap<>(4);
private final ThreadLocal<String> path = ThreadLocal.withInitial(() -> null);
private final FastThreadLocal<String> path = new FastThreadLocal<>();
private final Blade blade;

public static WebSocketHandlerWrapper init(Blade blade) {
Expand Down
40 changes: 30 additions & 10 deletions src/main/java/com/blade/mvc/websocket/WebSocketContext.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,47 @@
package com.blade.mvc.websocket;

import io.netty.channel.ChannelHandlerContext;
import com.blade.mvc.handler.WebSocketHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.Data;
import lombok.Getter;
import lombok.experimental.Accessors;

/**
* @author biezhi
* @author biezhi,darren
* @date 2017/10/30
*/
@Data
@Accessors(fluent = true)
public class WebSocketContext {

private ChannelHandlerContext ctx;
@Getter
private WebSocketSession session;
private String reqText;
@Getter
private String message;
private WebSocketHandler handler;

public WebSocketContext(ChannelHandlerContext ctx) {
this.ctx = ctx;
this.session = new WebSocketSession(ctx.channel());
public WebSocketContext(WebSocketSession session,WebSocketHandler handler) {
this.session = session;
this.handler = handler;
}
public WebSocketContext(WebSocketSession session,WebSocketHandler handler,String message) {
this(session,handler);
this.message = message;
}

/**
* post a message
* @param value
*/
public void message(String value) {
ctx.writeAndFlush(new TextWebSocketFrame(value));
session.handlerContext().writeAndFlush(new TextWebSocketFrame(value));
}

/**
* Allows the user to disconnect the websocket
*/
public void disconnect(){
session.handlerContext().disconnect().addListener(ChannelFutureListener.CLOSE);
handler.onDisConnect(this);
}

}
16 changes: 9 additions & 7 deletions src/main/java/com/blade/mvc/websocket/WebSocketSession.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package com.blade.mvc.websocket;

import com.blade.kit.UUID;
import io.netty.channel.Channel;
import lombok.Data;
import io.netty.channel.ChannelHandlerContext;
import lombok.Getter;
import lombok.experimental.Accessors;

/**
* @author biezhi
* @author biezhi,darren
* @date 2017/10/30
*/
@Data
@Getter
@Accessors(fluent = true)
public class WebSocketSession {

private Channel channel;
private ChannelHandlerContext handlerContext;
private String uuid;

public WebSocketSession(Channel channel) {
this.channel = channel;
public WebSocketSession(ChannelHandlerContext handlerContext) {
this.handlerContext = handlerContext;
this.uuid = UUID.UU32();
}
}
18 changes: 12 additions & 6 deletions src/main/java/com/blade/server/netty/WebSocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import com.blade.Blade;
import com.blade.mvc.handler.WebSocketHandlerWrapper;
import com.blade.mvc.websocket.WebSocketContext;
import com.blade.mvc.websocket.WebSocketSession;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;

/**
* Http Server Handler
*
Expand All @@ -20,7 +23,7 @@
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {

private WebSocketServerHandshaker handshaker;
private WebSocketContext context;
private WebSocketSession session;
private com.blade.mvc.handler.WebSocketHandler handler;
private String uri;
private Blade blade;
Expand Down Expand Up @@ -57,10 +60,12 @@ private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
this.handshaker.handshake(ctx.channel(), req);
this.context = new WebSocketContext(ctx);
this.session = new WebSocketSession(ctx);
this.uri = req.uri();
initHandlerWrapper();
this.handler.onConnect(this.context);
//Allows the user to send messages in the event of onConnect
CompletableFuture.completedFuture(new WebSocketContext(this.session,this.handler))
.thenAcceptAsync(this.handler::onConnect,ctx.executor());
}
} else {
ReferenceCountUtil.retain(req);
Expand All @@ -76,8 +81,9 @@ private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
this.handler.onDisConnect(this.context);
this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
CompletableFuture.completedFuture(new WebSocketContext(this.session,this.handler))
.thenAcceptAsync(this.handler::onDisConnect);
return;
}
if (frame instanceof PingWebSocketFrame) {
Expand All @@ -88,8 +94,8 @@ private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame fram
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("unsupported frame type: " + frame.getClass().getName());
}
this.context.setReqText(((TextWebSocketFrame) frame).text());
this.handler.onText(this.context);
CompletableFuture.completedFuture(new WebSocketContext(this.session,this.handler,((TextWebSocketFrame) frame).text()))
.thenAcceptAsync(this.handler::onText,ctx.executor());
}


Expand Down
4 changes: 2 additions & 2 deletions src/test/java/netty_hello/BaseWebSocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ public abstract class BaseWebSocketHandler {

@OnOpen
public void OnOpen(WebSocketContext ctx) {
System.out.println("ws from annotation @OnOpen:" + ctx.getSession().getUuid());
System.out.println("ws from annotation @OnOpen:" + ctx.session().uuid());
}

@OnClose
public void OnClose(WebSocketContext ctx) {
System.out.println("ws from annotation @OnClose:" + ctx.getSession().getUuid() + " disconnect");
System.out.println("ws from annotation @OnClose:" + ctx.session().uuid() + " disconnect");
}
}
6 changes: 3 additions & 3 deletions src/test/java/netty_hello/CustomWebSocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ public class CustomWebSocketHandler implements WebSocketHandler {
@Override
public void onConnect(WebSocketContext ctx) {
cService.sayHello();
System.out.println("ws from implements interface:onConnect:"+ctx.getSession().getUuid());
System.out.println("ws from implements interface:onConnect:"+ctx.session().uuid());
}

@Override
public void onText(WebSocketContext ctx) {
System.out.println("ws from implements interface:onText:"+ctx.getSession().getUuid() + " said:" + ctx.getReqText());
System.out.println("ws from implements interface:onText:"+ctx.session().uuid() + " said:" + ctx.message());
}

@Override
public void onDisConnect(WebSocketContext ctx) {
System.out.println("ws from implements interface:onDisConnect:"+ctx.getSession().getUuid() + " disconnect");
System.out.println("ws from implements interface:onDisConnect:"+ctx.session().uuid() + " disconnect");
}
}
2 changes: 1 addition & 1 deletion src/test/java/netty_hello/CustomWebSocketHandlerAnno.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ public class CustomWebSocketHandlerAnno extends BaseWebSocketHandler {

@OnMessage
public void OnMessage(WebSocketContext ctx) {
System.out.println("ws from annotation @OnMessage:" + ctx.getSession().getUuid() + " said:" + ctx.getReqText());
System.out.println("ws from annotation @OnMessage:" + ctx.session().uuid() + " said:" + ctx.message());
}
}
15 changes: 11 additions & 4 deletions src/test/java/netty_hello/WebSocketDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.blade.mvc.handler.WebSocketHandler;
import com.blade.mvc.websocket.WebSocketContext;

import java.util.concurrent.TimeUnit;

/**
* @author biezhi
* @date 2017/10/30
Expand All @@ -17,18 +19,23 @@ public static void main(String[] args) {
.webSocket("/websocket", new WebSocketHandler() {
@Override
public void onConnect(WebSocketContext ctx) {
System.out.println("客户端连接上了ws1: " + ctx.getSession());
System.out.println(ctx.session().uuid()+":open");
ctx.message(ctx.session().uuid()+":open");
}

@Override
public void onText(WebSocketContext ctx) {
System.out.println("ws1收到:" + ctx.getReqText());
ctx.message("发送: Hello");
if("close".equals(ctx.message())){
ctx.disconnect();
} else {
System.out.println(ctx.session().uuid()+":"+ctx.message());
ctx.message(ctx.message());
}
}

@Override
public void onDisConnect(WebSocketContext ctx) {
System.out.println("ws1客户端关闭链接: " + ctx.getSession());
System.out.println(ctx.session().uuid()+":close:" + ctx.session());
}
}).start(WebSocketDemo.class);
}
Expand Down

0 comments on commit 51434e2

Please sign in to comment.