Skip to content

Commit

Permalink
Extract loki tailer and prometheus metrics into shared packages
Browse files Browse the repository at this point in the history
  • Loading branch information
iand committed Oct 14, 2022
1 parent c09ff4f commit 6e01f32
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 26 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/ipfs-shipyard/thunderdome

go 1.19
22 changes: 13 additions & 9 deletions skyfish/loki.go → pkg/loki/loki.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package loki

import (
"context"
Expand All @@ -18,14 +18,14 @@ import (
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"

"github.com/ipfs-shipyard/thunderdome/pkg/prom"
)

const (
tailPath = "/loki/api/v1/tail"
)

var userAgent = fmt.Sprintf("%s/0.1", appName)

type Request struct {
Method string `json:"method"`
URI string `json:"uri"`
Expand All @@ -50,6 +50,7 @@ type LokiTailer struct {
}

type LokiConfig struct {
AppName string
URI string // URI of the loki server, e.g. https://logs-prod-us-central1.grafana.net
Username string // For grafana cloud this is a numeric user id
Password string // For grafana cloud this is the API token
Expand All @@ -71,31 +72,35 @@ func NewLokiTailer(cfg *LokiConfig) (*LokiTailer, error) {

var err error

l.requestsIncomingCounter, err = newPrometheusCounter(
l.requestsIncomingCounter, err = prom.NewPrometheusCounter(
cfg.AppName,
"loki_requests_incoming_total",
"The total number of requests read from loki.",
)
if err != nil {
return nil, fmt.Errorf("new counter: %w", err)
}

l.requestsDroppedCounter, err = newPrometheusCounter(
l.requestsDroppedCounter, err = prom.NewPrometheusCounter(
cfg.AppName,
"loki_requests_dropped_total",
"The total number of requests that could not be sent to the publisher.",
)
if err != nil {
return nil, fmt.Errorf("new counter: %w", err)
}

l.errorCounter, err = newPrometheusCounter(
l.errorCounter, err = prom.NewPrometheusCounter(
cfg.AppName,
"loki_error_total",
"The total number of errors encountered when reading from loki.",
)
if err != nil {
return nil, fmt.Errorf("new counter: %w", err)
}

l.connectedGauge, err = newPrometheusGauge(
l.connectedGauge, err = prom.NewPrometheusGauge(
cfg.AppName,
"loki_connected",
"Indicates whether the tailer is connected to loki.",
)
Expand Down Expand Up @@ -141,7 +146,6 @@ func (l *LokiTailer) Run(ctx context.Context) error {
for _, stream := range tr.Streams {
for _, entry := range stream.Values {
l.requestsIncomingCounter.Add(1)
totalRequestsReceived.Add(1)

var line logline
err := json.Unmarshal([]byte(entry.Line()), &line)
Expand Down Expand Up @@ -285,7 +289,7 @@ func (l *LokiTailer) getHTTPRequestHeader() (http.Header, error) {
)
}

h.Set("User-Agent", userAgent)
h.Set("User-Agent", fmt.Sprintf("%s/0.1", l.cfg.AppName))

if l.cfg.OrgID != "" {
h.Set("X-Scope-OrgID", l.cfg.OrgID)
Expand Down
8 changes: 4 additions & 4 deletions skyfish/metrics.go → pkg/prom/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package prom

import (
"context"
Expand All @@ -19,7 +19,7 @@ type PrometheusServer struct {
pe *promexp.Exporter
}

func NewPrometheusServer(addr string) (*PrometheusServer, error) {
func NewPrometheusServer(addr string, appName string) (*PrometheusServer, error) {
pe, err := promexp.NewExporter(promexp.Options{
Namespace: appName,
Registerer: prom.DefaultRegisterer,
Expand Down Expand Up @@ -52,7 +52,7 @@ func (p *PrometheusServer) Run(ctx context.Context) error {
return server.ListenAndServe()
}

func newPrometheusCounter(name string, help string) (prometheus.Counter, error) {
func NewPrometheusCounter(appName string, name string, help string) (prometheus.Counter, error) {
m := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "thunderdome",
Expand All @@ -72,7 +72,7 @@ func newPrometheusCounter(name string, help string) (prometheus.Counter, error)
return m, nil
}

func newPrometheusGauge(name string, help string) (prometheus.Gauge, error) {
func NewPrometheusGauge(appName string, name string, help string) (prometheus.Gauge, error) {
m := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "thunderdome",
Expand Down
5 changes: 2 additions & 3 deletions skyfish/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (

// Some global counts that will be periodically logged
var (
totalRequestsReceived atomic.Int64
totalRequestsSent atomic.Int64
totalRequestsSent atomic.Int64
)

type Health struct{}
Expand All @@ -24,7 +23,7 @@ func (h *Health) Run(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
log.Printf("sent %d requests of %d received", totalRequestsSent.Load(), totalRequestsReceived.Load())
log.Printf("sent %d requests", totalRequestsSent.Load())
}
}
}
10 changes: 7 additions & 3 deletions skyfish/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/pkg/profile"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"

"github.com/ipfs-shipyard/thunderdome/pkg/loki"
"github.com/ipfs-shipyard/thunderdome/pkg/prom"
)

const appName = "skyfish"
Expand Down Expand Up @@ -113,7 +116,8 @@ func main() {
func Run(cc *cli.Context) error {
ctx := cc.Context

cfg := &LokiConfig{
cfg := &loki.LokiConfig{
AppName: appName,
URI: flags.lokiURI,
Username: flags.lokiUsername,
Password: flags.lokiPassword,
Expand All @@ -122,14 +126,14 @@ func Run(cc *cli.Context) error {

rg := &RunGroup{}

source, err := NewLokiTailer(cfg)
source, err := loki.NewLokiTailer(cfg)
if err != nil {
return fmt.Errorf("loki source: %w", err)
}
rg.Add(source)

if flags.prometheusAddr != "" {
ps, err := NewPrometheusServer(flags.prometheusAddr)
ps, err := prom.NewPrometheusServer(flags.prometheusAddr, appName)
if err != nil {
return fmt.Errorf("start prometheus: %w", err)
}
Expand Down
22 changes: 15 additions & 7 deletions skyfish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/prometheus/client_golang/prometheus"

"github.com/ipfs-shipyard/thunderdome/pkg/loki"
"github.com/ipfs-shipyard/thunderdome/pkg/prom"
)

const MaxMessageSize = 256 * 1024 // sns has 256kb max message size

type Publisher struct {
reqs <-chan Request
reqs <-chan loki.Request
awscfg *aws.Config
topicArn string
snsErrorCounter prometheus.Counter
Expand All @@ -26,46 +29,51 @@ type Publisher struct {
connectedGauge prometheus.Gauge
}

func NewPublisher(awscfg *aws.Config, topicArn string, reqs <-chan Request) (*Publisher, error) {
func NewPublisher(awscfg *aws.Config, topicArn string, reqs <-chan loki.Request) (*Publisher, error) {
p := &Publisher{
reqs: reqs,
awscfg: awscfg,
topicArn: topicArn,
}
var err error
p.connectedGauge, err = newPrometheusGauge(
p.connectedGauge, err = prom.NewPrometheusGauge(
appName,
"publisher_connected",
"Indicates whether the application is connected to sns.",
)
if err != nil {
return nil, fmt.Errorf("new gauge: %w", err)
}

p.snsErrorCounter, err = newPrometheusCounter(
p.snsErrorCounter, err = prom.NewPrometheusCounter(
appName,
"publisher_sns_error_total",
"The total number of errors encountered when publishing requests to sns.",
)
if err != nil {
return nil, fmt.Errorf("new counter: %w", err)
}

p.processErrorCounter, err = newPrometheusCounter(
p.processErrorCounter, err = prom.NewPrometheusCounter(
appName,
"publisher_process_error_total",
"The total number of errors encountered when preparing requests to be published.",
)
if err != nil {
return nil, fmt.Errorf("new counter: %w", err)
}

p.messagesCounter, err = newPrometheusCounter(
p.messagesCounter, err = prom.NewPrometheusCounter(
appName,
"publisher_sns_messages_total",
"The total number of sns messages published.",
)
if err != nil {
return nil, fmt.Errorf("new counter: %w", err)
}

p.requestsCounter, err = newPrometheusCounter(
p.requestsCounter, err = prom.NewPrometheusCounter(
appName,
"publisher_requests_total",
"The total number of requests published in messages.",
)
Expand Down

0 comments on commit 6e01f32

Please sign in to comment.