Skip to content

Commit c093c37

Browse files
committed
update gateway
1 parent 45a5eea commit c093c37

File tree

6 files changed

+272
-0
lines changed

6 files changed

+272
-0
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.github.kimmking.gateway.filter;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.handler.codec.http.FullHttpRequest;
5+
6+
public interface HttpRequestFilter {
7+
8+
void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx);
9+
10+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package io.github.kimmking.gateway.outbound.httpclient4;
2+
3+
4+
import io.netty.buffer.Unpooled;
5+
import io.netty.channel.ChannelFutureListener;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.handler.codec.http.DefaultFullHttpResponse;
8+
import io.netty.handler.codec.http.FullHttpRequest;
9+
import io.netty.handler.codec.http.FullHttpResponse;
10+
import io.netty.handler.codec.http.HttpUtil;
11+
import org.apache.http.HttpResponse;
12+
import org.apache.http.client.methods.HttpGet;
13+
import org.apache.http.concurrent.FutureCallback;
14+
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
15+
import org.apache.http.impl.nio.client.HttpAsyncClients;
16+
import org.apache.http.impl.nio.reactor.IOReactorConfig;
17+
import org.apache.http.protocol.HTTP;
18+
import org.apache.http.util.EntityUtils;
19+
20+
import java.util.concurrent.*;
21+
22+
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
23+
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
24+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
25+
26+
public class HttpOutboundHandler {
27+
28+
private CloseableHttpAsyncClient httpclient;
29+
private ExecutorService proxyService;
30+
private String backendUrl;
31+
32+
public HttpOutboundHandler(String backendUrl){
33+
this.backendUrl = backendUrl.endsWith("/")?backendUrl.substring(0,backendUrl.length()-1):backendUrl;
34+
int cores = Runtime.getRuntime().availableProcessors() * 2;
35+
long keepAliveTime = 1000;
36+
int queueSize = 2048;
37+
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();//.DiscardPolicy();
38+
proxyService = new ThreadPoolExecutor(cores, cores,
39+
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
40+
new NamedThreadFactory("proxyService"), handler);
41+
42+
IOReactorConfig ioConfig = IOReactorConfig.custom()
43+
.setConnectTimeout(1000)
44+
.setSoTimeout(1000)
45+
.setIoThreadCount(cores)
46+
.setRcvBufSize(32 * 1024)
47+
.build();
48+
49+
httpclient = HttpAsyncClients.custom().setMaxConnTotal(40)
50+
.setMaxConnPerRoute(8)
51+
.setDefaultIOReactorConfig(ioConfig)
52+
.setKeepAliveStrategy((response,context) -> 6000)
53+
.build();
54+
httpclient.start();
55+
}
56+
57+
public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) {
58+
final String url = this.backendUrl + fullRequest.uri();
59+
proxyService.submit(()->fetchGet(fullRequest, ctx, url));
60+
}
61+
62+
private void fetchGet(final FullHttpRequest inbound, final ChannelHandlerContext ctx, final String url) {
63+
final HttpGet httpGet = new HttpGet(url);
64+
//httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
65+
httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
66+
httpclient.execute(httpGet, new FutureCallback<HttpResponse>() {
67+
@Override
68+
public void completed(final HttpResponse endpointResponse) {
69+
try {
70+
handleResponse(inbound, ctx, endpointResponse);
71+
} catch (Exception e) {
72+
e.printStackTrace();
73+
} finally {
74+
75+
}
76+
}
77+
78+
@Override
79+
public void failed(final Exception ex) {
80+
httpGet.abort();
81+
ex.printStackTrace();
82+
}
83+
84+
@Override
85+
public void cancelled() {
86+
httpGet.abort();
87+
}
88+
});
89+
}
90+
91+
private void handleResponse(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, final HttpResponse endpointResponse) throws Exception {
92+
FullHttpResponse response = null;
93+
try {
94+
// String value = "hello,kimmking";
95+
// response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));
96+
// response.headers().set("Content-Type", "application/json");
97+
// response.headers().setInt("Content-Length", response.content().readableBytes());
98+
99+
100+
byte[] body = EntityUtils.toByteArray(endpointResponse.getEntity());
101+
// System.out.println(new String(body));
102+
// System.out.println(body.length);
103+
104+
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(body));
105+
response.headers().set("Content-Type", "application/json");
106+
response.headers().setInt("Content-Length", Integer.parseInt(endpointResponse.getFirstHeader("Content-Length").getValue()));
107+
108+
// for (Header e : endpointResponse.getAllHeaders()) {
109+
// //response.headers().set(e.getName(),e.getValue());
110+
// System.out.println(e.getName() + " => " + e.getValue());
111+
// }
112+
113+
} catch (Exception e) {
114+
e.printStackTrace();
115+
response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
116+
exceptionCaught(ctx, e);
117+
} finally {
118+
if (fullRequest != null) {
119+
if (!HttpUtil.isKeepAlive(fullRequest)) {
120+
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
121+
} else {
122+
//response.headers().set(CONNECTION, KEEP_ALIVE);
123+
ctx.write(response);
124+
}
125+
}
126+
ctx.flush();
127+
//ctx.close();
128+
}
129+
130+
}
131+
132+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
133+
cause.printStackTrace();
134+
ctx.close();
135+
}
136+
137+
138+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.github.kimmking.gateway.outbound.httpclient4;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class NamedThreadFactory implements ThreadFactory {
7+
8+
private final ThreadGroup group;
9+
private final AtomicInteger threadNumber = new AtomicInteger(1);
10+
11+
private final String namePrefix;
12+
private final boolean daemon;
13+
14+
public NamedThreadFactory(String namePrefix, boolean daemon) {
15+
this.daemon = daemon;
16+
SecurityManager s = System.getSecurityManager();
17+
group = (s != null) ? s.getThreadGroup() :
18+
Thread.currentThread().getThreadGroup();
19+
this.namePrefix = namePrefix;
20+
}
21+
22+
public NamedThreadFactory(String namePrefix) {
23+
this(namePrefix, false);
24+
}
25+
26+
@Override
27+
public Thread newThread(Runnable r) {
28+
Thread t = new Thread(group, r, namePrefix + "-thread-" + threadNumber.getAndIncrement(), 0);
29+
t.setDaemon(daemon);
30+
return t;
31+
}
32+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
//package io.github.kimmking.gateway.outbound;
2+
//
3+
//import io.netty.bootstrap.Bootstrap;
4+
//import io.netty.channel.ChannelFuture;
5+
//import io.netty.channel.ChannelInitializer;
6+
//import io.netty.channel.ChannelOption;
7+
//import io.netty.channel.EventLoopGroup;
8+
//import io.netty.channel.nio.NioEventLoopGroup;
9+
//import io.netty.channel.socket.SocketChannel;
10+
//import io.netty.channel.socket.nio.NioSocketChannel;
11+
//import io.netty.handler.codec.http.HttpRequestEncoder;
12+
//import io.netty.handler.codec.http.HttpResponseDecoder;
13+
//
14+
//public class NettyHttpClient {
15+
// public void connect(String host, int port) throws Exception {
16+
// EventLoopGroup workerGroup = new NioEventLoopGroup();
17+
//
18+
// try {
19+
// Bootstrap b = new Bootstrap();
20+
// b.group(workerGroup);
21+
// b.channel(NioSocketChannel.class);
22+
// b.option(ChannelOption.SO_KEEPALIVE, true);
23+
// b.handler(new ChannelInitializer<SocketChannel>() {
24+
// @Override
25+
// public void initChannel(SocketChannel ch) throws Exception {
26+
// // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
27+
// ch.pipeline().addLast(new HttpResponseDecoder());
28+
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
29+
// ch.pipeline().addLast(new HttpRequestEncoder());
30+
// ch.pipeline().addLast(new HttpClientOutboundHandler());
31+
// }
32+
// });
33+
//
34+
// // Start the client.
35+
// ChannelFuture f = b.connect(host, port).sync();
36+
//
37+
// // for test
38+
// URI uri = new URI("http://127.0.0.1:8844");
39+
// String msg = "Are you ok?";
40+
// DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
41+
// uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
42+
//
43+
// // 构建http请求
44+
// request.headers().set(HttpHeaders.Names.HOST, host);
45+
// request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
46+
// request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
47+
// // 发送http请求
48+
// f.channel().write(request);
49+
// f.channel().flush();
50+
// f.channel().closeFuture().sync();
51+
// } finally {
52+
// workerGroup.shutdownGracefully();
53+
// }
54+
//
55+
// }
56+
//
57+
// public static void main(String[] args) throws Exception {
58+
// NettyHttpClient client = new NettyHttpClient();
59+
// client.connect("127.0.0.1", 8844);
60+
// }
61+
//}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.github.kimmking.gateway.outbound.netty4;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.channel.ChannelInboundHandlerAdapter;
5+
6+
public class NettyHttpClientOutboundHandler extends ChannelInboundHandlerAdapter {
7+
8+
@Override
9+
public void channelActive(ChannelHandlerContext ctx)
10+
throws Exception {
11+
12+
13+
}
14+
15+
@Override
16+
public void channelRead(ChannelHandlerContext ctx, Object msg)
17+
throws Exception {
18+
19+
20+
21+
}
22+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.github.kimmking.gateway.router;
2+
3+
import java.util.List;
4+
5+
public interface HttpEndpointRouter {
6+
7+
String route(List<String> endpoints);
8+
9+
}

0 commit comments

Comments
 (0)