Skip to content

Commit

Permalink
[envoy]Specify IP Addr for Default Listener Case (magma#3718)
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Yurchenko <[email protected]>
  • Loading branch information
koolzz authored Nov 14, 2020
1 parent a602ff3 commit a9b5a15
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"magma/feg/cloud/go/protos"
lte_proto "magma/lte/cloud/go/protos"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
Expand Down Expand Up @@ -175,21 +176,23 @@ func newCallbacks(signal chan struct{}, fetches int, requests int) *callbacks {
}

func GetControllerClient() *ControllerClient {
cli := ControllerClient{}
ctx := context.Background()

glog.Infof("Starting Envoy control plane")

signal := make(chan struct{})
cb := newCallbacks(signal, 0, 0)

cli := ControllerClient{}
cli.config = cache.NewSnapshotCache(mode == Ads, Hasher{}, nil)

ctx := context.Background()
srv := xds.NewServer(ctx, cli.config, cb)

// start the xDS server
go RunManagementServer(ctx, srv, port)
go RunManagementGateway(ctx, srv, gatewayPort)

<-signal

cb.Report()

return &cli
Expand Down Expand Up @@ -251,7 +254,7 @@ func getVirtualHost(virtualHostName string, domains []string, requestHeadersToAd
}
}

func getFilterChains(ues []*protos.AddUEHeaderEnrichmentRequest, virtualHosts []*v2route.VirtualHost) []*listener.FilterChain {
func getUEFilterChains(ues []*protos.AddUEHeaderEnrichmentRequest) ([]*listener.FilterChain, error) {
filterChains := []*listener.FilterChain{}
for _, req := range ues {
var ue_ip_addr = string(req.UeIp.Address)
Expand All @@ -268,7 +271,8 @@ func getFilterChains(ues []*protos.AddUEHeaderEnrichmentRequest, virtualHosts []
requestHeadersToAdd = append(requestHeadersToAdd, headerValueOption)
}

virtualHosts = []*v2route.VirtualHost{getVirtualHost(virtualHostName, req.Websites, requestHeadersToAdd)}
virtualHosts := []*v2route.VirtualHost{getVirtualHost(virtualHostName, []string{"*"}, []*core.HeaderValueOption{})}
virtualHosts = append(virtualHosts, getVirtualHost(virtualHostName, req.Websites, requestHeadersToAdd))
pbst, err := ptypes.MarshalAny(getHttpConnectionManager(routeConfigName, virtualHosts))
if err != nil {
glog.Errorf("Couldn't marshal UE HTTP connection manager")
Expand All @@ -289,46 +293,24 @@ func getFilterChains(ues []*protos.AddUEHeaderEnrichmentRequest, virtualHosts []
FilterChainMatch: filterChainMatch,
Filters: filters,
})
glog.Infof("Returning virtual hosts %s", virtualHosts)
}
return filterChains

return filterChains, nil
}

func getListener(ues []*protos.AddUEHeaderEnrichmentRequest) ([]cache.Resource, error) {
virtualHosts := []*v2route.VirtualHost{getVirtualHost(virtualHostName, []string{"*"}, []*core.HeaderValueOption{})}
pbst, err := ptypes.MarshalAny(getHttpConnectionManager(defaultRouteName, virtualHosts))
glog.Infof("Creating listener " + listenerName)
filterChains, err := getUEFilterChains(ues)
if err != nil {
return nil, errors.New("Couldn't marshal HTTP connection manager")
return nil, err
}

filterChains := []*listener.FilterChain{{
FilterChainMatch: &listener.FilterChainMatch{},
Filters: []*listener.Filter{{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: pbst,
},
}},
}}

o_src := &orig_src.OriginalSrc{}
mo_src, err := ptypes.MarshalAny(o_src)
if err != nil {
return nil, errors.New("Couldn't marshal OriginalSrc")
}

filterChains = append(filterChains, getFilterChains(ues, virtualHosts)...)

glog.Infof("Creating listener " + listenerName)
address := &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Address: any_addr,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: httpPort,
},
},
},
}
listenerFilters := []*listener.ListenerFilter{
{
Name: "envoy.filters.listener.original_dst",
Expand All @@ -340,6 +322,18 @@ func getListener(ues []*protos.AddUEHeaderEnrichmentRequest) ([]cache.Resource,
},
},
}

address := &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Address: any_addr,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: httpPort,
},
},
},
}

var listener = []cache.Resource{
&v2.Listener{
Name: listenerName,
Expand All @@ -349,10 +343,23 @@ func getListener(ues []*protos.AddUEHeaderEnrichmentRequest) ([]cache.Resource,
ListenerFilters: listenerFilters,
}}

glog.Infof("Returning listener %s", listener[0])
return listener, nil
}

func getDefaultReq() []*protos.AddUEHeaderEnrichmentRequest {
return []*protos.AddUEHeaderEnrichmentRequest{{
UeIp: &lte_proto.IPAddress{
Version: lte_proto.IPAddress_IPV4,
Address: []byte("0.0.0.0"),
},
Websites: []string{"0.0.0.0"},
Headers: []*protos.Header{},
}}
}

func (cli *ControllerClient) UpdateSnapshot(ues []*protos.AddUEHeaderEnrichmentRequest) {
glog.Infof("Updating Snapshot foe UEs %s", ues)
cluster := []cache.Resource{
&v2.Cluster{
Name: clusterName,
Expand All @@ -361,12 +368,16 @@ func (cli *ControllerClient) UpdateSnapshot(ues []*protos.AddUEHeaderEnrichmentR
LbPolicy: v2.Cluster_CLUSTER_PROVIDED,
},
}
nodeId := cli.config.GetStatusKeys()[0]

if len(ues) == 0 {
ues = getDefaultReq()
}
listener, err := getListener(ues)
if err != nil {
glog.Error(err)
return
if (err != nil) {
glog.Errorf("Get Listener error %s", err)
return
}
nodeId := cli.config.GetStatusKeys()[0]

atomic.AddInt32(&cli.version, 1)
glog.Infof("Saved snapshot version " + fmt.Sprint(cli.version))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (s *envoyControllerService) AddUEHeaderEnrichment(
req *protos.AddUEHeaderEnrichmentRequest,
) (*protos.AddUEHeaderEnrichmentResult, error) {
glog.Infof("AddUEHeaderEnrichmentResult received for IP %s", req.UeIp.Address)
glog.Infof("req %s", (req))
s.ue_infos = add_to_list(s.ue_infos, req)
s.controller_cli.UpdateSnapshot(s.ue_infos)

Expand All @@ -46,6 +47,7 @@ func (s *envoyControllerService) DeactivateUEHeaderEnrichment(
req *protos.DeactivateUEHeaderEnrichmentRequest,
) (*protos.DeactivateUEHeaderEnrichmentResult, error) {
glog.Infof("DeactivateUEHeaderEnrichmentResult received for IP %s", req.UeIp.Address)
glog.Infof("req %s", (req))
s.ue_infos = remove_from_list(s.ue_infos, req.UeIp)
s.controller_cli.UpdateSnapshot(s.ue_infos)

Expand All @@ -69,9 +71,9 @@ func remove_from_list(l []*protos.AddUEHeaderEnrichmentRequest, ip *lte_proto.IP
func add_to_list(l []*protos.AddUEHeaderEnrichmentRequest, new *protos.AddUEHeaderEnrichmentRequest) []*protos.AddUEHeaderEnrichmentRequest {
for i, other := range l {
if string(other.UeIp.Address) == string(new.UeIp.Address) {
// Overwrite duplicate UE
ret := append(l[:i], l[i+1:]...)
return append(ret, new)
// Overwrite duplicate UE
ret := append(l[:i], l[i+1:]...)
return append(ret, new)
}
}
l = append(l, new)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestEnvoyControllerInit(t *testing.T) {
cli.On("UpdateSnapshot", add_ue_reqs[1:]).Return()
_, err = srv.DeactivateUEHeaderEnrichment(ctx, deactivate_req)

// Make sure duplicate doesn't get reinserted but gets overwritten
// Make sure duplicate doesn't get reinserted but gets overwritten
cli.On("UpdateSnapshot", overwrtie_ue_req).Return()
_, err = srv.AddUEHeaderEnrichment(ctx, overwrtie_ue_req[0])

Expand Down

0 comments on commit a9b5a15

Please sign in to comment.