Skip to content

Commit

Permalink
upstream: add limits to connection pool map (envoyproxy#5861)
Browse files Browse the repository at this point in the history
Signed-off-by: Kyle Larose <[email protected]>
  • Loading branch information
klarose authored and mattklein123 committed Feb 13, 2019
1 parent 5b57f0f commit 97bda50
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 27 deletions.
6 changes: 6 additions & 0 deletions include/envoy/http/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ class Instance : public Event::DeferredDeletable {
*/
virtual void drainConnections() PURE;

/**
* Determines whether the connection pool is actively processing any requests.
* @return true if the connection pool has any pending requests or any active requests.
*/
virtual bool hasActiveConnections() const PURE;

/**
* Create a new stream on the pool.
* @param response_decoder supplies the decoder events to fire when the response is
Expand Down
4 changes: 4 additions & 0 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ void ConnPoolImpl::addDrainedCallback(DrainedCb cb) {
checkForDrained();
}

bool ConnPoolImpl::hasActiveConnections() const {
return !pending_requests_.empty() || !busy_clients_.empty();
}

void ConnPoolImpl::attachRequestToClient(ActiveClient& client, StreamDecoder& response_decoder,
ConnectionPool::Callbacks& callbacks) {
ASSERT(!client.stream_wrapper_);
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ConnPoolImpl : public ConnectionPool::Instance, public ConnPoolImplBase {
Http::Protocol protocol() const override { return Http::Protocol::Http11; }
void addDrainedCallback(DrainedCb cb) override;
void drainConnections() override;
bool hasActiveConnections() const override;
ConnectionPool::Cancellable* newStream(StreamDecoder& response_decoder,
ConnectionPool::Callbacks& callbacks) override;

Expand Down
12 changes: 12 additions & 0 deletions source/common/http/http2/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ void ConnPoolImpl::addDrainedCallback(DrainedCb cb) {
checkForDrained();
}

bool ConnPoolImpl::hasActiveConnections() const {
if (primary_client_ && primary_client_->client_->numActiveRequests() > 0) {
return true;
}

if (draining_client_ && draining_client_->client_->numActiveRequests() > 0) {
return true;
}

return !pending_requests_.empty();
}

void ConnPoolImpl::checkForDrained() {
if (drained_callbacks_.empty()) {
return;
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http2/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ConnPoolImpl : public ConnectionPool::Instance, public ConnPoolImplBase {
Http::Protocol protocol() const override { return Http::Protocol::Http2; }
void addDrainedCallback(DrainedCb cb) override;
void drainConnections() override;
bool hasActiveConnections() const override;
ConnectionPool::Cancellable* newStream(Http::StreamDecoder& response_decoder,
ConnectionPool::Callbacks& callbacks) override;

Expand Down
10 changes: 7 additions & 3 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "envoy/runtime/runtime.h"
#include "envoy/stats/scope.h"

#include "common/common/assert.h"
#include "common/common/enum_to_int.h"
#include "common/common/fmt.h"
#include "common/common/utility.h"
Expand Down Expand Up @@ -1121,13 +1122,16 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(

// Note: to simplify this, we assume that the factory is only called in the scope of this
// function. Otherwise, we'd need to capture a few of these variables by value.
Http::ConnectionPool::Instance& pool = container.pools_->getPool(hash_key, [&]() {
ConnPoolsContainer::ConnPools::OptPoolRef pool = container.pools_->getPool(hash_key, [&]() {
return parent_.parent_.factory_.allocateConnPool(
parent_.thread_local_dispatcher_, host, priority, protocol,
have_options ? context->downstreamConnection()->socketOptions() : nullptr);
});

return &pool;
// The Connection Pool tracking is a work in progress. We plan for it to eventually have the
// ability to fail, but until we add upper layer handling for failures, it should not. So, assert
// that we don't accidentally add conditions that could allow it to fail.
ASSERT(pool.has_value(), "Pool allocation should never fail");
return &(pool.value().get());
}

Tcp::ConnectionPool::Instance*
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject {
struct ConnPoolsContainer {
ConnPoolsContainer(Event::Dispatcher& dispatcher)
: pools_{std::make_shared<ConnPools>(dispatcher)} {}
: pools_{std::make_shared<ConnPools>(dispatcher, absl::nullopt)} {}

typedef ConnPoolMap<std::vector<uint8_t>, Http::ConnectionPool::Instance> ConnPools;

Expand Down
14 changes: 12 additions & 2 deletions source/common/upstream/conn_pool_map.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once

#include <functional>
#include <vector>

#include "envoy/event/dispatcher.h"

#include "common/common/debug_recursion_checker.h"

#include "absl/container/flat_hash_map.h"
#include "absl/types/optional.h"

namespace Envoy {
namespace Upstream {
Expand All @@ -17,15 +19,16 @@ template <typename KEY_TYPE, typename POOL_TYPE> class ConnPoolMap {
public:
using PoolFactory = std::function<std::unique_ptr<POOL_TYPE>()>;
using DrainedCb = std::function<void()>;
using OptPoolRef = absl::optional<std::reference_wrapper<POOL_TYPE>>;

ConnPoolMap(Event::Dispatcher& dispatcher);
ConnPoolMap(Event::Dispatcher& dispatcher, absl::optional<uint64_t> max_size);
~ConnPoolMap();
/**
* Returns an existing pool for `key`, or creates a new one using `factory`. Note that it is
* possible for this to fail if a limit on the number of pools allowed is reached.
* @return The pool corresponding to `key`, or `absl::nullopt`.
*/
POOL_TYPE& getPool(KEY_TYPE key, const PoolFactory& factory);
OptPoolRef getPool(KEY_TYPE key, const PoolFactory& factory);

/**
* @return the number of pools.
Expand All @@ -51,10 +54,17 @@ template <typename KEY_TYPE, typename POOL_TYPE> class ConnPoolMap {
void drainConnections();

private:
/**
* Frees the first idle pool in `active_pools_`.
* @return false if no pool was freed.
*/
bool freeOnePool();

absl::flat_hash_map<KEY_TYPE, std::unique_ptr<POOL_TYPE>> active_pools_;
Event::Dispatcher& thread_local_dispatcher_;
std::vector<DrainedCb> cached_callbacks_;
Common::DebugRecursionChecker recursion_checker_;
const absl::optional<uint64_t> max_size_;
};

} // namespace Upstream
Expand Down
63 changes: 51 additions & 12 deletions source/common/upstream/conn_pool_map_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,48 @@ namespace Envoy {
namespace Upstream {

template <typename KEY_TYPE, typename POOL_TYPE>
ConnPoolMap<KEY_TYPE, POOL_TYPE>::ConnPoolMap(Envoy::Event::Dispatcher& dispatcher)
: thread_local_dispatcher_(dispatcher) {}
ConnPoolMap<KEY_TYPE, POOL_TYPE>::ConnPoolMap(Envoy::Event::Dispatcher& dispatcher,
absl::optional<uint64_t> max_size)
: thread_local_dispatcher_(dispatcher), max_size_(max_size) {}

template <typename KEY_TYPE, typename POOL_TYPE>
ConnPoolMap<KEY_TYPE, POOL_TYPE>::~ConnPoolMap() = default;

template <typename KEY_TYPE, typename POOL_TYPE>
POOL_TYPE& ConnPoolMap<KEY_TYPE, POOL_TYPE>::getPool(KEY_TYPE key, const PoolFactory& factory) {
typename ConnPoolMap<KEY_TYPE, POOL_TYPE>::OptPoolRef
ConnPoolMap<KEY_TYPE, POOL_TYPE>::getPool(KEY_TYPE key, const PoolFactory& factory) {
Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);
// TODO(klarose): Consider how we will change the connection pool's configuration in the future.
// The plan is to change the downstream socket options... We may want to take those as a parameter
// here. Maybe we'll pass them to the factory function?
auto inserted = active_pools_.emplace(key, nullptr);

// If we inserted a new element, create a pool and assign it to the iterator. Tell it about any
// cached callbacks.
if (inserted.second) {
inserted.first->second = factory();
for (const auto& cb : cached_callbacks_) {
inserted.first->second->addDrainedCallback(cb);
auto pool_iter = active_pools_.find(key);
if (pool_iter != active_pools_.end()) {
return std::ref(*(pool_iter->second));
}

// We need a new pool. Check if we have room.
if (max_size_.has_value() && size() >= max_size_.value()) {
// We're full. Try to free up a pool. If we can't, bail out.
if (!freeOnePool()) {
return absl::nullopt;
}

ASSERT(size() < max_size_.value(), "Freeing a pool should reduce the size to below the max.");
// TODO(klarose): Consider some simple hysteresis here. How can we prevent iterating over all
// pools when we're at the limit every time we want to allocate a new one, even if most of the
// pools are not busy, while balancing that with not unnecessarily freeing all pools? If we
// start freeing once we cross a threshold, then stop after we cross another, we could
// achieve that balance.
}

return *inserted.first->second;
// We have room for a new pool. Allocate one and let it know about any cached callbacks.
auto new_pool = factory();
for (const auto& cb : cached_callbacks_) {
new_pool->addDrainedCallback(cb);
}

auto inserted = active_pools_.emplace(key, std::move(new_pool));
return std::ref(*inserted.first->second);
}

template <typename KEY_TYPE, typename POOL_TYPE>
Expand Down Expand Up @@ -63,5 +81,26 @@ void ConnPoolMap<KEY_TYPE, POOL_TYPE>::drainConnections() {
pool_pair.second->drainConnections();
}
}

template <typename KEY_TYPE, typename POOL_TYPE>
bool ConnPoolMap<KEY_TYPE, POOL_TYPE>::freeOnePool() {
// Try to find a pool that isn't doing anything.
auto pool_iter = active_pools_.begin();
while (pool_iter != active_pools_.end()) {
if (!pool_iter->second->hasActiveConnections()) {
break;
}
++pool_iter;
}

if (pool_iter != active_pools_.end()) {
// We found one. Free it up, and let the caller know.
active_pools_.erase(pool_iter);
return true;
}

return false;
}

} // namespace Upstream
} // namespace Envoy
42 changes: 42 additions & 0 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,48 @@ TEST_F(Http1ConnPoolImplTest, RemoteCloseToCompleteResponse) {
dispatcher_.clearDeferredDeleteList();
}

TEST_F(Http1ConnPoolImplTest, NoActiveConnectionsByDefault) {
EXPECT_FALSE(conn_pool_.hasActiveConnections());
}

TEST_F(Http1ConnPoolImplTest, ActiveRequestHasActiveConnectionsTrue) {
ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::CreateConnection);
r1.startRequest();

EXPECT_TRUE(conn_pool_.hasActiveConnections());

// cleanup
r1.completeResponse(false);
conn_pool_.drainConnections();
EXPECT_CALL(conn_pool_, onClientDestroy());
dispatcher_.clearDeferredDeleteList();
}

TEST_F(Http1ConnPoolImplTest, ResponseCompletedConnectionReadyNoActiveConnections) {
ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::CreateConnection);
r1.startRequest();
r1.completeResponse(false);

EXPECT_FALSE(conn_pool_.hasActiveConnections());

conn_pool_.drainConnections();
EXPECT_CALL(conn_pool_, onClientDestroy());
dispatcher_.clearDeferredDeleteList();
}

TEST_F(Http1ConnPoolImplTest, PendingRequestIsConsideredActive) {
conn_pool_.expectClientCreate();
ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::Pending);

EXPECT_TRUE(conn_pool_.hasActiveConnections());

EXPECT_CALL(conn_pool_, onClientDestroy());
r1.handle_->cancel();
conn_pool_.drainConnections();
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();
}

} // namespace Http1
} // namespace Http
} // namespace Envoy
Loading

0 comments on commit 97bda50

Please sign in to comment.