forked from imjerrybao/apex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetric.go
140 lines (118 loc) · 2.74 KB
/
metric.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package metrics
import (
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
)
var metricsNames = []string{"Invocations", "Errors", "Duration", "Throttles"}
// Metric collects metrics for single function.
type Metric struct {
Config
FunctionName string
}
// Collect and aggregate metrics for on function.
func (m *Metric) Collect() (a AggregatedMetrics) {
for n := range m.collect(m.gen()) {
value := aggregate(n.Value)
switch n.Name {
case "Duration":
a.Duration = value
case "Errors":
a.Errors = value
case "Invocations":
a.Invocations = value
case "Throttles":
a.Throttles = value
}
}
return
}
// cloudWatchMetric represents a CloudWatch metric with a given name and value.
type cloudWatchMetric struct {
Name string
Value []*cloudwatch.Datapoint
}
// stats for function `name`.
func (m *Metric) stats(name string) (*cloudwatch.GetMetricStatisticsOutput, error) {
return m.Service.GetMetricStatistics(&cloudwatch.GetMetricStatisticsInput{
StartTime: &m.StartDate,
EndTime: &m.EndDate,
MetricName: &name,
Namespace: aws.String("AWS/Lambda"),
Period: aws.Int64(int64(period(m.StartDate, m.EndDate).Seconds())),
Statistics: []*string{
aws.String("Sum"),
},
Dimensions: []*cloudwatch.Dimension{
{
Name: aws.String("FunctionName"),
Value: &m.FunctionName,
},
},
Unit: aws.String(unit(name)),
})
}
// collect starts a new cloudwatch session and requests the key metrics.
func (m *Metric) collect(in <-chan string) <-chan cloudWatchMetric {
var wg sync.WaitGroup
out := make(chan cloudWatchMetric)
for name := range in {
wg.Add(1)
name := name
go func() {
defer wg.Done()
res, err := m.stats(name)
if err != nil {
// TODO: refactor so that errors are reported in cmd
fmt.Println(err.Error())
return
}
out <- cloudWatchMetric{
Name: name,
Value: res.Datapoints,
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// gen generates the key metric structs and returns a channel pipeline.
func (m *Metric) gen() <-chan string {
out := make(chan string, len(metricsNames))
for _, n := range metricsNames {
out <- n
}
close(out)
return out
}
// period returns the resolution of metrics.
func period(start, end time.Time) time.Duration {
switch n := end.Sub(start).Hours(); {
case n > 24:
return time.Hour * 24
default:
return time.Hour
}
}
// unit for metric name.
func unit(name string) string {
switch name {
case "Duration":
return "Milliseconds"
default:
return "Count"
}
}
// aggregate accumulates the datapoints.
func aggregate(datapoints []*cloudwatch.Datapoint) int {
sum := 0.0
for _, datapoint := range datapoints {
sum += *datapoint.Sum
}
return int(sum)
}