forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunning_processor.go
133 lines (110 loc) · 3.01 KB
/
running_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package models
import (
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)
// RunningProcessor pairs a telegraf.StreamingProcessor with its
// ProcessorConfig and the logger that NewRunningProcessor creates for it.
type RunningProcessor struct {
// Embedded mutex. None of the methods visible in this file lock it.
// NOTE(review): presumably locked by callers elsewhere — confirm.
sync.Mutex
// log is the logger returned by Log; NewRunningProcessor sets it.
log telegraf.Logger
// Processor is the wrapped plugin; Init, Start, Add and Stop act on it.
Processor telegraf.StreamingProcessor
// Config carries the name, alias, ordering data and metric filter.
Config *ProcessorConfig
}
// RunningProcessors is a list of running processors. Its Len, Swap and Less
// methods have the signatures of sort.Interface; Less defines the ordering.
type RunningProcessors []*RunningProcessor
// Len returns the number of processors in the list.
func (rp RunningProcessors) Len() int {
return len(rp)
}
// Swap exchanges the processors stored at positions i and j.
func (rp RunningProcessors) Swap(i, j int) {
	held := rp[i]
	rp[i] = rp[j]
	rp[j] = held
}
// Less reports whether the processor at index i sorts before the one at
// index j.
//
// Rules, checked in this sequence:
//  1. Different Config.ID: compare Order only.
//     NOTE(review): the intent is "processors defined in separate files";
//     confirm that ID identifies the source of the definition.
//  2. Same Order (both unset, or both set to the same number): keep the
//     configuration order by comparing Line.
//  3. Exactly one processor has Order unset (zero): the one without an Order
//     goes first, matching the original implementation.
//  4. Both Orders set and different: the smaller Order goes first.
func (rp RunningProcessors) Less(i, j int) bool {
	left, right := rp[i].Config, rp[j].Config

	switch {
	case left.ID != right.ID:
		return left.Order < right.Order
	case left.Order == right.Order:
		return left.Line < right.Line
	case left.Order == 0:
		// Only the right-hand processor has an Order.
		return true
	case right.Order == 0:
		// Only the left-hand processor has an Order.
		return false
	default:
		return left.Order < right.Order
	}
}
// ProcessorConfig holds the per-processor settings used by RunningProcessor:
// identification, sort position and the metric filter.
type ProcessorConfig struct {
// ID is compared by RunningProcessors.Less: processors with different IDs
// are ordered by Order alone.
// NOTE(review): Less describes this as "defined in separate files" —
// confirm what ID actually identifies.
ID string
// Name is used as the "processor" selfstat tag and passed to the logger.
Name string
// Alias, when non-empty, is added as the "alias" selfstat tag; it is also
// passed to the logger.
Alias string
// Order is the sort key used by Less; zero is treated as "not set".
Order int64
// Line is the tie-breaker used by Less.
// NOTE(review): presumably the line of the definition in the config — confirm.
Line int
// Filter's Select and Modify are applied to every metric in
// RunningProcessor.Add before the metric reaches the processor.
Filter Filter
}
// NewRunningProcessor wraps processor and config in a RunningProcessor.
//
// It registers a "process"/"errors" selfstat counter tagged with the
// processor name (plus the alias when one is set), creates a logger for the
// plugin, increments the counter from the callback registered through the
// logger's OnErr, and hands the logger to the plugin via SetLoggerOnPlugin.
func NewRunningProcessor(processor telegraf.StreamingProcessor, config *ProcessorConfig) *RunningProcessor {
	statTags := map[string]string{"processor": config.Name}
	if alias := config.Alias; alias != "" {
		statTags["alias"] = alias
	}
	errorCounter := selfstat.Register("process", "errors", statTags)

	pluginLog := NewLogger("processors", config.Name, config.Alias)
	pluginLog.OnErr(func() { errorCounter.Incr(1) })
	SetLoggerOnPlugin(processor, pluginLog)

	running := &RunningProcessor{
		log:       pluginLog,
		Processor: processor,
		Config:    config,
	}
	return running
}
// metricFiltered calls Drop on metric. Add uses it for metrics that have no
// fields left after the filter's Modify step.
func (rp *RunningProcessor) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}
// Init runs the wrapped processor's Init method when the plugin implements
// telegraf.Initializer and returns its error. Plugins without an Init method
// are left alone and nil is returned.
func (rp *RunningProcessor) Init() error {
	initializer, implemented := rp.Processor.(telegraf.Initializer)
	if !implemented {
		return nil
	}
	return initializer.Init()
}
// Log returns the processor's logger (the one set by NewRunningProcessor).
func (rp *RunningProcessor) Log() telegraf.Logger {
return rp.log
}
// LogName returns the name produced by logName for the "processors" category
// together with this processor's configured name and alias.
func (rp *RunningProcessor) LogName() string {
return logName("processors", rp.Config.Name, rp.Config.Alias)
}
// MakeMetric returns metric unchanged.
func (rp *RunningProcessor) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}
// Start calls Start on the wrapped processor with acc and returns its error.
func (rp *RunningProcessor) Start(acc telegraf.Accumulator) error {
return rp.Processor.Start(acc)
}
// Add routes a single metric through the processor's filter.
//
//   - A metric the filter does not select is handed to acc untouched, so it
//     continues downstream without being processed.
//   - A selected metric is passed to the filter's Modify; if no fields are
//     left afterwards the metric is dropped.
//   - Otherwise the metric is given to the wrapped processor, whose error is
//     returned.
func (rp *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
	if !rp.Config.Filter.Select(m) {
		// Not for this processor: pass downstream.
		acc.AddMetric(m)
		return nil
	}

	rp.Config.Filter.Modify(m)
	if len(m.FieldList()) > 0 {
		return rp.Processor.Add(m, acc)
	}

	// Nothing left to process: drop the metric.
	rp.metricFiltered(m)
	return nil
}
// Stop calls Stop on the wrapped processor.
func (rp *RunningProcessor) Stop() {
rp.Processor.Stop()
}