scrape: catch errors when creating HTTP clients (prometheus#5182)
* scrape: catch errors when creating HTTP clients

This change makes sure that no scrape pool is created with a nil HTTP
client.

Signed-off-by: Simon Pasquier <[email protected]>

* Address Tariq's comment

Signed-off-by: Simon Pasquier <[email protected]>

* Address Brian's comment

Signed-off-by: Simon Pasquier <[email protected]>
simonpasquier authored Feb 13, 2019
1 parent 37e35f9 commit 12708ac
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 47 deletions.
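
As a minimal standalone sketch of the behavior change (the helper names below are invented for illustration and are not the Prometheus code itself): before this commit, a failure to build the HTTP client — for example an unreadable CA file in tls_config — was only logged and the scrape pool kept a nil client; after it, the constructor returns the error and the caller refuses to create the pool.

package main

import (
	"fmt"
	"net/http"
)

// clientFromConfig is a hypothetical stand-in for config_util.NewClientFromConfig:
// it fails the way a bad tls_config with a missing CA file would.
func clientFromConfig(caFile string) (*http.Client, error) {
	if caFile != "" {
		return nil, fmt.Errorf("unable to load specified CA cert %s", caFile)
	}
	return &http.Client{}, nil
}

// Old behavior: log the error and carry on, leaving a pool with a nil client.
// New behavior (this commit): return the error so no pool is created at all.
func newPool(caFile string) (*http.Client, error) {
	client, err := clientFromConfig(caFile)
	if err != nil {
		return nil, fmt.Errorf("error creating HTTP client: %v", err)
	}
	return client, nil
}

func main() {
	if _, err := newPool("/not/existing/ca/file"); err != nil {
		fmt.Println("scrape pool not created:", err)
	}
}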
27 changes: 18 additions & 9 deletions scrape/manager.go
@@ -14,6 +14,7 @@
 package scrape
 
 import (
+	"fmt"
 	"reflect"
 	"sync"
 	"time"
@@ -104,26 +105,26 @@ func (m *Manager) reload() {
 	m.mtxScrape.Lock()
 	var wg sync.WaitGroup
 	for setName, groups := range m.targetSets {
-		var sp *scrapePool
-		existing, ok := m.scrapePools[setName]
-		if !ok {
+		if _, ok := m.scrapePools[setName]; !ok {
 			scrapeConfig, ok := m.scrapeConfigs[setName]
 			if !ok {
 				level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
 				continue
 			}
-			sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName))
+			sp, err := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName))
+			if err != nil {
+				level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
+				continue
+			}
 			m.scrapePools[setName] = sp
-		} else {
-			sp = existing
 		}
 
 		wg.Add(1)
 		// Run the sync in parallel as these take a while and at high load can't catch up.
 		go func(sp *scrapePool, groups []*targetgroup.Group) {
 			sp.Sync(groups)
 			wg.Done()
-		}(sp, groups)
+		}(m.scrapePools[setName], groups)
 
 	}
 	m.mtxScrape.Unlock()
@@ -158,16 +159,24 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
 	}
 	m.scrapeConfigs = c
 
-	// Cleanup and reload pool if config has changed.
+	// Cleanup and reload pool if the configuration has changed.
+	var failed bool
 	for name, sp := range m.scrapePools {
 		if cfg, ok := m.scrapeConfigs[name]; !ok {
 			sp.stop()
 			delete(m.scrapePools, name)
 		} else if !reflect.DeepEqual(sp.config, cfg) {
-			sp.reload(cfg)
+			err := sp.reload(cfg)
+			if err != nil {
+				level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
+				failed = true
+			}
 		}
 	}
 
+	if failed {
+		return fmt.Errorf("failed to apply the new configuration")
+	}
 	return nil
 }
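
ApplyConfig now keeps going after a failed pool reload and reports a single aggregate error at the end, so one bad job no longer prevents the remaining pools from being updated. A minimal sketch of that accumulate-then-fail pattern (the applyAll helper is invented for this sketch, not a Prometheus API):

package main

import (
	"errors"
	"fmt"
)

// applyAll mimics ApplyConfig's loop: attempt every pool's reload, log and
// remember failures, and return one aggregate error at the end instead of
// aborting on the first failure.
func applyAll(reloads []func() error) error {
	var failed bool
	for i, reload := range reloads {
		if err := reload(); err != nil {
			fmt.Printf("pool %d: reload failed: %v\n", i, err)
			failed = true
		}
	}
	if failed {
		return errors.New("failed to apply the new configuration")
	}
	return nil
}

func main() {
	err := applyAll([]func() error{
		func() error { return nil }, // this pool is still applied...
		func() error { return errors.New("error creating HTTP client") },
	})
	fmt.Println(err) // ...even though the overall result is an error
}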
110 changes: 89 additions & 21 deletions scrape/manager_test.go
@@ -222,47 +222,115 @@ func TestPopulateLabels(t *testing.T) {
 	}
 }
 
-// TestScrapeManagerReloadNoChange tests that no scrape reload happens when there is no config change.
-func TestManagerReloadNoChange(t *testing.T) {
-	tsetName := "test"
+func loadConfiguration(t *testing.T, c string) *config.Config {
+	t.Helper()
 
-	cfgText := `
+	cfg := &config.Config{}
+	if err := yaml.UnmarshalStrict([]byte(c), cfg); err != nil {
+		t.Fatalf("Unable to load YAML config: %s", err)
+	}
+	return cfg
+}
+
+func noopLoop() loop {
+	return &testLoop{
+		startFunc: func(interval, timeout time.Duration, errc chan<- error) {},
+		stopFunc:  func() {},
+	}
+}
+
+func TestManagerApplyConfig(t *testing.T) {
+	// Valid initial configuration.
+	cfgText1 := `
 scrape_configs:
-- job_name: '` + tsetName + `'
+- job_name: job1
   static_configs:
-  - targets: ["foo:9090"]
+  - targets: ["bar:9090"]
 `
-	cfg := &config.Config{}
-	if err := yaml.UnmarshalStrict([]byte(cfgText), cfg); err != nil {
-		t.Fatalf("Unable to load YAML config cfgYaml: %s", err)
-	}
+	// Invalid configuration.
+	cfgText2 := `
+scrape_configs:
+- job_name: job1
+  scheme: https
+  static_configs:
+  - targets: ["foo:9090"]
+  tls_config:
+    ca_file: /not/existing/ca/file
+`
+	// Valid configuration.
+	cfgText3 := `
+scrape_configs:
+- job_name: job1
+  scheme: https
+  static_configs:
+  - targets: ["foo:9090"]
+`
+	var (
+		cfg1 = loadConfiguration(t, cfgText1)
+		cfg2 = loadConfiguration(t, cfgText2)
+		cfg3 = loadConfiguration(t, cfgText3)
 
-	scrapeManager := NewManager(nil, nil)
-	// Load the current config.
-	scrapeManager.ApplyConfig(cfg)
+		ch = make(chan struct{}, 1)
+	)
 
-	// As reload never happens, new loop should never be called.
+	scrapeManager := NewManager(nil, nil)
 	newLoop := func(_ *Target, s scraper, _ int, _ bool, _ []*relabel.Config) loop {
-		t.Fatal("reload happened")
-		return nil
+		ch <- struct{}{}
+		return noopLoop()
 	}
 
 	sp := &scrapePool{
 		appendable:    &nopAppendable{},
 		activeTargets: map[uint64]*Target{},
 		loops: map[uint64]loop{
-			1: &testLoop{},
+			1: noopLoop(),
 		},
 		newLoop: newLoop,
 		logger:  nil,
-		config:  cfg.ScrapeConfigs[0],
+		config:  cfg1.ScrapeConfigs[0],
 	}
 	scrapeManager.scrapePools = map[string]*scrapePool{
-		tsetName: sp,
+		"job1": sp,
 	}
 
-	scrapeManager.ApplyConfig(cfg)
+	// Apply the initial configuration.
+	if err := scrapeManager.ApplyConfig(cfg1); err != nil {
+		t.Fatalf("unable to apply configuration: %s", err)
+	}
+	select {
+	case <-ch:
+		t.Fatal("reload happened")
+	default:
+	}
+
+	// Apply a configuration for which the reload fails.
+	if err := scrapeManager.ApplyConfig(cfg2); err == nil {
+		t.Fatalf("expecting error but got none")
+	}
+	select {
+	case <-ch:
+		t.Fatal("reload happened")
+	default:
+	}
+
+	// Apply a configuration for which the reload succeeds.
+	if err := scrapeManager.ApplyConfig(cfg3); err != nil {
+		t.Fatalf("unable to apply configuration: %s", err)
+	}
+	select {
+	case <-ch:
+	default:
+		t.Fatal("reload didn't happen")
+	}
+
+	// Re-applying the same configuration shouldn't trigger a reload.
+	if err := scrapeManager.ApplyConfig(cfg3); err != nil {
+		t.Fatalf("unable to apply configuration: %s", err)
+	}
+	select {
+	case <-ch:
+		t.Fatal("reload happened")
+	default:
+	}
 }
 
 func TestManagerTargetsUpdates(t *testing.T) {
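
TestManagerApplyConfig above asserts whether a reload happened by having newLoop signal a buffered channel and then checking it with a non-blocking select, so "no reload" can be verified without sleeping. A minimal standalone sketch of that pattern:

package main

import "fmt"

func main() {
	ch := make(chan struct{}, 1) // buffered: the signaling side never blocks

	fired := func() bool {
		select {
		case <-ch:
			return true
		default: // nothing queued, so the event did not happen
			return false
		}
	}

	fmt.Println(fired()) // false: no reload signaled yet
	ch <- struct{}{}     // what newLoop does when a reload creates a loop
	fmt.Println(fired()) // true: consumed the one queued signal
}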
46 changes: 39 additions & 7 deletions scrape/scrape.go
@@ -28,6 +28,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
@@ -61,6 +62,30 @@ var (
 		},
 		[]string{"interval"},
 	)
+	targetScrapePools = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrape_pools_total",
+			Help: "Total number of scrape pool creation attempts.",
+		},
+	)
+	targetScrapePoolsFailed = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrape_pools_failed_total",
+			Help: "Total number of scrape pool creations that failed.",
+		},
+	)
+	targetScrapePoolReloads = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrape_pool_reloads_total",
+			Help: "Total number of scrape loop reloads.",
+		},
+	)
+	targetScrapePoolReloadsFailed = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrape_pool_reloads_failed_total",
+			Help: "Total number of failed scrape loop reloads.",
+		},
+	)
 	targetSyncIntervalLength = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
 			Name: "prometheus_target_sync_length_seconds",
@@ -105,6 +130,10 @@
 func init() {
 	prometheus.MustRegister(targetIntervalLength)
 	prometheus.MustRegister(targetReloadIntervalLength)
+	prometheus.MustRegister(targetScrapePools)
+	prometheus.MustRegister(targetScrapePoolsFailed)
+	prometheus.MustRegister(targetScrapePoolReloads)
+	prometheus.MustRegister(targetScrapePoolReloadsFailed)
 	prometheus.MustRegister(targetSyncIntervalLength)
 	prometheus.MustRegister(targetScrapePoolSyncsCounter)
 	prometheus.MustRegister(targetScrapeSampleLimit)
@@ -136,15 +165,16 @@ const maxAheadTime = 10 * time.Minute
 
 type labelsMutator func(labels.Labels) labels.Labels
 
-func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
+func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) (*scrapePool, error) {
+	targetScrapePools.Inc()
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 
 	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
 	if err != nil {
-		// Any errors that could occur here should be caught during config validation.
-		level.Error(logger).Log("msg", "Error creating HTTP client", "err", err)
+		targetScrapePoolsFailed.Inc()
+		return nil, errors.Wrap(err, "error creating HTTP client")
 	}
 
 	buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
@@ -182,7 +212,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
 		)
 	}
 
-	return sp
+	return sp, nil
 }
 
 func (sp *scrapePool) ActiveTargets() []*Target {
@@ -227,16 +257,17 @@ func (sp *scrapePool) stop() {
 // reload the scrape pool with the given scrape configuration. The target state is preserved
 // but all scrape loops are restarted with the new scrape configuration.
 // This method returns after all scrape loops that were stopped have stopped scraping.
-func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
+func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
+	targetScrapePoolReloads.Inc()
 	start := time.Now()
 
 	sp.mtx.Lock()
 	defer sp.mtx.Unlock()
 
 	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
 	if err != nil {
-		// Any errors that could occur here should be caught during config validation.
-		level.Error(sp.logger).Log("msg", "Error creating HTTP client", "err", err)
+		targetScrapePoolReloadsFailed.Inc()
+		return errors.Wrap(err, "error creating HTTP client")
 	}
 	sp.config = cfg
 	sp.client = client
@@ -272,6 +303,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
 		time.Since(start).Seconds(),
 	)
+	return nil
 }
 
 // Sync converts target groups into actual scrape targets and synchronizes
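
The four new counters come in attempt/failure pairs: the *_total counter is incremented at the top of newScrapePool and reload, and the *_failed_total counter only on the error path, so failures can be compared against attempts. A minimal sketch of that pairing (the metric names and operation helper below are invented for illustration):

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	attempts = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_operation_total",
		Help: "Total number of operation attempts.",
	})
	failures = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_operation_failed_total",
		Help: "Total number of operations that failed.",
	})
)

func init() {
	prometheus.MustRegister(attempts, failures)
}

// operation bumps the attempts counter on entry and the failures counter
// only on the error path, mirroring how the targetScrapePools* and
// targetScrapePoolReloads* counters are used above.
func operation(do func() error) error {
	attempts.Inc()
	if err := do(); err != nil {
		failures.Inc()
		return err
	}
	return nil
}

func main() {
	_ = operation(func() error { return nil })
	fmt.Println(operation(func() error { return errors.New("boom") }))
}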
18 changes: 8 additions & 10 deletions scrape/scrape_test.go
@@ -29,16 +29,14 @@ import (
 	"testing"
 	"time"
 
-	"github.com/prometheus/prometheus/pkg/relabel"
-
+	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 
-	dto "github.com/prometheus/client_model/go"
-
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/pkg/textparse"
 	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/pkg/value"
@@ -48,9 +46,9 @@ import (
 
 func TestNewScrapePool(t *testing.T) {
 	var (
-		app = &nopAppendable{}
-		cfg = &config.ScrapeConfig{}
-		sp  = newScrapePool(cfg, app, nil)
+		app   = &nopAppendable{}
+		cfg   = &config.ScrapeConfig{}
+		sp, _ = newScrapePool(cfg, app, nil)
 	)
 
 	if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@@ -85,7 +83,7 @@ func TestDroppedTargetsList(t *testing.T) {
 				},
 			},
 		}
-		sp                     = newScrapePool(cfg, app, nil)
+		sp, _                  = newScrapePool(cfg, app, nil)
 		expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __metrics_path__=\"\", __scheme__=\"\", job=\"dropMe\"}"
 		expectedLength         = 1
 	)
Expand Down Expand Up @@ -307,7 +305,7 @@ func TestScrapePoolReload(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp := newScrapePool(cfg, app, nil)
sp, _ := newScrapePool(cfg, app, nil)

loop := sp.newLoop(&Target{}, nil, 0, false, nil)
appl, ok := loop.(*scrapeLoop)
@@ -350,7 +348,7 @@ func TestScrapePoolRaces(t *testing.T) {
 	newConfig := func() *config.ScrapeConfig {
 		return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
 	}
-	sp := newScrapePool(newConfig(), &nopAppendable{}, nil)
+	sp, _ := newScrapePool(newConfig(), &nopAppendable{}, nil)
 	tgts := []*targetgroup.Group{
 		{
 			Targets: []model.LabelSet{
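
The test updates above discard the constructor error with sp, _ = newScrapePool(...) because their configurations are valid. A hypothetical test (not part of this commit; it assumes the scrape package's test scaffolding and a config_util import of github.com/prometheus/common/config) that exercises the new failure path directly might look like:

func TestNewScrapePoolBadTLS(t *testing.T) {
	cfg := &config.ScrapeConfig{
		JobName: "job1",
		HTTPClientConfig: config_util.HTTPClientConfig{
			// Deliberately unloadable CA file, mirroring cfgText2 in manager_test.go.
			TLSConfig: config_util.TLSConfig{CAFile: "/not/existing/ca/file"},
		},
	}
	if _, err := newScrapePool(cfg, &nopAppendable{}, nil); err == nil {
		t.Fatal("expected an error for the invalid TLS config, got none")
	}
}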
