Skip to content

Commit

Permalink
tcp proxy: properly handle upstream LocalClose events. (envoyproxy#2789)
Browse files Browse the repository at this point in the history
LocalClose events can happen if the ClientConnection fails to bind or set socket options. Prior to this change, the TcpProxy would ignore the error.

Risk Level: Medium
Testing: Unit tests added
Release Notes: None

Signed-off-by: Greg Greenway <[email protected]>
  • Loading branch information
ggreenway authored and alyssawilk committed Mar 15, 2018
1 parent 9d4ca6d commit c7329dd
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 34 deletions.
53 changes: 30 additions & 23 deletions source/common/filter/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,6 @@ void TcpProxy::finalizeUpstreamConnectionStats() {
finalizeConnectionStats(*read_callbacks_->upstreamHost(), *connected_timespan_);
}

// Tears down the upstream half of the proxy session: records the final
// upstream connection stats, closes the upstream connection without flushing
// pending data, and hands the connection object to the downstream
// connection's dispatcher for deferred deletion.
void TcpProxy::closeUpstreamConnection() {
finalizeUpstreamConnectionStats();
// NoFlush: any buffered data is discarded rather than written out first.
upstream_connection_->close(Network::ConnectionCloseType::NoFlush);
// Ownership of upstream_connection_ is transferred to the dispatcher;
// the member is null after this call.
read_callbacks_->connection().dispatcher().deferredDelete(std::move(upstream_connection_));
}

void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
read_callbacks_ = &callbacks;
ENVOY_CONN_LOG(debug, "new tcp proxy session", read_callbacks_->connection());
Expand All @@ -192,7 +186,8 @@ void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callb
}

void TcpProxy::readDisableUpstream(bool disable) {
if (upstream_connection_->state() != Network::Connection::State::Open) {
if (upstream_connection_ == nullptr ||
upstream_connection_->state() != Network::Connection::State::Open) {
// Because we flush write downstream, we can have a case where upstream has already disconnected
// and we are waiting to flush. If we had a watermark event during this time we should no
// longer touch the upstream connection.
Expand Down Expand Up @@ -301,6 +296,8 @@ void TcpProxy::UpstreamCallbacks::drain(TcpProxyDrainer& drainer) {
}

Network::FilterStatus TcpProxy::initializeUpstreamConnection() {
ASSERT(upstream_connection_ == nullptr);

const std::string& cluster_name = getUpstreamCluster();

Upstream::ThreadLocalCluster* thread_local_cluster = cluster_manager_.get(cluster_name);
Expand Down Expand Up @@ -383,8 +380,9 @@ void TcpProxy::onConnectTimeout() {
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_timeout_.inc();
request_info_.setResponseFlag(RequestInfo::ResponseFlag::UpstreamConnectionFailure);

closeUpstreamConnection();
initializeUpstreamConnection();
// This will cause a LocalClose event to be raised, which will trigger a reconnect if
// needed/configured.
upstream_connection_->close(Network::ConnectionCloseType::NoFlush);
}

Network::FilterStatus TcpProxy::onData(Buffer::Instance& data, bool end_stream) {
Expand All @@ -402,7 +400,8 @@ void TcpProxy::onDownstreamEvent(Network::ConnectionEvent event) {
if (event == Network::ConnectionEvent::RemoteClose) {
upstream_connection_->close(Network::ConnectionCloseType::FlushWrite);

if (upstream_connection_->state() != Network::Connection::State::Closed) {
if (upstream_connection_ != nullptr &&
upstream_connection_->state() != Network::Connection::State::Closed) {
if (config_ != nullptr) {
config_->drainManager().add(config_->sharedConfig(), std::move(upstream_connection_),
std::move(upstream_callbacks_), std::move(idle_timer_),
Expand Down Expand Up @@ -443,24 +442,31 @@ void TcpProxy::onUpstreamEvent(Network::ConnectionEvent event) {

if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
finalizeUpstreamConnectionStats();
read_callbacks_->connection().dispatcher().deferredDelete(std::move(upstream_connection_));
disableIdleTimer();
}

if (event == Network::ConnectionEvent::RemoteClose) {
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_destroy_remote_.inc();
auto& destroy_ctx_stat =
(event == Network::ConnectionEvent::RemoteClose)
? read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_destroy_remote_
: read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_destroy_local_;
destroy_ctx_stat.inc();

if (connecting) {
request_info_.setResponseFlag(RequestInfo::ResponseFlag::UpstreamConnectionFailure);
read_callbacks_->upstreamHost()->outlierDetector().putResult(
Upstream::Outlier::Result::CONNECT_FAILED);
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_fail_.inc();
read_callbacks_->upstreamHost()->stats().cx_connect_fail_.inc();
closeUpstreamConnection();
if (event == Network::ConnectionEvent::RemoteClose) {
request_info_.setResponseFlag(RequestInfo::ResponseFlag::UpstreamConnectionFailure);
read_callbacks_->upstreamHost()->outlierDetector().putResult(
Upstream::Outlier::Result::CONNECT_FAILED);
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_fail_.inc();
read_callbacks_->upstreamHost()->stats().cx_connect_fail_.inc();
}

initializeUpstreamConnection();
} else {
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
if (read_callbacks_->connection().state() == Network::Connection::State::Open) {
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
}
}
} else if (event == Network::ConnectionEvent::LocalClose) {
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_destroy_local_.inc();
} else if (event == Network::ConnectionEvent::Connected) {
connect_timespan_->complete();

Expand Down Expand Up @@ -491,7 +497,8 @@ void TcpProxy::onUpstreamEvent(Network::ConnectionEvent event) {

// Fired when the session idle timer expires: bump the idle_timeout stat and
// tear the session down.
void TcpProxy::onIdleTimeout() {
config_->stats().idle_timeout_.inc();
// NOTE(review): this direct upstream teardown and the downstream close below
// appear to be overlapping old/new teardown paths (the comment on the close
// says it also closes upstream) — confirm both are intended to run.
closeUpstreamConnection();

// This results in also closing the upstream connection.
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
}

Expand Down
1 change: 0 additions & 1 deletion source/common/filter/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ class TcpProxy : public Network::ReadFilter,
void onUpstreamData(Buffer::Instance& data, bool end_stream);
void onUpstreamEvent(Network::ConnectionEvent event);
void finalizeUpstreamConnectionStats();
void closeUpstreamConnection();
void onIdleTimeout();
void resetIdleTimer();
void disableIdleTimer();
Expand Down
57 changes: 49 additions & 8 deletions test/common/filter/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,31 @@ TEST_F(TcpProxyTest, HalfCloseProxy) {
upstream_read_filter_->onData(response, true);

EXPECT_CALL(filter_callbacks_.connection_, close(_));
EXPECT_CALL(*upstream_connections_.at(0), close(_));
EXPECT_CALL(filter_callbacks_.connection_.dispatcher_,
deferredDelete_(upstream_connections_.at(0)));
upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose);
}

TEST_F(TcpProxyTest, UpstreamDisconnect) {
// Test that downstream is closed after an upstream LocalClose.
// Verify that an upstream LocalClose event causes the downstream connection
// to be closed as well.
TEST_F(TcpProxyTest, UpstreamLocalDisconnect) {
  setup(1);

  // Send a payload downstream -> upstream before the upstream connects.
  Buffer::OwnedImpl downstream_data("hello");
  EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&downstream_data), false));
  filter_->onData(downstream_data, false);

  raiseEventUpstreamConnected(0);

  // And a payload in the other direction, upstream -> downstream.
  Buffer::OwnedImpl upstream_data("world");
  EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&upstream_data), _));
  upstream_read_filter_->onData(upstream_data, false);

  // A local close of the upstream connection must propagate to downstream.
  EXPECT_CALL(filter_callbacks_.connection_, close(_));
  upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose);
}

// Test that downstream is closed after an upstream RemoteClose.
TEST_F(TcpProxyTest, UpstreamRemoteDisconnect) {
setup(1);

Buffer::OwnedImpl buffer("hello");
Expand All @@ -510,13 +530,30 @@ TEST_F(TcpProxyTest, UpstreamDisconnect) {
upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose);
}

// Test that reconnect is attempted after a connect failure
TEST_F(TcpProxyTest, ConnectAttemptsUpstreamFail) {
// Test that reconnect is attempted after a local connect failure
// Verify that a second connect attempt is made after the first upstream
// connection fails with a LocalClose, and that the attempts-exceeded counter
// is not incremented when the retry budget is not exhausted.
TEST_F(TcpProxyTest, ConnectAttemptsUpstreamLocalFail) {
  envoy::config::filter::network::tcp_proxy::v2::TcpProxy proxy_config = defaultConfig();
  proxy_config.mutable_max_connect_attempts()->set_value(2);
  setup(2, proxy_config);

  // The failed first connection is closed without flushing and handed to the
  // dispatcher for deferred deletion...
  EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush));
  EXPECT_CALL(filter_callbacks_.connection_.dispatcher_,
              deferredDelete_(upstream_connections_.at(0)));
  upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::LocalClose);
  // ...and the second attempt succeeds.
  raiseEventUpstreamConnected(1);

  // Two attempts against a limit of two: the exceeded counter stays at zero.
  EXPECT_EQ(0U, factory_context_.cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_
                    .counter("upstream_cx_connect_attempts_exceeded")
                    .value());
}

// Test that reconnect is attempted after a remote connect failure
TEST_F(TcpProxyTest, ConnectAttemptsUpstreamRemoteFail) {
envoy::config::filter::network::tcp_proxy::v2::TcpProxy config = defaultConfig();
config.mutable_max_connect_attempts()->set_value(2);
setup(2, config);

EXPECT_CALL(filter_callbacks_.connection_.dispatcher_,
deferredDelete_(upstream_connections_.at(0)));
upstream_connections_.at(0)->raiseEvent(Network::ConnectionEvent::RemoteClose);
raiseEventUpstreamConnected(1);

Expand Down Expand Up @@ -549,8 +586,12 @@ TEST_F(TcpProxyTest, ConnectAttemptsLimit) {
{
testing::InSequence sequence;
EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush));
EXPECT_CALL(*upstream_connections_.at(1), close(Network::ConnectionCloseType::NoFlush));
EXPECT_CALL(*upstream_connections_.at(2), close(Network::ConnectionCloseType::NoFlush));
EXPECT_CALL(filter_callbacks_.connection_.dispatcher_,
deferredDelete_(upstream_connections_.at(0)));
EXPECT_CALL(filter_callbacks_.connection_.dispatcher_,
deferredDelete_(upstream_connections_.at(1)));
EXPECT_CALL(filter_callbacks_.connection_.dispatcher_,
deferredDelete_(upstream_connections_.at(2)));
EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush));
}

Expand Down Expand Up @@ -779,9 +820,9 @@ TEST_F(TcpProxyTest, IdleTimeout) {
EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000)));
upstream_connections_.at(0)->raiseBytesSentCallbacks(2);

EXPECT_CALL(*idle_timer, disableTimer());
EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush));
EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush));
EXPECT_CALL(*idle_timer, disableTimer());
idle_timer->callback_();
}

Expand Down
4 changes: 2 additions & 2 deletions test/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class MockDispatcher : public Dispatcher {
TimerPtr createTimer(TimerCb cb) override { return TimerPtr{createTimer_(cb)}; }

void deferredDelete(DeferredDeletablePtr&& to_delete) override {
deferredDelete_(to_delete);
deferredDelete_(to_delete.get());
if (to_delete) {
to_delete_.push_back(std::move(to_delete));
}
Expand Down Expand Up @@ -95,7 +95,7 @@ class MockDispatcher : public Dispatcher {
bool bind_to_port,
bool hand_off_restored_destination_connections));
MOCK_METHOD1(createTimer_, Timer*(TimerCb cb));
MOCK_METHOD1(deferredDelete_, void(DeferredDeletablePtr& to_delete));
MOCK_METHOD1(deferredDelete_, void(DeferredDeletable* to_delete));
MOCK_METHOD0(exit, void());
MOCK_METHOD2(listenForSignal_, SignalEvent*(int signal_num, SignalCb cb));
MOCK_METHOD1(post, void(std::function<void()> callback));
Expand Down

0 comments on commit c7329dd

Please sign in to comment.