Skip to content

Fix to allow multiple access logs to collect metrics #1100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 30, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package accesslog

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -42,7 +43,7 @@ type (
logger *zap.Logger
mb *metadata.MetricsBuilder
rb *metadata.ResourceBuilder
pipe *pipeline.DirectedPipeline
pipes []*pipeline.DirectedPipeline
wg *sync.WaitGroup
cancel context.CancelFunc
entries []*entry.Entry
Expand Down Expand Up @@ -103,20 +104,27 @@ func (nls *NginxLogScraper) ID() component.ID {
return component.NewID(metadata.Type)
}

// nolint: unparam
func (nls *NginxLogScraper) Start(parentCtx context.Context, _ component.Host) error {
nls.logger.Info("NGINX access log scraper started")
ctx, cancel := context.WithCancel(parentCtx)
nls.cancel = cancel

var err error
nls.pipe, err = nls.initStanzaPipeline(nls.operators, nls.logger)
if err != nil {
return fmt.Errorf("init stanza pipeline: %w", err)
for _, op := range nls.operators {
nls.logger.Info("Initializing NGINX access log scraper pipeline", zap.Any("operator_id", op.ID()))
pipe, err := nls.initStanzaPipeline([]operator.Config{op}, nls.logger)
if err != nil {
nls.logger.Error("Error initializing pipeline", zap.Any("operator_id", op.ID()), zap.Any("error", err))
continue
}
nls.pipes = append(nls.pipes, pipe)
}

startError := nls.pipe.Start(storage.NewNopClient())
if startError != nil {
return fmt.Errorf("stanza pipeline start: %w", startError)
for _, pipe := range nls.pipes {
startError := pipe.Start(storage.NewNopClient())
if startError != nil {
nls.logger.Error("Error starting pipeline", zap.Any("error", startError))
}
}

nls.wg.Add(1)
Expand Down Expand Up @@ -206,7 +214,14 @@ func (nls *NginxLogScraper) Shutdown(_ context.Context) error {
}
nls.wg.Wait()

return nls.pipe.Stop()
var err error
for _, pipe := range nls.pipes {
if stopErr := pipe.Stop(); stopErr != nil {
err = errors.Join(err, stopErr)
}
}

return err
}

func (nls *NginxLogScraper) initStanzaPipeline(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,6 @@ func TestAccessLogScraper(t *testing.T) {
pmetrictest.IgnoreResourceAttributeValue("instance.id")))
}

func TestAccessLogScraperError(t *testing.T) {
t.Run("include config missing", func(tt *testing.T) {
logScraper := NewScraper(receivertest.NewNopSettings(component.Type{}), &config.Config{})
err := logScraper.Start(context.Background(), componenttest.NewNopHost())
require.Error(tt, err)
assert.Contains(tt, err.Error(), "init stanza pipeline")
})
}

// Copies the contents of one file to another with the given delay. Used to simulate writing log entries to a log file.
// Reason for nolint: we must use testify's assert instead of require,
// for more info see https://github.com/stretchr/testify/issues/772#issuecomment-945166599
Expand Down