forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
134 lines (108 loc) · 5.04 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
_ "github.com/grafana/loki/pkg/build"
"github.com/grafana/loki/pkg/canary/comparator"
"github.com/grafana/loki/pkg/canary/reader"
"github.com/grafana/loki/pkg/canary/writer"
)
// canary holds the three components of a running loki-canary instance.
// main's startCanary closure creates all three together and stop tears them
// down together; the pointers are nil while the canary is suspended.
type canary struct {
	// lock guards reads and replacement of the component pointers below.
	lock sync.Mutex

	// writer is constructed in main with os.Stdout and sentChan.
	writer *writer.Writer
	// reader is constructed in main with receivedChan and the Loki
	// connection flags (-addr, -tls, -user, -pass).
	reader *reader.Reader
	// comparator is constructed in main with sentChan, receivedChan and reader.
	comparator *comparator.Comparator
}
// main parses the command-line flags, starts the canary's writer, reader and
// comparator, and serves these HTTP endpoints on -port:
//
//	/resume  - stop any running components and start a fresh set
//	/suspend - stop the running components
//	/metrics - Prometheus metrics (promhttp.Handler)
//
// It blocks until SIGTERM or an interrupt arrives, then stops the components
// and returns. With -version it prints build information and exits 0; a
// missing -addr exits 1.
func main() {
	lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector")
	lVal := flag.String("labelvalue", "loki-canary", "The unique label value for this instance of loki-canary to use in the log selector")
	sName := flag.String("streamname", "stream", "The stream name for this instance of loki-canary to use in the log selector")
	sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector")
	port := flag.Int("port", 3500, "Port which loki-canary should expose metrics")
	addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100")
	tls := flag.Bool("tls", false, "Does the loki connection use TLS?")
	user := flag.String("user", "", "Loki username")
	pass := flag.String("pass", "", "Loki password")
	queryTimeout := flag.Duration("query-timeout", 10*time.Second, "How long to wait for a query response from Loki")

	interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries")
	size := flag.Int("size", 100, "Size in bytes of each log line")
	wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries on websocket before querying loki for them")
	maxWait := flag.Duration("max-wait", 5*time.Minute, "Duration to keep querying Loki for missing websocket entries before reporting them missing")
	pruneInterval := flag.Duration("pruneinterval", 60*time.Second, "Frequency to check sent vs received logs, "+
		"also the frequency which queries for missing logs will be dispatched to loki")
	buckets := flag.Int("buckets", 10, "Number of buckets in the response_latency histogram")

	metricTestInterval := flag.Duration("metric-test-interval", 1*time.Hour, "The interval the metric test query should be run")
	metricTestQueryRange := flag.Duration("metric-test-range", 24*time.Hour, "The range value [24h] used in the metric test instant-query."+
		" Note: this value is truncated to the running time of the canary until this value is reached")

	spotCheckInterval := flag.Duration("spot-check-interval", 15*time.Minute, "Interval that a single result will be kept from sent entries and spot-checked against Loki, "+
		"e.g. 15min default one entry every 15 min will be saved and then queried again every 15min until spot-check-max is reached")
	spotCheckMax := flag.Duration("spot-check-max", 4*time.Hour, "How far back to check a spot check entry before dropping it")
	spotCheckQueryRate := flag.Duration("spot-check-query-rate", 1*time.Minute, "Interval that the canary will query Loki for the current list of all spot check entries")
	spotCheckWait := flag.Duration("spot-check-initial-wait", 10*time.Second, "How long should the spot check query wait before starting to check for entries")

	printVersion := flag.Bool("version", false, "Print this builds version information")

	flag.Parse()

	if *printVersion {
		fmt.Println(version.Print("loki-canary"))
		os.Exit(0)
	}

	if *addr == "" {
		_, _ = fmt.Fprintf(os.Stderr, "Must specify a Loki address with -addr\n")
		os.Exit(1)
	}

	sentChan := make(chan time.Time)
	receivedChan := make(chan time.Time)

	c := &canary{}

	// lifecycleMu serialises every start/stop transition. c.lock alone is not
	// enough: startCanary calls c.stop() (which takes and releases c.lock) and
	// only then re-acquires c.lock to install new components. Without this
	// outer mutex two concurrent /resume requests could both stop before
	// either installs, and the second would overwrite the first's writer,
	// reader and comparator without Stop ever being called on them.
	var lifecycleMu sync.Mutex

	startCanary := func() {
		lifecycleMu.Lock()
		defer lifecycleMu.Unlock()

		c.stop()

		c.lock.Lock()
		defer c.lock.Unlock()

		c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
		c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
		c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
	}

	stopCanary := func() {
		lifecycleMu.Lock()
		defer lifecycleMu.Unlock()
		c.stop()
	}

	startCanary()

	http.HandleFunc("/resume", func(_ http.ResponseWriter, _ *http.Request) {
		_, _ = fmt.Fprintf(os.Stderr, "restarting\n")
		startCanary()
	})

	http.HandleFunc("/suspend", func(_ http.ResponseWriter, _ *http.Request) {
		_, _ = fmt.Fprintf(os.Stderr, "suspending\n")
		stopCanary()
	})

	http.Handle("/metrics", promhttp.Handler())

	// A nil Handler means http.DefaultServeMux, i.e. the routes registered
	// above, the same as the previous http.ListenAndServe(addr, nil) call.
	srv := &http.Server{
		Addr: ":" + strconv.Itoa(*port),
		// Bound how long a client may take to send request headers so a
		// stalled connection cannot hold a server goroutine indefinitely.
		ReadHeaderTimeout: 10 * time.Second,
	}
	go func() {
		if err := srv.ListenAndServe(); err != nil {
			panic(err)
		}
	}()

	terminate := make(chan os.Signal, 1)
	signal.Notify(terminate, syscall.SIGTERM, os.Interrupt)
	<-terminate

	_, _ = fmt.Fprintf(os.Stderr, "shutting down\n")
	stopCanary()
}
// stop calls Stop on the writer, reader and comparator (in that order) and
// then clears all three references. It only acts when all three are
// non-nil, so calling it on a suspended or never-started canary is a no-op.
// The whole operation runs while holding c.lock.
func (c *canary) stop() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.writer != nil && c.reader != nil && c.comparator != nil {
		c.writer.Stop()
		c.reader.Stop()
		c.comparator.Stop()

		c.writer, c.reader, c.comparator = nil, nil, nil
	}
}