Skip to content

Commit

Permalink
Add support for extracting node attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
ShimiTaNaka committed Jul 10, 2019
1 parent 81abc55 commit fe6897f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
32 changes: 30 additions & 2 deletions collector/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ func getRoles(node NodeStatsNodeResponse) map[string]bool {
return roles
}

func getAttributesValues(node NodeStatsNodeResponse, attrs []string) []string {
attributesValues := []string{}
for _, attributeName := range attrs {
// Go over the node attributes and search for the requested attributes
if _, ok := node.Attributes[attributeName]; ok {
attributesValues = append(attributesValues, node.Attributes[attributeName])
} else {
attributesValues = append(attributesValues, "")
}
}
return attributesValues
}

func createRoleMetric(role string) *nodeMetric {
return &nodeMetric{
Type: prometheus.GaugeValue,
Expand All @@ -72,15 +85,18 @@ func createRoleMetric(role string) *nodeMetric {
var (
defaultNodeLabels = []string{"cluster", "host", "name", "es_master_node", "es_data_node", "es_ingest_node", "es_client_node"}
defaultRoleLabels = []string{"cluster", "host", "name"}
extendedNodeLabels = []string{}
defaultThreadPoolLabels = append(defaultNodeLabels, "type")
defaultBreakerLabels = append(defaultNodeLabels, "breaker")
defaultFilesystemDataLabels = append(defaultNodeLabels, "mount", "path")
defaultFilesystemIODeviceLabels = append(defaultNodeLabels, "device")
defaultCacheLabels = append(defaultNodeLabels, "cache")


defaultNodeLabelValues = func(cluster string, node NodeStatsNodeResponse) []string {
attrsValues := getAttributesValues(node, extendedNodeLabels)
roles := getRoles(node)
return []string{
defaultValues := []string{
cluster,
node.Host,
node.Name,
Expand All @@ -89,7 +105,10 @@ var (
fmt.Sprintf("%t", roles["ingest"]),
fmt.Sprintf("%t", roles["client"]),
}
returnValue := append(attrsValues, defaultValues...)
return returnValue
}

defaultThreadPoolLabelValues = func(cluster string, node NodeStatsNodeResponse, pool string) []string {
return append(defaultNodeLabelValues(cluster, node), pool)
}
Expand Down Expand Up @@ -169,7 +188,16 @@ type Nodes struct {
}

// NewNodes defines Nodes Prometheus metrics
func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, node string) *Nodes {
func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, node string, nodeAttributes []string) *Nodes {

extendedNodeLabels = nodeAttributes
defaultNodeLabels = append(extendedNodeLabels, defaultNodeLabels...)
defaultThreadPoolLabels = append(extendedNodeLabels, defaultThreadPoolLabels...)
defaultBreakerLabels = append(extendedNodeLabels, defaultBreakerLabels...)
defaultFilesystemDataLabels = append(extendedNodeLabels, defaultFilesystemDataLabels...)
defaultFilesystemIODeviceLabels = append(extendedNodeLabels, defaultFilesystemIODeviceLabels...)
defaultCacheLabels = append(extendedNodeLabels, defaultCacheLabels...)

return &Nodes{
logger: logger,
client: client,
Expand Down
2 changes: 1 addition & 1 deletion collector/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestNodesStats(t *testing.T) {
t.Fatalf("Failed to parse URL: %s", err)
}
u.User = url.UserPassword("elastic", "changeme")
c := NewNodes(log.NewNopLogger(), http.DefaultClient, u, true, "_local")
c := NewNodes(log.NewNopLogger(), http.DefaultClient, u, true, "_local", []string{})
nsr, err := c.fetchAndDecodeNodeStats()
if err != nil {
t.Fatalf("Failed to fetch or decode node stats: %s", err)
Expand Down
9 changes: 8 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/url"
"os"
"os/signal"
"strings"
"time"

"context"
Expand Down Expand Up @@ -77,6 +78,8 @@ func main() {
logOutput = kingpin.Flag("log.output",
"Sets the log output. Valid outputs are stdout and stderr").
Default("stdout").Envar("LOG_OUTPUT").String()
nodeAttributes = kingpin.Flag("node.attributes", "Node attributes to add as labels").
Default("").Envar("NODE_ATTRIBUTES").String()
)

kingpin.Version(version.Print(Name))
Expand Down Expand Up @@ -111,9 +114,13 @@ func main() {

// cluster info retriever
clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)
nodeAttributesList := []string{}
if *nodeAttributes != "" {
nodeAttributesList = strings.Split(*nodeAttributes, ",")
}

prometheus.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode, nodeAttributesList))

if *esExportIndices || *esExportShards {
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards)
Expand Down

0 comments on commit fe6897f

Please sign in to comment.