Skip to content

Commit

Permalink
attempt to deflake ggv2 setup test (solo-io#10410)
Browse files Browse the repository at this point in the history
Co-authored-by: Sam Heilbron <[email protected]>
  • Loading branch information
yuval-k and sam-heilbron authored Dec 2, 2024
1 parent 951dfcb commit 53c1aab
Show file tree
Hide file tree
Showing 11 changed files with 572 additions and 196 deletions.
6 changes: 6 additions & 0 deletions changelog/v1.18.0-rc3/defalke-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
changelog:
- type: NON_USER_FACING
description: >-
Deflake a test
skipCI-kube-tests:true
4 changes: 2 additions & 2 deletions projects/gateway2/proxy_syncer/perclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
)

func snapshotPerClient(l *zap.Logger, dbg *krt.DebugHandler, uccCol krt.Collection[krtcollections.UniqlyConnectedClient],
mostXdsSnapshots krt.Collection[xdsSnapWrapper], endpoints PerClientEnvoyEndpoints, clusters PerClientEnvoyClusters) krt.Collection[xdsSnapWrapper] {
mostXdsSnapshots krt.Collection[XdsSnapWrapper], endpoints PerClientEnvoyEndpoints, clusters PerClientEnvoyClusters) krt.Collection[XdsSnapWrapper] {

xdsSnapshotsForUcc := krt.NewCollection(uccCol, func(kctx krt.HandlerContext, ucc krtcollections.UniqlyConnectedClient) *xdsSnapWrapper {
xdsSnapshotsForUcc := krt.NewCollection(uccCol, func(kctx krt.HandlerContext, ucc krtcollections.UniqlyConnectedClient) *XdsSnapWrapper {
maybeMostlySnap := krt.FetchOne(kctx, mostXdsSnapshots, krt.FilterKey(ucc.Role))
if maybeMostlySnap == nil {
l.Debug("snapshotPerClient - snapshot missing", zap.String("proxyKey", ucc.Role))
Expand Down
50 changes: 16 additions & 34 deletions projects/gateway2/proxy_syncer/proxy_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type ProxySyncer struct {
proxyReconcileQueue ggv2utils.AsyncQueue[gloov1.ProxyList]

statusReport krt.Singleton[report]
mostXdsSnapshots krt.Collection[xdsSnapWrapper]
perclientSnapCollection krt.Collection[xdsSnapWrapper]
mostXdsSnapshots krt.Collection[XdsSnapWrapper]
perclientSnapCollection krt.Collection[XdsSnapWrapper]
proxiesToReconcile krt.Singleton[proxyList]
proxyTrigger *krt.RecomputeTrigger

Expand Down Expand Up @@ -184,24 +184,6 @@ func NewProxyTranslator(translator setup.TranslatorFactory,
}
}

type xdsSnapWrapper struct {
snap *xds.EnvoySnapshot
proxyKey string
proxyWithReport translatorutils.ProxyWithReports
pluginRegistry registry.PluginRegistry
fullReports reporter.ResourceReports
}

var _ krt.ResourceNamer = xdsSnapWrapper{}

func (p xdsSnapWrapper) Equals(in xdsSnapWrapper) bool {
return p.snap.Equal(in.snap)
}

func (p xdsSnapWrapper) ResourceName() string {
return p.proxyKey
}

type RedactedSecret krtcollections.ResourceWrapper[*gloov1.Secret]

func (us RedactedSecret) ResourceName() string {
Expand Down Expand Up @@ -231,7 +213,7 @@ func (l RedactedSecret) MarshalJSON() ([]byte, error) {
var _ json.Marshaler = RedactedSecret{}

type glooProxy struct {
proxy *gloov1.Proxy
Proxy *gloov1.Proxy
// plugins used to generate this proxy
pluginRegistry registry.PluginRegistry
// the GWAPI reports generated for translation from a GW->Proxy
Expand All @@ -242,7 +224,7 @@ type glooProxy struct {
var _ krt.ResourceNamer = glooProxy{}

func (p glooProxy) Equals(in glooProxy) bool {
if !proto.Equal(p.proxy, in.proxy) {
if !proto.Equal(p.Proxy, in.Proxy) {
return false
}
if !maps.Equal(p.reportMap.Gateways, in.reportMap.Gateways) {
Expand All @@ -258,7 +240,7 @@ func (p glooProxy) Equals(in glooProxy) bool {
}

func (p glooProxy) ResourceName() string {
return xds.SnapshotCacheKey(p.proxy)
return xds.SnapshotCacheKey(p.Proxy)
}

type report struct {
Expand Down Expand Up @@ -353,7 +335,7 @@ func (s *ProxySyncer) Init(ctx context.Context, dbg *krt.DebugHandler) error {
ctx,
s.istioClient,
rlkubev1a1.SchemeGroupVersion.WithResource("ratelimitconfigs"),
krt.WithName("KubeRateLimitConfwithDebug,igs"),
krt.WithName("KubeRateLimitConfigs"), withDebug,
)

upstreams := SetupCollectionDynamic[glookubev1.Upstream](
Expand Down Expand Up @@ -424,7 +406,7 @@ func (s *ProxySyncer) Init(ctx context.Context, dbg *krt.DebugHandler) error {
proxy := s.buildProxy(ctx, gw)
return proxy
}, withDebug, krt.WithName("GlooProxies"))
s.mostXdsSnapshots = krt.NewCollection(glooProxies, func(kctx krt.HandlerContext, proxy glooProxy) *xdsSnapWrapper {
s.mostXdsSnapshots = krt.NewCollection(glooProxies, func(kctx krt.HandlerContext, proxy glooProxy) *XdsSnapWrapper {
// we are recomputing xds snapshots as proxies have changed, signal that we need to sync xds with these new snapshots
xdsSnap := s.translateProxy(
ctx,
Expand Down Expand Up @@ -455,7 +437,7 @@ func (s *ProxySyncer) Init(ctx context.Context, dbg *krt.DebugHandler) error {
proxies := krt.Fetch(kctx, glooProxies)
var l gloov1.ProxyList
for _, p := range proxies {
l = append(l, p.proxy)
l = append(l, p.Proxy)
}
return &proxyList{l}
})
Expand Down Expand Up @@ -593,7 +575,7 @@ func (s *ProxySyncer) Start(ctx context.Context) error {
}
}()

s.perclientSnapCollection.RegisterBatch(func(o []krt.Event[xdsSnapWrapper], initialSync bool) {
s.perclientSnapCollection.RegisterBatch(func(o []krt.Event[XdsSnapWrapper], initialSync bool) {
for _, e := range o {
if e.Event != controllers.EventDelete {
snapWrap := e.Latest()
Expand Down Expand Up @@ -674,7 +656,7 @@ func (s *ProxySyncer) buildProxy(ctx context.Context, gw *gwv1.Gateway) *glooPro
})

return &glooProxy{
proxy: proxy,
Proxy: proxy,
pluginRegistry: pluginRegistry,
reportMap: rm,
}
Expand All @@ -691,7 +673,7 @@ func (s *ProxySyncer) translateProxy(
kus krt.Collection[krtcollections.UpstreamWrapper],
authConfigs krt.Collection[*extauthkubev1.AuthConfig],
rlConfigs krt.Collection[*rlkubev1a1.RateLimitConfig],
) *xdsSnapWrapper {
) *XdsSnapWrapper {
cfgmaps := krt.Fetch(kctx, kcm)
endpoints := krt.Fetch(kctx, kep)
secrets := krt.Fetch(kctx, ks)
Expand All @@ -705,7 +687,7 @@ func (s *ProxySyncer) translateProxy(
// see also: https://github.com/solo-io/solo-projects/issues/7080

latestSnap := gloosnapshot.ApiSnapshot{}
latestSnap.Proxies = gloov1.ProxyList{proxy.proxy}
latestSnap.Proxies = gloov1.ProxyList{proxy.Proxy}

acfgs := make([]*extauthv1.AuthConfig, 0, len(authcfgs))
for _, kac := range authcfgs {
Expand Down Expand Up @@ -746,18 +728,18 @@ func (s *ProxySyncer) translateProxy(
}
latestSnap.Upstreams = gupstreams

xdsSnapshot, reports, proxyReport := s.proxyTranslator.buildXdsSnapshot(kctx, ctx, proxy.proxy, &latestSnap)
xdsSnapshot, reports, proxyReport := s.proxyTranslator.buildXdsSnapshot(kctx, ctx, proxy.Proxy, &latestSnap)

// TODO(Law): now we not able to merge reports after translation!

// build ResourceReports struct containing only this Proxy
r := make(reporter.ResourceReports)
filteredReports := reports.FilterByKind("Proxy")
r[proxy.proxy] = filteredReports[proxy.proxy]
r[proxy.Proxy] = filteredReports[proxy.Proxy]

// build object used by status plugins
proxyWithReport := translatorutils.ProxyWithReports{
Proxy: proxy.proxy,
Proxy: proxy.Proxy,
Reports: translatorutils.TranslationReports{
ProxyReport: proxyReport,
ResourceReports: r,
Expand All @@ -781,7 +763,7 @@ func (s *ProxySyncer) translateProxy(
zap.Stringer("Routes", resourcesStringer(envoySnap.Routes)),
zap.Stringer("Endpoints", resourcesStringer(envoySnap.Endpoints)),
)
out := xdsSnapWrapper{
out := XdsSnapWrapper{
snap: envoySnap,
proxyKey: proxy.ResourceName(),
proxyWithReport: proxyWithReport,
Expand Down
212 changes: 212 additions & 0 deletions projects/gateway2/proxy_syncer/xdswrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package proxy_syncer

import (
"encoding/json"
"fmt"

udpaannontations "github.com/cncf/xds/go/udpa/annotations"
"github.com/solo-io/gloo/pkg/utils/envutils"
"github.com/solo-io/gloo/projects/gloo/pkg/xds"

"github.com/solo-io/solo-kit/pkg/api/v2/reporter"
"istio.io/istio/pkg/kube/krt"

"github.com/solo-io/gloo/projects/gateway2/translator/plugins/registry"
"github.com/solo-io/gloo/projects/gateway2/translator/translatorutils"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/anypb"

oldproto "github.com/golang/protobuf/proto"
)

var (
UseDetailedUnmarshalling = envutils.IsEnvTruthy("DETAILED_SNAP_UNMARSHALLING")
)

type XdsSnapWrapper struct {
snap *xds.EnvoySnapshot
proxyKey string
proxyWithReport translatorutils.ProxyWithReports
pluginRegistry registry.PluginRegistry
fullReports reporter.ResourceReports
}

func (p XdsSnapWrapper) WithSnapshot(snap *xds.EnvoySnapshot) XdsSnapWrapper {
p.snap = snap
return p
}

var _ krt.ResourceNamer = XdsSnapWrapper{}

func (p XdsSnapWrapper) Equals(in XdsSnapWrapper) bool {
return p.snap.Equal(in.snap)
}

func (p XdsSnapWrapper) ResourceName() string {
return p.proxyKey
}

// note: this is feature gated, as i'm not confident the new logic can't panic, in all envoy configs
// once 1.18 is out, we can remove the feature gate.
func (p XdsSnapWrapper) MarshalJSON() (out []byte, err error) {
if !UseDetailedUnmarshalling {
// use a new struct to prevent infinite recursion
return json.Marshal(struct {
snap *xds.EnvoySnapshot
proxyKey string
proxyWithReport translatorutils.ProxyWithReports
pluginRegistry registry.PluginRegistry
fullReports reporter.ResourceReports
}{
snap: p.snap,
proxyKey: p.proxyKey,
proxyWithReport: p.proxyWithReport,
pluginRegistry: p.pluginRegistry,
fullReports: p.fullReports,
})
}

snap := p.snap.Clone().(*xds.EnvoySnapshot)

defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic handling snapshot: %v", r)
}
}()

// redact things
redact(snap)
snapJson := map[string][]any{}
for _, r := range snap.Listeners.Items {
rJson, _ := protojson.Marshal(r.ResourceProto().(proto.Message))
var rAny any
json.Unmarshal(rJson, &rAny)
snapJson["Listeners"] = append(snapJson["Listeners"], rAny)
}
for _, r := range snap.Clusters.Items {
rJson, _ := protojson.Marshal(r.ResourceProto().(proto.Message))
var rAny any
json.Unmarshal(rJson, &rAny)
snapJson["Clusters"] = append(snapJson["Clusters"], rAny)
}

for _, r := range snap.Routes.Items {
rJson, _ := protojson.Marshal(r.ResourceProto().(proto.Message))
var rAny any
json.Unmarshal(rJson, &rAny)
snapJson["Routes"] = append(snapJson["Routes"], rAny)
}

for _, r := range snap.Endpoints.Items {
rJson, _ := protojson.Marshal(r.ResourceProto().(proto.Message))
var rAny any
json.Unmarshal(rJson, &rAny)
snapJson["Endpoints"] = append(snapJson["Endpoints"], rAny)
}

return json.Marshal(struct {
Snap any
ProxyKey string
}{
Snap: snapJson,
ProxyKey: p.proxyKey,
})
}

func redact(snap *xds.EnvoySnapshot) {
// clusters and listener might have secrets
for _, l := range snap.Listeners.Items {
redactProto(l.ResourceProto())
}
for _, l := range snap.Clusters.Items {
redactProto(l.ResourceProto())
}
}

func redactProto(m oldproto.Message) {
var msg proto.Message = m.(proto.Message)
visitFields(msg.ProtoReflect(), false)
}

func isSensitive(fd protoreflect.FieldDescriptor) bool {
opts := fd.Options().(*descriptorpb.FieldOptions)
if !proto.HasExtension(opts, udpaannontations.E_Sensitive) {
return false
}

maybeExt := proto.GetExtension(opts, udpaannontations.E_Sensitive)
return maybeExt.(bool)
}

func visitFields(msg protoreflect.Message, ancestor_sensitive bool) {
msg.Range(func(fd protoreflect.FieldDescriptor, v protoreflect.Value) bool {
sensitive := ancestor_sensitive || isSensitive(fd)

if fd.IsList() {
list := v.List()
for i := 0; i < list.Len(); i++ {
elem := list.Get(i)
if fd.Message() != nil {
visitMessage(msg, fd, elem, sensitive)
} else {
// Redact scalar fields if needed
if sensitive {
list.Set(i, redactValue(fd, elem))
}
}
}
} else if fd.IsMap() {
m := v.Map()
m.Range(func(k protoreflect.MapKey, v protoreflect.Value) bool {
if fd.MapValue().Message() != nil {
visitMessage(msg, fd.MapValue(), v, sensitive)
} else {
// Redact scalar fields if needed
if sensitive {
m.Set(k, redactValue(fd.MapValue(), v))
}
}
return true
})
} else {
if fd.Message() != nil {
visitMessage(msg, fd, v, sensitive)
} else {
// Redact scalar fields if needed
if sensitive {
msg.Set(fd, redactValue(fd, v))
}
}
}
return true
})
}

func visitMessage(msg protoreflect.Message, fd protoreflect.FieldDescriptor, v protoreflect.Value, sensitive bool) {
visitMsg := v.Message()
var anyMsg proto.Message
m := visitMsg.Interface()
if anymsg, ok := m.(*anypb.Any); ok {
anyMsg, _ = anypb.UnmarshalNew(anymsg, proto.UnmarshalOptions{})
visitMsg = anyMsg.ProtoReflect()

}
visitFields(visitMsg, sensitive)
if anyMsg != nil {
anymsg, _ := anypb.New(anyMsg)
msg.Set(fd, protoreflect.ValueOf(anymsg.ProtoReflect()))
}
}

func redactValue(fd protoreflect.FieldDescriptor, v protoreflect.Value) protoreflect.Value {
switch fd.Kind() {
case protoreflect.StringKind:
return protoreflect.ValueOfString("[REDACTED]")
case protoreflect.BytesKind:
return protoreflect.ValueOfBytes([]byte("[REDACTED]"))
}
return v
}
Loading

0 comments on commit 53c1aab

Please sign in to comment.