Skip to content

Commit 52532e1

Browse files
authored
Fix remotely enabling/disabling features (#1088)
1 parent 906b54a commit 52532e1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+289
-170
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,12 @@ OS := $(shell uname -s | tr '[:upper:]' '[:lower:]')
6868
uname_m := $(shell uname -m)
6969

7070
ifeq ($(uname_m),aarch64)
71-
OSARCH = arm64
71+
OSARCH ?= arm64
7272
else
7373
ifeq ($(uname_m),x86_64)
74-
OSARCH = amd64
74+
OSARCH ?= amd64
7575
else
76-
OSARCH = $(uname_m)
76+
OSARCH ?= $(uname_m)
7777
endif
7878
endif
7979

src/core/environment.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,6 @@ func (env *EnvironmentType) IsContainer() bool {
342342
res, err, _ := singleflightGroup.Do(IsContainerKey, func() (interface{}, error) {
343343
for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} {
344344
if _, err := os.Stat(filename); err == nil {
345-
log.Debugf("Is a container because (%s) exists", filename)
346345
return true, nil
347346
}
348347
}

src/core/metrics/collectors/nginx.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type NginxCollector struct {
2929
}
3030

3131
func NewNginxCollector(conf *config.Config, env core.Environment, collectorConf *metrics.NginxCollectorConfig, binary core.NginxBinary) *NginxCollector {
32+
log.Debugf("Creating NGINX Collector")
3233
host := env.NewHostInfo("agentVersion", &conf.Tags, conf.ConfigDirs, false)
3334
dimensions := metrics.NewCommonDim(host, conf, collectorConf.NginxId)
3435
dimensions.NginxConfPath = collectorConf.ConfPath
@@ -45,6 +46,7 @@ func NewNginxCollector(conf *config.Config, env core.Environment, collectorConf
4546
}
4647

4748
func buildSources(dimensions *metrics.CommonDim, binary core.NginxBinary, collectorConf *metrics.NginxCollectorConfig, conf *config.Config, env core.Environment) []metrics.NginxSource {
49+
log.Debugf("Building NGINX metric sources")
4850
var nginxSources []metrics.NginxSource
4951
// worker metrics
5052
if len(conf.Nginx.NginxCountingSocket) > 0 && conf.IsFeatureEnabled(agent_config.FeatureNginxCounting) {
@@ -68,6 +70,7 @@ func buildSources(dimensions *metrics.CommonDim, binary core.NginxBinary, collec
6870
nginxSources = append(nginxSources, sources.NewNginxStatic(dimensions, sources.OSSNamespace))
6971
}
7072
}
73+
7174
return nginxSources
7275
}
7376

src/core/metrics/sources/nginx_plus.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type ExtendedStats struct {
7676
}
7777

7878
func NewNginxPlus(baseDimensions *metrics.CommonDim, nginxNamespace, plusNamespace, plusAPI string, clientVersion int) *NginxPlus {
79+
log.Debug("Creating NGINX Plus metrics source")
7980
return &NginxPlus{baseDimensions: baseDimensions, nginxNamespace: nginxNamespace, plusNamespace: plusNamespace, plusAPI: plusAPI, clientVersion: clientVersion, logger: NewMetricSourceLogger()}
8081
}
8182

src/core/pipe.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex
7171
pluginsRegistered := []string{}
7272
extensionPluginsRegistered := []string{}
7373

74-
for _, plugin := range p.plugins {
74+
for _, plugin := range plugins {
7575
for _, subscription := range plugin.Subscriptions() {
7676
p.regMu.Lock()
7777
err := p.bus.Subscribe(subscription, plugin.Process)
@@ -83,7 +83,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex
8383
pluginsRegistered = append(pluginsRegistered, *plugin.Info().name)
8484
}
8585

86-
for _, plugin := range p.extensionPlugins {
86+
for _, plugin := range extensionPlugins {
8787
for _, subscription := range plugin.Subscriptions() {
8888
p.regMu.Lock()
8989
err := p.bus.Subscribe(subscription, plugin.Process)
@@ -94,6 +94,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex
9494
}
9595
extensionPluginsRegistered = append(extensionPluginsRegistered, *plugin.Info().name)
9696
}
97+
9798
log.Infof("The following core plugins have been registered: %q", pluginsRegistered)
9899
log.Infof("The following extension plugins have been registered: %q", extensionPluginsRegistered)
99100

src/plugins/agent_api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ func (a *AgentAPI) Close() {
191191
if err := a.server.Shutdown(context.Background()); err != nil {
192192
log.Errorf("Agent API HTTP Server Shutdown Error: %v", err)
193193
}
194+
log.Info("Agent API is closed")
194195
}
195196

196197
func (a *AgentAPI) Process(message *core.Message) {

src/plugins/commander.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ func NewCommander(cmdr client.Commander, config *config.Config) *Commander {
3838
}
3939

4040
func (c *Commander) Init(pipeline core.MessagePipeInterface) {
41+
log.Info("Commander initializing")
4142
c.pipeline = pipeline
4243
c.ctx = pipeline.Context()
43-
log.Info("Commander initializing")
4444
go c.dispatchLoop()
4545
}
4646

4747
func (c *Commander) Close() {
48-
log.Info("Commander is wrapping up")
48+
log.Info("Commander is closed")
4949
}
5050

5151
func (c *Commander) Info() *core.Info {
@@ -158,7 +158,6 @@ func (c *Commander) dispatchLoop() {
158158
case *proto.Command_AgentConnectRequest, *proto.Command_AgentConnectResponse:
159159
topic = core.AgentConnected
160160
case *proto.Command_AgentConfigRequest, *proto.Command_AgentConfig:
161-
log.Debugf("agent config %T command data type received and ignored", cmd.GetData())
162161
topic = core.AgentConfig
163162
case *proto.Command_CmdStatus:
164163
data := cmd.GetData().(*proto.Command_CmdStatus)

src/plugins/common.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/nginx/agent/v2/src/core/config"
77
"github.com/nginx/agent/v2/src/extensions"
88
log "github.com/sirupsen/logrus"
9+
"go.uber.org/atomic"
910

1011
agent_config "github.com/nginx/agent/sdk/v2/agent/config"
1112
"github.com/nginx/agent/sdk/v2/agent/events"
@@ -36,15 +37,15 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E
3637

3738
if (loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender)) && reporter != nil {
3839
corePlugins = append(corePlugins,
39-
NewMetricsSender(reporter),
40+
NewMetricsSender(reporter, atomic.NewBool(false)),
4041
)
4142
}
4243

4344
corePlugins = append(corePlugins,
4445
NewConfigReader(loadedConfig),
4546
NewNginx(commander, binary, env, loadedConfig, processes),
4647
NewExtensions(loadedConfig, env),
47-
NewFeatures(commander, loadedConfig, env, binary, loadedConfig.Version, processes, agentEventsMeta),
48+
NewFeatures(commander, reporter, loadedConfig, env, binary, loadedConfig.Version, processes, agentEventsMeta),
4849
)
4950

5051
if loadedConfig.IsFeatureEnabled(agent_config.FeatureRegistration) {

src/plugins/config_reader.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func NewConfigReader(config *config.Config) *ConfigReader {
3737
}
3838

3939
func (r *ConfigReader) Init(pipeline core.MessagePipeInterface) {
40+
log.Info("ConfigReader initializing")
4041
r.messagePipeline = pipeline
4142
}
4243

@@ -45,7 +46,7 @@ func (r *ConfigReader) Info() *core.Info {
4546
}
4647

4748
func (r *ConfigReader) Close() {
48-
log.Info("ConfigReader is wrapping up")
49+
log.Info("ConfigReader is closed")
4950
}
5051

5152
func (r *ConfigReader) Process(msg *core.Message) {
@@ -73,8 +74,10 @@ func (r *ConfigReader) Process(msg *core.Message) {
7374
// Update the agent config on disk
7475
switch commandData := cmd.Data.(type) {
7576
case *proto.Command_AgentConfig:
77+
log.Debugf("Config reader: AgentConfig message recevied: %v, topic: %v", commandData, msg.Topic())
7678
r.updateAgentConfig(commandData.AgentConfig)
7779
case *proto.Command_AgentConnectResponse:
80+
log.Debugf("Config reader: AgentConnectResponse message recevied: %v, topic: %v", commandData, msg.Topic())
7881
r.updateAgentConfig(commandData.AgentConnectResponse.AgentConfig)
7982
}
8083
}
@@ -152,7 +155,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig)
152155
}
153156

154157
if synchronizeFeatures {
158+
log.Info("Agent config features changed, synchronizing features")
155159
r.synchronizeFeatures(payloadAgentConfig)
160+
r.config.Features = payloadAgentConfig.Details.Features
156161
}
157162

158163
r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig))
@@ -164,6 +169,7 @@ func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) {
164169
r.detailsMu.RLock()
165170
for _, feature := range r.config.Features {
166171
if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync {
172+
log.Debugf("Deregistering the feature %s", feature)
167173
r.deRegisterPlugin(feature)
168174
}
169175
}
@@ -177,16 +183,19 @@ func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) {
177183

178184
func (r *ConfigReader) deRegisterPlugin(data string) {
179185
if data == agent_config.FeatureFileWatcher {
180-
181186
err := r.messagePipeline.DeRegister([]string{agent_config.FeatureFileWatcher, agent_config.FeatureFileWatcherThrottle})
182187
if err != nil {
183-
log.Warnf("Error De-registering %v Plugin: %v", data, err)
188+
log.Warnf("Error deregistering %v plugin: %v", data, err)
189+
}
190+
} else if data == agent_config.FeatureMetrics {
191+
err := r.messagePipeline.DeRegister([]string{agent_config.FeatureMetrics, agent_config.FeatureMetricsThrottle, agent_config.FeatureMetricsSender})
192+
if err != nil {
193+
log.Warnf("Error deregistering %v plugin: %v", data, err)
184194
}
185-
186195
} else {
187196
err := r.messagePipeline.DeRegister([]string{data})
188197
if err != nil {
189-
log.Warnf("Error De-registering %v Plugin: %v", data, err)
198+
log.Warnf("Error deregistering %v plugin: %v", data, err)
190199
}
191200
}
192201
}

src/plugins/dataplane_status.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,26 @@ import (
2323
)
2424

2525
type DataPlaneStatus struct {
26-
messagePipeline core.MessagePipeInterface
27-
ctx context.Context
28-
sendStatus chan bool
29-
healthTicker *time.Ticker
30-
interval time.Duration
31-
meta *proto.Metadata
32-
binary core.NginxBinary
33-
env core.Environment
34-
version string
35-
tags *[]string
36-
configDirs string
37-
lastSendDetails time.Time
38-
envHostInfo *proto.HostInfo
39-
reportInterval time.Duration
40-
softwareDetails map[string]*proto.DataplaneSoftwareDetails
41-
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
42-
softwareDetailsMutex sync.RWMutex
43-
structMu sync.RWMutex
44-
processes []*core.Process
26+
messagePipeline core.MessagePipeInterface
27+
ctx context.Context
28+
sendStatus chan bool
29+
healthTicker *time.Ticker
30+
interval time.Duration
31+
meta *proto.Metadata
32+
binary core.NginxBinary
33+
env core.Environment
34+
version string
35+
tags *[]string
36+
configDirs string
37+
lastSendDetails time.Time
38+
envHostInfo *proto.HostInfo
39+
reportInterval time.Duration
40+
softwareDetails map[string]*proto.DataplaneSoftwareDetails
41+
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
42+
nginxConfigActivityStatusesMutex sync.RWMutex
43+
softwareDetailsMutex sync.RWMutex
44+
structMu sync.RWMutex
45+
processes []*core.Process
4546
}
4647

4748
const (
@@ -81,12 +82,15 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) {
8182

8283
func (dps *DataPlaneStatus) Close() {
8384
log.Info("DataPlaneStatus is wrapping up")
85+
dps.nginxConfigActivityStatusesMutex.Lock()
8486
dps.nginxConfigActivityStatuses = nil
87+
dps.nginxConfigActivityStatusesMutex.Unlock()
8588
dps.softwareDetailsMutex.Lock()
8689
dps.softwareDetails = nil
8790
dps.softwareDetailsMutex.Unlock()
8891
dps.healthTicker.Stop()
8992
dps.sendStatus <- true
93+
log.Info("DataPlaneStatus is closed")
9094
}
9195

9296
func (dps *DataPlaneStatus) Info() *core.Info {
@@ -144,8 +148,10 @@ func (dps *DataPlaneStatus) Subscriptions() []string {
144148

145149
func (dps *DataPlaneStatus) updateNginxConfigActivityStatuses(newAgentActivityStatus *proto.AgentActivityStatus) {
146150
log.Tracef("DataplaneStatus: Updating nginxConfigActivityStatuses with %v", newAgentActivityStatus)
147-
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); ok {
151+
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); dps.nginxConfigActivityStatuses != nil && ok {
152+
dps.nginxConfigActivityStatusesMutex.Lock()
148153
dps.nginxConfigActivityStatuses[newAgentActivityStatus.GetNginxConfigStatus().GetNginxId()] = newAgentActivityStatus
154+
dps.nginxConfigActivityStatusesMutex.Unlock()
149155
}
150156
}
151157

@@ -184,6 +190,8 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface)
184190
func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus {
185191
forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails)
186192

193+
dps.nginxConfigActivityStatusesMutex.Lock()
194+
defer dps.nginxConfigActivityStatusesMutex.Unlock()
187195
agentActivityStatuses := []*proto.AgentActivityStatus{}
188196
for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses {
189197
agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus)

src/plugins/events.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (a *Events) Init(pipeline core.MessagePipeInterface) {
6363
}
6464

6565
func (a *Events) Close() {
66-
log.Info("Events is wrapping up")
66+
log.Info("Events is closed")
6767
}
6868

6969
func (a *Events) Process(msg *core.Message) {

src/plugins/extensions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (e *Extensions) Init(pipeline core.MessagePipeInterface) {
3636
}
3737

3838
func (e *Extensions) Close() {
39-
log.Info("Extensions is wrapping up")
39+
log.Info("Extensions is closed")
4040
}
4141

4242
func (e *Extensions) Process(msg *core.Message) {

0 commit comments

Comments
 (0)