Skip to content

Commit

Permalink
Merge pull request kmesh-net#843 from hzxuzhonghu/fix-dns-memleak
Browse files Browse the repository at this point in the history
Fix dns memleak when dns cluster removed
  • Loading branch information
kmesh-bot authored Sep 19, 2024
2 parents 3ae4e4f + c461e2a commit ad3d9cd
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 32 deletions.
5 changes: 2 additions & 3 deletions pkg/controller/ads/ads_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ func (p *processor) handleCdsResponse(resp *service_discovery_v3.DiscoveryRespon
log.Debugf("unchanged cluster %s", cluster.GetName())
}
}

if len(dnsClusters) > 0 {
// send dns clusters to dns resolver
// Always send the DNS clusters to the DNS resolver, even when dnsClusters is empty: an empty list is needed so the resolver clears its cache.
if p.DnsResolverChan != nil {
p.DnsResolverChan <- dnsClusters
}
removed := p.Cache.ClusterCache.GetResourceNames().Difference(current)
Expand Down
33 changes: 14 additions & 19 deletions pkg/controller/ads/ads_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestHandleCdsResponse(t *testing.T) {
t.Cleanup(cleanup)
t.Run("new cluster, cluster type is eds", func(t *testing.T) {
p := newProcessor()
p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5)
cluster := &config_cluster_v3.Cluster{
Name: "ut-cluster",
ClusterDiscoveryType: &config_cluster_v3.Cluster_Type{
Expand Down Expand Up @@ -77,7 +78,7 @@ func TestHandleCdsResponse(t *testing.T) {

t.Run("new cluster, cluster type is not eds", func(t *testing.T) {
p := newProcessor()
p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 1)
p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5)
cluster := &config_cluster_v3.Cluster{
Name: "ut-cluster",
ClusterDiscoveryType: &config_cluster_v3.Cluster_Type{
Expand Down Expand Up @@ -106,6 +107,7 @@ func TestHandleCdsResponse(t *testing.T) {

t.Run("cluster update case", func(t *testing.T) {
p := newProcessor()
p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5)
cluster := &config_cluster_v3.Cluster{
Name: "ut-cluster",
ClusterDiscoveryType: &config_cluster_v3.Cluster_Type{
Expand Down Expand Up @@ -156,7 +158,7 @@ func TestHandleCdsResponse(t *testing.T) {

t.Run("multiClusters: add a new eds cluster", func(t *testing.T) {
p := newProcessor()
p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 1)
p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5)
multiClusters := []*config_cluster_v3.Cluster{
{
Name: "ut-cluster1",
Expand Down Expand Up @@ -239,7 +241,7 @@ func TestHandleCdsResponse(t *testing.T) {

t.Run("multiClusters: remove cluster", func(t *testing.T) {
p := newProcessor()
p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 1)
p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5)
cluster := &config_cluster_v3.Cluster{
Name: "ut-cluster",
ClusterDiscoveryType: &config_cluster_v3.Cluster_Type{
Expand All @@ -257,40 +259,33 @@ func TestHandleCdsResponse(t *testing.T) {
}
err = p.handleCdsResponse(rsp)
assert.NoError(t, err)
dnsClusters := <-p.DnsResolverChan
assert.Equal(t, len(dnsClusters), 0)
newCluster1 := &config_cluster_v3.Cluster{
Name: "new-ut-cluster1",
ClusterDiscoveryType: &config_cluster_v3.Cluster_Type{
Type: config_cluster_v3.Cluster_LOGICAL_DNS,
},
}
newCluster2 := &config_cluster_v3.Cluster{
Name: "new-ut-cluster2",
ClusterDiscoveryType: &config_cluster_v3.Cluster_Type{
Type: config_cluster_v3.Cluster_EDS,
},
}
anyCluster1, err1 := anypb.New(newCluster1)
assert.NoError(t, err1)
anyCluster2, err2 := anypb.New(newCluster2)
assert.NoError(t, err2)
anyCluster1, err := anypb.New(newCluster1)
assert.NoError(t, err)
rsp = &service_discovery_v3.DiscoveryResponse{
Resources: []*anypb.Any{
anyCluster1,
anyCluster2,
},
}

err = p.handleCdsResponse(rsp)
assert.NoError(t, err)

dnsClusters = <-p.DnsResolverChan
assert.Equal(t, len(dnsClusters), 0)
// only cluster2 is eds typed
assert.Equal(t, []string{"new-ut-cluster2"}, p.Cache.edsClusterNames)
assert.Equal(t, []string{"new-ut-cluster1"}, p.Cache.edsClusterNames)
wantHash1 := hash.Sum64String(anyCluster1.String())
wantHash2 := hash.Sum64String(anyCluster2.String())
actualHash1 := p.Cache.ClusterCache.GetCdsHash(newCluster1.GetName())
assert.Equal(t, wantHash1, actualHash1)
actualHash2 := p.Cache.ClusterCache.GetCdsHash(newCluster2.GetName())
assert.Equal(t, wantHash2, actualHash2)
assert.Equal(t, []string{"new-ut-cluster2"}, p.req.ResourceNames)
assert.Equal(t, []string{"new-ut-cluster1"}, p.req.ResourceNames)
// `cluster` has been deleted
assert.Nil(t, p.Cache.ClusterCache.GetApiCluster(cluster.Name))
})
Expand Down
27 changes: 20 additions & 7 deletions pkg/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ var (
)

const (
MaxConcurrency uint32 = 5
RetryAfter = 5 * time.Millisecond
MaxConcurrency uint32 = 5
RetryAfter = 5 * time.Millisecond
DeRefreshInterval = 15 * time.Second
)

type DNSResolver struct {
Expand Down Expand Up @@ -232,6 +233,9 @@ func (r *DNSResolver) resolve(v *pendingResolveDomain) {
if ttl > v.refreshRate {
ttl = v.refreshRate
}
if ttl == 0 {
ttl = DeRefreshInterval
}
if !slices.Equal(entry.addresses, addrs) {
for _, c := range v.clusters {
ready := overwriteDnsCluster(c, v.domainName, addrs)
Expand Down Expand Up @@ -280,14 +284,23 @@ func (r *DNSResolver) refreshDNS() bool {
return true
}

func (r *DNSResolver) GetCacheResult(name string) []string {
var res []string
func (r *DNSResolver) GetDNSAddresses(domain string) []string {
r.RLock()
defer r.RUnlock()
if entry, ok := r.cache[name]; ok {
res = entry.addresses
if entry, ok := r.cache[domain]; ok {
return entry.addresses
}
return nil
}

func (r *DNSResolver) GetAllCachedDomains() []string {
r.RLock()
defer r.RUnlock()
out := make([]string, 0, len(r.cache))
for domain := range r.cache {
out = append(out, domain)
}
return res
return out
}

// doResolve is copied and adapted from github.com/istio/istio/pilot/pkg/model/network.go.
Expand Down
106 changes: 103 additions & 3 deletions pkg/dns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"math/rand"
"net"
"reflect"
"slices"
"sync"
"testing"
"time"
Expand All @@ -31,7 +30,10 @@ import (
v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/miekg/dns"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/test/util/retry"

core_v2 "kmesh.net/kmesh/api/v2/core"
"kmesh.net/kmesh/pkg/controller/ads"
Expand All @@ -55,6 +57,7 @@ func TestDNS(t *testing.T) {
t.Fatal(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
testDNSResolver.StartDNSResolver(stopCh)
testDNSResolver.resolvConfServers = []string{fakeDNSServer.Server.PacketConn.LocalAddr().String()}

Expand Down Expand Up @@ -144,7 +147,7 @@ func TestDNS(t *testing.T) {

time.Sleep(2 * time.Second)

res := testDNSResolver.GetCacheResult(testcase.domain)
res := testDNSResolver.GetDNSAddresses(testcase.domain)
if len(res) != 0 || len(testcase.expected) != 0 {
if !reflect.DeepEqual(res, testcase.expected) {
t.Errorf("dns resolve for %s do not match. \n got %v\nwant %v", testcase.domain, res, testcase.expected)
Expand All @@ -153,7 +156,7 @@ func TestDNS(t *testing.T) {
if testcase.expectedAfterTTL != nil {
ttl := time.Duration(math.Min(float64(testcase.ttl), float64(testcase.refreshRate)))
time.Sleep(ttl + 1)
res = testDNSResolver.GetCacheResult(testcase.domain)
res = testDNSResolver.GetDNSAddresses(testcase.domain)
if !reflect.DeepEqual(res, testcase.expectedAfterTTL) {
t.Errorf("dns refresh after ttl failed, for %s do not match. \n got %v\nwant %v", testcase.domain, res, testcase.expectedAfterTTL)
}
Expand Down Expand Up @@ -421,3 +424,100 @@ func TestGetPendingResolveDomain(t *testing.T) {
})
}
}

// TestHandleCdsResponseWithDns pushes cluster lists into the DNS resolver's
// channel and waits (up to one second per case) for the set of cached domains
// to match the expectation: first two DNS-typed clusters, then an empty list.
func TestHandleCdsResponseWithDns(t *testing.T) {
	// dnsCluster builds a cluster of the given discovery type that carries a
	// single endpoint whose socket address is host:9898.
	dnsCluster := func(name string, discoveryType clusterv3.Cluster_DiscoveryType, host string) *clusterv3.Cluster {
		socketAddr := &v3.SocketAddress{
			Address: host,
			PortSpecifier: &v3.SocketAddress_PortValue{
				PortValue: uint32(9898),
			},
		}
		lbEndpoint := &endpointv3.LbEndpoint{
			HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
				Endpoint: &endpointv3.Endpoint{
					Address: &v3.Address{
						Address: &v3.Address_SocketAddress{SocketAddress: socketAddr},
					},
				},
			},
		}
		return &clusterv3.Cluster{
			Name:                 name,
			ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: discoveryType},
			LoadAssignment: &endpointv3.ClusterLoadAssignment{
				Endpoints: []*endpointv3.LocalityLbEndpoints{
					{LbEndpoints: []*endpointv3.LbEndpoint{lbEndpoint}},
				},
			},
		}
	}

	logicalDNSCluster := dnsCluster("ut-cluster1", clusterv3.Cluster_LOGICAL_DNS, "foo.bar")
	strictDNSCluster := dnsCluster("ut-cluster2", clusterv3.Cluster_STRICT_DNS, "foo.baz")

	// The scenarios run in order against the same resolver, so the second one
	// starts from whatever the first one left in the cache.
	type scenario struct {
		name     string
		clusters []*clusterv3.Cluster
		expected []string
	}
	scenarios := []scenario{
		{
			name:     "add clusters with DNS type",
			clusters: []*clusterv3.Cluster{logicalDNSCluster, strictDNSCluster},
			expected: []string{"foo.bar", "foo.baz"},
		},
		{
			name:     "remove all DNS type clusters",
			clusters: []*clusterv3.Cluster{},
			expected: []string{},
		},
	}

	processor := ads.NewController().Processor
	stop := make(chan struct{})
	defer close(stop)

	resolver, err := NewDNSResolver(ads.NewAdsCache())
	assert.NoError(t, err)
	resolver.StartDNSResolver(stop)
	// Processor and resolver share one channel.
	processor.DnsResolverChan = resolver.DnsResolverChan

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Notify the DNS resolver directly through its channel.
			resolver.DnsResolverChan <- sc.clusters

			cacheMatches := func() bool {
				return slices.EqualUnordered(sc.expected, resolver.GetAllCachedDomains())
			}
			retry.UntilOrFail(t, cacheMatches, retry.Timeout(1*time.Second))
		})
	}
}

0 comments on commit ad3d9cd

Please sign in to comment.