Skip to content

Commit

Permalink
sdk: add freelist tracking and ephemeral port range skipping to freeport
Browse files Browse the repository at this point in the history
This should cut down on test flakiness.

Problems handled:

- If you had enough parallel test cases running, the former circular
approach to handling the port block could hand out the same port to
multiple cases before they each had a chance to bind them, leading to
one of the two tests to fail.

- The freeport library would allocate out of the ephemeral port range.
This has been corrected for Linux (which should cover CI).

- The library now waits until a formerly-in-use port is verified to be
free before putting it back into circulation.
  • Loading branch information
rboyer committed Sep 17, 2019
1 parent 90d9455 commit f9496dc
Show file tree
Hide file tree
Showing 18 changed files with 685 additions and 85 deletions.
15 changes: 9 additions & 6 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,8 +1229,11 @@ func TestAgent_RestoreServiceWithAliasCheck(t *testing.T) {
testCtx, testCancel := context.WithCancel(context.Background())
defer testCancel()

testHTTPServer := launchHTTPCheckServer(t, testCtx)
defer testHTTPServer.Close()
testHTTPServer, returnPort := launchHTTPCheckServer(t, testCtx)
defer func() {
testHTTPServer.Close()
returnPort()
}()

registerServicesAndChecks := func(t *testing.T, a *TestAgent) {
// add one persistent service with a simple check
Expand Down Expand Up @@ -1338,8 +1341,8 @@ node_name = "` + a.Config.NodeName + `"
}
}

func launchHTTPCheckServer(t *testing.T, ctx context.Context) *httptest.Server {
ports := freeport.GetT(t, 1)
func launchHTTPCheckServer(t *testing.T, ctx context.Context) (srv *httptest.Server, returnPortsFn func()) {
ports := freeport.MustTake(1)
port := ports[0]

addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(port))
Expand All @@ -1353,12 +1356,12 @@ func launchHTTPCheckServer(t *testing.T, ctx context.Context) *httptest.Server {
_, _ = w.Write([]byte("OK\n"))
})

srv := &httptest.Server{
srv = &httptest.Server{
Listener: listener,
Config: &http.Server{Handler: handler},
}
srv.Start()
return srv
return srv, func() { freeport.Return(ports) }
}

func TestAgent_AddCheck_Alias(t *testing.T) {
Expand Down
23 changes: 20 additions & 3 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/net-rpc-msgpackrpc"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
Expand All @@ -22,15 +22,27 @@ import (
func testClientConfig(t *testing.T) (string, *Config) {
dir := testutil.TempDir(t, "consul")
config := DefaultConfig()

ports := freeport.MustTake(2)

returnPortsFn := func() {
// The method of plumbing this into the client shutdown hook doesn't
// cover all exit points, so we insulate this against multiple
// invocations and then it's safe to call it a bunch of times.
freeport.Return(ports)
config.NotifyShutdown = nil // self-erasing
}
config.NotifyShutdown = returnPortsFn

config.Datacenter = "dc1"
config.DataDir = dir
config.NodeName = uniqueNodeName(t.Name())
config.RPCAddr = &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
Port: freeport.Get(1)[0],
Port: ports[0],
}
config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfLANConfig.MemberlistConfig.BindPort = freeport.Get(1)[0]
config.SerfLANConfig.MemberlistConfig.BindPort = ports[1]
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
Expand Down Expand Up @@ -59,6 +71,7 @@ func testClientWithConfig(t *testing.T, cb func(c *Config)) (string, *Client) {
}
client, err := NewClient(config)
if err != nil {
config.NotifyShutdown()
t.Fatalf("err: %v", err)
}
return dir, client
Expand Down Expand Up @@ -416,6 +429,7 @@ func TestClient_RPC_TLS(t *testing.T) {
defer s1.Shutdown()

dir2, conf2 := testClientConfig(t)
defer conf2.NotifyShutdown()
conf2.VerifyOutgoing = true
configureTLS(conf2)
c1, err := NewClient(conf2)
Expand Down Expand Up @@ -460,6 +474,7 @@ func TestClient_RPC_RateLimit(t *testing.T) {
testrpc.WaitForLeader(t, s1.RPC, "dc1")

dir2, conf2 := testClientConfig(t)
defer conf2.NotifyShutdown()
conf2.RPCRate = 2
conf2.RPCMaxBurst = 2
c1, err := NewClient(conf2)
Expand Down Expand Up @@ -527,6 +542,7 @@ func TestClient_SnapshotRPC_RateLimit(t *testing.T) {
testrpc.WaitForLeader(t, s1.RPC, "dc1")

dir2, conf1 := testClientConfig(t)
defer conf1.NotifyShutdown()
conf1.RPCRate = 2
conf1.RPCMaxBurst = 2
c1, err := NewClient(conf1)
Expand Down Expand Up @@ -569,6 +585,7 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) {
defer s1.Shutdown()

dir2, conf2 := testClientConfig(t)
defer conf2.NotifyShutdown()
conf2.VerifyOutgoing = true
configureTLS(conf2)
c1, err := NewClient(conf2)
Expand Down
3 changes: 3 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type Config struct {
// configured at this point.
NotifyListen func()

// NotifyShutdown is called after Server is completely Shutdown.
NotifyShutdown func()

// RPCAddr is the RPC address used by Consul. This should be reachable
// by the WAN and LAN
RPCAddr *net.TCPAddr
Expand Down
12 changes: 9 additions & 3 deletions agent/consul/operator_raft_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/net-rpc-msgpackrpc"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/raft"
"github.com/pascaldekloe/goe/verify"
)
Expand Down Expand Up @@ -145,10 +145,13 @@ func TestOperator_RaftRemovePeerByAddress(t *testing.T) {

testrpc.WaitForLeader(t, s1.RPC, "dc1")

ports := freeport.MustTake(1)
defer freeport.Return(ports)

// Try to remove a peer that's not there.
arg := structs.RaftRemovePeerRequest{
Datacenter: "dc1",
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", freeport.Get(1)[0])),
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])),
}
var reply struct{}
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
Expand Down Expand Up @@ -277,7 +280,10 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {

// Add it manually to Raft.
{
future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", freeport.Get(1)[0])), 0, 0)
ports := freeport.MustTake(1)
defer freeport.Return(ports)

future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])), 0, 0)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,10 @@ func (s *Server) Shutdown() error {
// Close the connection pool
s.connPool.Shutdown()

if s.config.NotifyShutdown != nil {
s.config.NotifyShutdown()
}

return nil
}

Expand Down
16 changes: 15 additions & 1 deletion agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,17 @@ func testServerConfig(t *testing.T) (string, *Config) {
dir := testutil.TempDir(t, "consul")
config := DefaultConfig()

ports := freeport.Get(3)
ports := freeport.MustTake(3)

returnPortsFn := func() {
// The method of plumbing this into the server shutdown hook doesn't
// cover all exit points, so we insulate this against multiple
// invocations and then it's safe to call it a bunch of times.
freeport.Return(ports)
config.NotifyShutdown = nil // self-erasing
}
config.NotifyShutdown = returnPortsFn

config.NodeName = uniqueNodeName(t.Name())
config.Bootstrap = true
config.Datacenter = "dc1"
Expand All @@ -56,6 +66,7 @@ func testServerConfig(t *testing.T) (string, *Config) {

nodeID, err := uuid.GenerateUUID()
if err != nil {
returnPortsFn()
t.Fatal(err)
}
config.NodeID = types.NodeID(nodeID)
Expand Down Expand Up @@ -112,6 +123,8 @@ func testServerConfig(t *testing.T) (string, *Config) {
},
}

config.NotifyShutdown = returnPortsFn

return dir, config
}

Expand Down Expand Up @@ -168,6 +181,7 @@ func testServerWithConfig(t *testing.T, cb func(*Config)) (string, *Server) {

srv, err = newServer(config)
if err != nil {
config.NotifyShutdown()
os.RemoveAll(dir)
r.Fatalf("err: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ func TestParseWait(t *testing.T) {
t.Fatalf("Bad: %v", b)
}
}

func TestPProfHandlers_EnableDebug(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand All @@ -751,6 +752,7 @@ func TestPProfHandlers_EnableDebug(t *testing.T) {

require.Equal(http.StatusOK, resp.Code)
}

func TestPProfHandlers_DisableDebugNoACLs(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
42 changes: 34 additions & 8 deletions agent/testagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type TestAgent struct {
// when Shutdown() is called.
Config *config.RuntimeConfig

// returnPortsFn will put the ports claimed for the test back into the
// general freeport pool
returnPortsFn func()

// LogOutput is the sink for the logs. If nil, logs are written
// to os.Stderr.
LogOutput io.Writer
Expand Down Expand Up @@ -150,12 +154,21 @@ func (a *TestAgent) Start() (err error) {
hclDataDir = `data_dir = "` + d + `"`
}

portsConfig, returnPortsFn := randomPortsSource(a.UseTLS)
a.returnPortsFn = returnPortsFn
a.Config = TestConfig(
randomPortsSource(a.UseTLS),
portsConfig,
config.Source{Name: a.Name, Format: "hcl", Data: a.HCL},
config.Source{Name: a.Name + ".data_dir", Format: "hcl", Data: hclDataDir},
)

defer func() {
if err != nil && a.returnPortsFn != nil {
a.returnPortsFn()
a.returnPortsFn = nil
}
}()

// write the keyring
if a.Key != "" {
writeKey := func(key, filename string) error {
Expand Down Expand Up @@ -286,6 +299,14 @@ func (a *TestAgent) Shutdown() error {
return nil
}

// Return ports last of all
defer func() {
if a.returnPortsFn != nil {
a.returnPortsFn()
a.returnPortsFn = nil
}
}()

// shutdown agent before endpoints
defer a.Agent.ShutdownEndpoints()
if err := a.Agent.ShutdownAgent(); err != nil {
Expand Down Expand Up @@ -350,27 +371,32 @@ func (a *TestAgent) consulConfig() *consul.Config {
// chance of port conflicts for concurrently executed test binaries.
// Instead of relying on one set of ports to be sufficient we retry
// starting the agent with different ports on port conflict.
func randomPortsSource(tls bool) config.Source {
ports := freeport.Get(6)
func randomPortsSource(tls bool) (src config.Source, returnPortsFn func()) {
ports := freeport.MustTake(6)

var http, https int
if tls {
ports[1] = -1
http = -1
https = ports[2]
} else {
ports[2] = -1
http = ports[1]
https = -1
}

return config.Source{
Name: "ports",
Format: "hcl",
Data: `
ports = {
dns = ` + strconv.Itoa(ports[0]) + `
http = ` + strconv.Itoa(ports[1]) + `
https = ` + strconv.Itoa(ports[2]) + `
http = ` + strconv.Itoa(http) + `
https = ` + strconv.Itoa(https) + `
serf_lan = ` + strconv.Itoa(ports[3]) + `
serf_wan = ` + strconv.Itoa(ports[4]) + `
server = ` + strconv.Itoa(ports[5]) + `
}
`,
}
}, func() { freeport.Return(ports) }
}

func NodeID() string {
Expand Down
9 changes: 6 additions & 3 deletions connect/proxy/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"bytes"
"context"
"fmt"
"github.com/hashicorp/consul/connect"
"log"
"net"
"os"
"testing"
"time"

"github.com/hashicorp/consul/connect"

metrics "github.com/armon/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -110,7 +111,8 @@ func TestPublicListener(t *testing.T) {
// Can't enable t.Parallel since we rely on the global metrics instance.

ca := agConnect.TestCA(t, nil)
ports := freeport.GetT(t, 1)
ports := freeport.MustTake(1)
defer freeport.Return(ports)

testApp := NewTestTCPServer(t)
defer testApp.Close()
Expand Down Expand Up @@ -162,7 +164,8 @@ func TestUpstreamListener(t *testing.T) {
// Can't enable t.Parallel since we rely on the global metrics instance.

ca := agConnect.TestCA(t, nil)
ports := freeport.GetT(t, 1)
ports := freeport.MustTake(1)
defer freeport.Return(ports)

// Run a test server that we can dial.
testSvr := connect.NewTestServer(t, "db", ca)
Expand Down
4 changes: 3 additions & 1 deletion connect/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ func TestProxy_public(t *testing.T) {
t.Parallel()

require := require.New(t)
ports := freeport.GetT(t, 1)

ports := freeport.MustTake(1)
defer freeport.Return(ports)

a := agent.NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
Expand Down
Loading

0 comments on commit f9496dc

Please sign in to comment.