Skip to content

Commit

Permalink
Fix wildcard picking up services it shouldn't for ingress/terminating…
Browse files Browse the repository at this point in the history
… gateways
  • Loading branch information
kyhavlov committed Aug 2, 2022
1 parent 20ffcba commit 499211f
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 22 deletions.
96 changes: 89 additions & 7 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
if svc.Kind == structs.ServiceKindTypical && svc.Service != "consul" {
// Check if this service is covered by a gateway's wildcard specifier, we force the service kind to a gateway-service here as that take precedence
sn := structs.NewServiceName(svc.Service, &svc.EnterpriseMeta)
if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil {
if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, svc, structs.GatewayServiceKindService); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err = checkGatewayAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil {
Expand Down Expand Up @@ -1984,11 +1984,6 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta, svc.PeerName); err != nil {
return err
}
if svc.PeerName == "" {
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
}
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name}
if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil {
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
Expand All @@ -2001,6 +1996,12 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
}

if svc.PeerName == "" {
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
}

return nil
}

Expand Down Expand Up @@ -3652,6 +3653,18 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
continue
}

supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, sn.ServiceName, entMeta)
if err != nil {
return err
}

if service.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress {
continue
}
if service.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating {
continue
}

existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName(), service.Port)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
Expand Down Expand Up @@ -3717,6 +3730,42 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
return nil
}

// serviceConnectInstances returns whether the service has at least one connect instance,
// and at least one non-connect instance.
func serviceConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, bool, error) {
hasConnectInstance := false
connectQuery := Query{
Value: serviceName,
EnterpriseMeta: *entMeta,
}
svc, err := tx.First(tableServices, indexConnect, connectQuery)
if err != nil {
return false, false, fmt.Errorf("failed service lookup: %s", err)
}
if svc != nil {
hasConnectInstance = true
}

hasNonConnectInstance := false
nonConnectQuery := Query{
Value: serviceName,
EnterpriseMeta: *entMeta,
}
iter, err := tx.Get(tableServices, indexService, nonConnectQuery)
if err != nil {
return false, false, fmt.Errorf("failed service lookup: %s", err)
}
for service := iter.Next(); service != nil; service = iter.Next() {
sn := service.(*structs.ServiceNode)
if !sn.ServiceConnect.Native {
hasNonConnectInstance = true
break
}
}

return hasConnectInstance, hasNonConnectInstance, nil
}

// updateGatewayService associates services with gateways after an eligible event
// ie. Registering a service in a namespace targeted by a gateway
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
Expand Down Expand Up @@ -3754,14 +3803,31 @@ func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayServi
// checkWildcardForGatewaysAndUpdate checks whether a service matches a
// wildcard definition in gateway config entries and if so adds it the the
// gateway-services table.
func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, kind structs.GatewayServiceKind) error {
func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, ns *structs.NodeService, kind structs.GatewayServiceKind) error {
sn := structs.ServiceName{Name: structs.WildcardSpecifier, EnterpriseMeta: svc.EnterpriseMeta}
svcGateways, err := tx.Get(tableGatewayServices, indexService, sn)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.Name, err)
}

supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, svc.Name, &svc.EnterpriseMeta)
if err != nil {
return err
}
if ns != nil && ns.Connect.Native {
supportsIngress = true
} else {
supportsTerminating = true
}

for service := svcGateways.Next(); service != nil; service = svcGateways.Next() {
if wildcardSvc, ok := service.(*structs.GatewayService); ok && wildcardSvc != nil {
if wildcardSvc.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress {
continue
}
if wildcardSvc.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating {
continue
}

// Copy the wildcard mapping and modify it
gatewaySvc := wildcardSvc.Clone()
Expand Down Expand Up @@ -3818,12 +3884,28 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
}
}

// Check whether there are any connect or non-connect instances remaining for this service.
// If there are no connect instances left, ingress gateways with a wildcard entry can remove
// their association with it (same with terminating gateways if there are no non-connect
// instances left).
hasConnectInstance, hasNonConnectInstance, err := serviceConnectInstances(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
return err
}

// Do the updates in a separate loop so we don't trash the iterator.
for _, m := range mappings {
// Only delete if association was created by a wildcard specifier.
// Otherwise the service was specified in the config entry, and the association should be maintained
// for when the service is re-registered
if m.FromWildcard {
if m.GatewayKind == structs.ServiceKindIngressGateway && hasConnectInstance {
continue
}
if m.GatewayKind == structs.ServiceKindTerminatingGateway && hasNonConnectInstance {
continue
}

if err := tx.Delete(tableGatewayServices, m); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
Expand Down
44 changes: 31 additions & 13 deletions agent/consul/state/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
crand "crypto/rand"
"fmt"
"github.com/hashicorp/consul/acl"
"reflect"
"sort"
"strings"
"testing"
"time"

"github.com/hashicorp/consul/acl"

"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -5753,6 +5754,10 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
assert.Nil(t, s.EnsureService(13, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000}))
assert.Nil(t, s.EnsureService(14, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))

// Connect services (should be ignored by terminating gateway)
assert.Nil(t, s.EnsureService(15, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000}))
assert.Nil(t, s.EnsureService(16, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000}))

// Register two gateways
assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443}))
assert.Nil(t, s.EnsureService(18, "baz", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "other-gateway", Service: "other-gateway", Port: 443}))
Expand Down Expand Up @@ -5895,6 +5900,16 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
},
}
assert.Equal(t, expect, out)

// Delete the non-connect instance of api
assert.Nil(t, s.DeleteService(21, "foo", "api", nil, ""))

// Gateway with wildcard entry should have no services left, because the last
// non-connect instance of 'api' was deleted.
idx, out, err = s.GatewayServices(ws, "other-gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(21))
assert.Empty(t, out)
}

func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
Expand All @@ -5904,7 +5919,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
t.Run("check service1 ingress gateway", func(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
// Multiple instances of the ingress2 service
require.Len(t, results, 4)

Expand All @@ -5923,7 +5938,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
t.Run("check service2 ingress gateway", func(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service2", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
require.Len(t, results, 2)

ids := make(map[string]struct{})
Expand All @@ -5941,7 +5956,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
ws := memdb.NewWatchSet()
idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
require.Len(t, results, 1)
require.Equal(t, "wildcardIngress", results[0].Service.ID)
})
Expand All @@ -5952,17 +5967,17 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {

idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
require.Len(t, results, 3)

idx, results, err = s.CheckIngressServiceNodes(ws, "service2", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
require.Len(t, results, 1)

idx, results, err = s.CheckIngressServiceNodes(ws, "service3", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
// TODO(ingress): index goes backward when deleting last config entry
// require.Equal(t,uint64(11), idx)
require.Len(t, results, 0)
Expand Down Expand Up @@ -6346,8 +6361,8 @@ func TestStateStore_GatewayServices_IngressProtocolFiltering(t *testing.T) {
}

testRegisterNode(t, s, 0, "node1")
testRegisterService(t, s, 1, "node1", "service1")
testRegisterService(t, s, 2, "node1", "service2")
testRegisterConnectService(t, s, 1, "node1", "service1")
testRegisterConnectService(t, s, 2, "node1", "service2")
assert.NoError(t, s.EnsureConfigEntry(4, ingress1))
})

Expand Down Expand Up @@ -6510,15 +6525,17 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")

// Register a service against the nodes.
// Register some connect and non-connect services against the nodes.
testRegisterIngressService(t, s, 3, "node1", "wildcardIngress")
testRegisterIngressService(t, s, 4, "node1", "ingress1")
testRegisterIngressService(t, s, 5, "node1", "ingress2")
testRegisterIngressService(t, s, 6, "node2", "ingress2")
testRegisterIngressService(t, s, 7, "node1", "nothingIngress")
testRegisterService(t, s, 8, "node1", "service1")
testRegisterService(t, s, 9, "node2", "service2")
testRegisterService(t, s, 10, "node2", "service3")
testRegisterConnectService(t, s, 8, "node1", "service1")
testRegisterConnectService(t, s, 9, "node2", "service2")
testRegisterConnectService(t, s, 10, "node2", "service3")
testRegisterService(t, s, 17, "node1", "service4")
testRegisterService(t, s, 18, "node2", "service5")

// Default protocol to http
proxyDefaults := &structs.ProxyConfigEntry{
Expand Down Expand Up @@ -7883,6 +7900,7 @@ func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) {
Address: "127.0.0.3",
Port: 443,
EnterpriseMeta: *defaultMeta,
Connect: structs.ServiceConnect{Native: true},
}
require.NoError(t, s.EnsureService(5, "foo", &svc))
assert.True(t, watchFired(ws))
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/state/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *a
gsKind = structs.GatewayServiceKindUnknown
}
serviceName := structs.NewServiceName(c.GetName(), c.GetEnterpriseMeta())
if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, gsKind); err != nil {
if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, nil, gsKind); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err := checkGatewayAndUpdate(tx, idx, &serviceName, gsKind); err != nil {
Expand Down Expand Up @@ -434,7 +434,7 @@ func insertConfigEntryWithTxn(tx WriteTxn, idx uint64, conf structs.ConfigEntry)
if err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, gsKind); err != nil {
if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, nil, gsKind); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err := checkGatewayAndUpdate(tx, idx, &sn, gsKind); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID s
testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
}

func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, true, func(service *structs.NodeService) {
service.Connect = structs.ServiceConnect{Native: true}
})
}

func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
svc := &structs.NodeService{
ID: serviceID,
Expand Down

0 comments on commit 499211f

Please sign in to comment.