Skip to content

Commit

Permalink
reset ssl enabled netty client to async
Browse files Browse the repository at this point in the history
  • Loading branch information
NickNYU committed Nov 27, 2018
1 parent 4369869 commit 82f73ea
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ protected void initBootstrap() {
}

@Override
public PooledObject<NettyClient> makeObject(Endpoint key) throws Exception {
public PooledObject<NettyClient> makeObject(Endpoint key) {
ProxyEndpoint endpoint = (ProxyEndpoint) key;
ChannelFuture f = getBootstrap(endpoint).connect(key.getHost(), key.getPort());
f.get(5000, TimeUnit.MILLISECONDS);
NettyClient nettyClient = new DefaultNettyClient(f.channel());
NettyClient nettyClient = new AsyncNettyClient(f, key);
f.channel().attr(NettyClientHandler.KEY_CLIENT).set(nettyClient);
return new DefaultPooledObject<>(nettyClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.ctrip.xpipe.redis.core.proxy.endpoint.DefaultProxyEndpointManager;
import com.ctrip.xpipe.redis.core.proxy.endpoint.EndpointHealthChecker;
import com.ctrip.xpipe.redis.proxy.DefaultProxyServer;
import com.ctrip.xpipe.redis.proxy.ProxyServer;
import com.ctrip.xpipe.redis.proxy.Session;
import com.ctrip.xpipe.redis.proxy.TestProxyConfig;
import com.ctrip.xpipe.redis.proxy.monitor.DefaultTunnelMonitorManager;
Expand Down Expand Up @@ -82,7 +83,7 @@ public boolean checkConnectivity(Endpoint endpoint) {
}
}

protected void startFirstProxy() throws Exception {
protected DefaultProxyServer startFirstProxy() throws Exception {
// uncomment disable netty bytebuf test
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
DefaultProxyServer server = new DefaultProxyServer().setConfig(new TestProxyConfig()
Expand All @@ -92,6 +93,7 @@ protected void startFirstProxy() throws Exception {
((TestProxyConfig)server.getResourceManager().getProxyConfig()).setStartMonitor(true);
((DefaultPingStatsManager)server.getPingStatsManager()).postConstruct();
server.start();
return server;
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package com.ctrip.xpipe.redis.proxy.resource;

import com.ctrip.xpipe.netty.ByteBufUtils;
import com.ctrip.xpipe.netty.commands.ByteBufReceiver;
import com.ctrip.xpipe.netty.commands.NettyClient;
import com.ctrip.xpipe.proxy.ProxyEndpoint;
import com.ctrip.xpipe.redis.core.proxy.endpoint.DefaultProxyEndpoint;
import com.ctrip.xpipe.redis.proxy.DefaultProxyServer;
import com.ctrip.xpipe.redis.proxy.TestProxyConfig;
import com.ctrip.xpipe.redis.proxy.integrate.AbstractProxyIntegrationTest;
import com.ctrip.xpipe.simpleserver.Server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import org.apache.commons.pool2.PooledObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/**
Expand All @@ -26,9 +31,7 @@ public class SslEnabledNettyClientFactoryTest extends AbstractProxyIntegrationTe

@Before
public void beforeSslEnabledNettyClientFactoryTest() throws Exception {
server = new DefaultProxyServer().setConfig(new TestProxyConfig()).setResourceManager(new TestResourceManager());
prepare(server);
server.start();
server = startFirstProxy();
factory = new SslEnabledNettyClientFactory(new TestResourceManager());
factory.start();
}
Expand All @@ -40,11 +43,40 @@ public void afterSslEnabledNettyClientFactoryTest() {


@Test
public void makeObject() throws Exception {
public void testMakeObject() throws Exception {
Server localServer = startEchoServer();
int tlsPort = server.getConfig().frontendTlsPort();
PooledObject<NettyClient> clientPooledObject = factory.makeObject(new DefaultProxyEndpoint(ProxyEndpoint.PROXY_SCHEME.TLS + "://127.0.0.1:" + tlsPort));
PooledObject<NettyClient> clientPooledObject = factory.makeObject(new DefaultProxyEndpoint(ProxyEndpoint.PROXY_SCHEME.PROXYTLS + "://127.0.0.1:" + tlsPort));
clientPooledObject.getObject().sendRequest(Unpooled.copiedBuffer(String.format("+PROXY ROUTE TCP://127.0.0.1:%d\r\nhello", localServer.getPort()).getBytes()));
sleep(1000 * 10);
sleep(1000);
}

@Test
public void testTLSHandShakeError() throws Exception {
int tlsPort = server.getConfig().frontendTlsPort();
PooledObject<NettyClient> clientPooledObject = factory.makeObject(new DefaultProxyEndpoint(ProxyEndpoint.PROXY_SCHEME.PROXYTLS + "://127.0.0.1:" + tlsPort));
clientPooledObject.getObject().sendRequest(Unpooled.copiedBuffer(("+PROXY MONITOR PingStats\r\n").getBytes()));
sleep(1000);
}

@Ignore
@Test
public void manuallyTestFWS() throws Exception {
PooledObject<NettyClient> clientPooledObject = factory.makeObject(new DefaultProxyEndpoint(ProxyEndpoint.PROXY_SCHEME.PROXYTLS + "://10.2.134.71:443"));
clientPooledObject.getObject().sendRequest(
Unpooled.copiedBuffer(("+PROXY MONITOR PingStats\r\n+PROXY MONITOR TunnelStats\r\n+PROXY MONITOR SocketStats\r\n").getBytes()),
new ByteBufReceiver() {
@Override
public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) {
logger.info("{}", ByteBufUtils.readToString(byteBuf));
return RECEIVER_RESULT.SUCCESS;
}

@Override
public void clientClosed(NettyClient nettyClient) {

}
});
sleep(1000);
}
}

0 comments on commit 82f73ea

Please sign in to comment.