Skip to content

Commit

Permalink
Add the operator usage instances command and api endpoint (hashicor…
Browse files Browse the repository at this point in the history
…p#16205)

This endpoint shows total services, connect service instances and
billable service instances in the local datacenter or globally. Billable
instances = total service instances - connect services - consul server instances.
  • Loading branch information
kyhavlov authored Feb 8, 2023
1 parent df03b45 commit 898e59b
Show file tree
Hide file tree
Showing 24 changed files with 943 additions and 23 deletions.
3 changes: 3 additions & 0 deletions .changelog/16205.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
command: Adds the `operator usage instances` subcommand for displaying total services, connect service instances and billable service instances in the local datacenter or globally.
```
2 changes: 1 addition & 1 deletion agent/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"net/http"
"strings"

metrics "github.com/armon/go-metrics"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"

cachetype "github.com/hashicorp/consul/agent/cache-types"
Expand Down
62 changes: 62 additions & 0 deletions agent/consul/operator_usage_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package consul

import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)

// Usage returns counts for service usage within catalog.
func (op *Operator) Usage(args *structs.OperatorUsageRequest, reply *structs.Usage) error {
reply.Usage = make(map[string]structs.ServiceUsage)

if args.Global {
remoteDCs := op.srv.router.GetDatacenters()
for _, dc := range remoteDCs {
remoteArgs := &structs.OperatorUsageRequest{
DCSpecificRequest: structs.DCSpecificRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{
Token: args.Token,
},
},
}
var resp structs.Usage
if _, err := op.srv.ForwardRPC("Operator.Usage", remoteArgs, &resp); err != nil {
op.logger.Warn("error forwarding usage request to remote datacenter", "datacenter", dc, "error", err)
}
if usage, ok := resp.Usage[dc]; ok {
reply.Usage[dc] = usage
}
}
}

var authzContext acl.AuthorizerContext
authz, err := op.srv.ResolveTokenAndDefaultMeta(args.Token, structs.DefaultEnterpriseMetaInDefaultPartition(), &authzContext)
if err != nil {
return err
}
err = authz.ToAllowAuthorizer().OperatorReadAllowed(&authzContext)
if err != nil {
return err
}

if err = op.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}

return op.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
// Get service usage.
index, serviceUsage, err := state.ServiceUsage(ws)
if err != nil {
return err
}

reply.QueryMeta.Index, reply.Usage[op.srv.config.Datacenter] = index, serviceUsage
return nil
})
}
87 changes: 69 additions & 18 deletions agent/consul/state/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,6 @@ type UsageEntry struct {
Count int
}

// ServiceUsage contains all of the usage data related to services
type ServiceUsage struct {
Services int
ServiceInstances int
ConnectServiceInstances map[string]int
EnterpriseServiceUsage
}

// NodeUsage contains all of the usage data related to nodes
type NodeUsage struct {
Nodes int
Expand Down Expand Up @@ -128,6 +120,8 @@ func updateUsage(tx WriteTxn, changes Changes) error {
addEnterpriseServiceInstanceUsage(usageDeltas, change)

connectDeltas(change, usageDeltas, delta)
billableServiceInstancesDeltas(change, usageDeltas, delta)

// Construct a mapping of all of the various service names that were
// changed, in order to compare it with the finished memdb state.
// Make sure to account for the fact that services can change their names.
Expand Down Expand Up @@ -271,6 +265,53 @@ func connectDeltas(change memdb.Change, usageDeltas map[string]int, delta int) {
}
}

// billableServiceInstancesDeltas calculates deltas for the billable services. Billable services
// are of "typical" service kind (i.e. non-connect or connect-native), excluding the "consul" service.
func billableServiceInstancesDeltas(change memdb.Change, usageDeltas map[string]int, delta int) {
// Billable service instances = # of typical service instances (i.e. non-connect) + connect-native service instances.
// Specifically, it should exclude "consul" service instances from the count.
//
// If the service has been updated, then we check
// 1. If the service name changed to or from "consul" and update deltas such that we exclude consul server service instances.
// This case is a bit contrived because we don't expect consul service to change once it's registered (beyond changing its instance count).
// a) If changed to "consul" -> decrement deltas by one
// b) If changed from "consul" and it's not a "connect" service -> increase deltas by one
// 2. If the service kind changed to or from "typical", we need to we need to update deltas so that we only account
// for non-connect or connect-native instances.
if change.Updated() {
// When there's an update, the delta arg passed to this function is 0, and so we need to explicitly increment
// or decrement by 1 depending on the situation.
before := change.Before.(*structs.ServiceNode)
after := change.After.(*structs.ServiceNode)
// Service name changed away from "consul" means we now need to account for this service instances unless it's a "connect" service.
if before.ServiceName == structs.ConsulServiceName && after.ServiceName != structs.ConsulServiceName {
if after.ServiceKind == structs.ServiceKindTypical {
usageDeltas[billableServiceInstancesTableName()] += 1
addEnterpriseBillableServiceInstanceUsage(usageDeltas, after, 1)
}
}
if before.ServiceName != structs.ConsulServiceName && after.ServiceName == structs.ConsulServiceName {
usageDeltas[billableServiceInstancesTableName()] -= 1
addEnterpriseBillableServiceInstanceUsage(usageDeltas, before, -1)
}

if before.ServiceKind != structs.ServiceKindTypical && after.ServiceKind == structs.ServiceKindTypical {
usageDeltas[billableServiceInstancesTableName()] += 1
addEnterpriseBillableServiceInstanceUsage(usageDeltas, after, 1)
} else if before.ServiceKind == structs.ServiceKindTypical && after.ServiceKind != structs.ServiceKindTypical {
usageDeltas[billableServiceInstancesTableName()] -= 1
addEnterpriseBillableServiceInstanceUsage(usageDeltas, before, -1)
}
} else {
svc := changeObject(change).(*structs.ServiceNode)
// If it's not an update, only update delta if it's a typical service and not the "consul" service.
if svc.ServiceKind == structs.ServiceKindTypical && svc.ServiceName != structs.ConsulServiceName {
usageDeltas[billableServiceInstancesTableName()] += delta
addEnterpriseBillableServiceInstanceUsage(usageDeltas, svc, delta)
}
}
}

// writeUsageDeltas will take in a map of IDs to deltas and update each
// entry accordingly, checking for integer underflow. The index that is
// passed in will be recorded on the entry as well.
Expand All @@ -289,7 +330,7 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
// large numbers.
delta = 0
}
err := tx.Insert(tableUsage, &UsageEntry{
err = tx.Insert(tableUsage, &UsageEntry{
ID: id,
Count: delta,
Index: idx,
Expand Down Expand Up @@ -365,37 +406,43 @@ func (s *Store) PeeringUsage() (uint64, PeeringUsage, error) {

// ServiceUsage returns the latest seen Raft index, a compiled set of service
// usage data, and any errors.
func (s *Store) ServiceUsage(ws memdb.WatchSet) (uint64, ServiceUsage, error) {
func (s *Store) ServiceUsage(ws memdb.WatchSet) (uint64, structs.ServiceUsage, error) {
tx := s.db.ReadTxn()
defer tx.Abort()

serviceInstances, err := firstUsageEntry(ws, tx, tableServices)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}

services, err := firstUsageEntry(ws, tx, serviceNamesUsageTable)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}

serviceKindInstances := make(map[string]int)
for _, kind := range allConnectKind {
usage, err := firstUsageEntry(ws, tx, connectUsageTableName(kind))
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
serviceKindInstances[kind] = usage.Count
}

usage := ServiceUsage{
ServiceInstances: serviceInstances.Count,
Services: services.Count,
ConnectServiceInstances: serviceKindInstances,
billableServiceInstances, err := firstUsageEntry(ws, tx, billableServiceInstancesTableName())
if err != nil {
return 0, structs.ServiceUsage{}, fmt.Errorf("failed billable services lookup: %s", err)
}

usage := structs.ServiceUsage{
ServiceInstances: serviceInstances.Count,
Services: services.Count,
ConnectServiceInstances: serviceKindInstances,
BillableServiceInstances: billableServiceInstances.Count,
}
results, err := compileEnterpriseServiceUsage(ws, tx, usage)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}

return serviceInstances.Index, results, nil
Expand Down Expand Up @@ -469,3 +516,7 @@ func firstUsageEntry(ws memdb.WatchSet, tx ReadTxn, id string) (*UsageEntry, err

return realUsage, nil
}

func billableServiceInstancesTableName() string {
return fmt.Sprintf("billable-%s", tableServices)
}
4 changes: 3 additions & 1 deletion agent/consul/state/usage_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueSer

func addEnterpriseConnectServiceInstanceUsage(map[string]int, *structs.ServiceNode, int) {}

func addEnterpriseBillableServiceInstanceUsage(map[string]int, *structs.ServiceNode, int) {}

func addEnterpriseKVUsage(map[string]int, memdb.Change) {}

func addEnterpriseConfigEntryUsage(map[string]int, memdb.Change) {}

func compileEnterpriseServiceUsage(ws memdb.WatchSet, tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
func compileEnterpriseServiceUsage(ws memdb.WatchSet, tx ReadTxn, usage structs.ServiceUsage) (structs.ServiceUsage, error) {
return usage, nil
}

Expand Down
15 changes: 15 additions & 0 deletions agent/consul/state/usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
for k := range usage.ConnectServiceInstances {
require.Equal(t, 0, usage.ConnectServiceInstances[k])
}
require.Equal(t, 0, usage.BillableServiceInstances)
}

func TestStateStore_Usage_ServiceUsage(t *testing.T) {
Expand All @@ -184,6 +185,7 @@ func TestStateStore_Usage_ServiceUsage(t *testing.T) {
require.Equal(t, 8, usage.ServiceInstances)
require.Equal(t, 2, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 3, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 6, usage.BillableServiceInstances)

testRegisterSidecarProxy(t, s, 16, "node2", "service2")

Expand Down Expand Up @@ -225,6 +227,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
require.Equal(t, 4, usage.ServiceInstances)
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 3, usage.BillableServiceInstances)

require.NoError(t, s.DeleteNode(4, "node1", nil, ""))

Expand All @@ -236,6 +239,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
for k := range usage.ConnectServiceInstances {
require.Equal(t, 0, usage.ConnectServiceInstances[k])
}
require.Equal(t, 0, usage.BillableServiceInstances)
}

// Test that services from remote peers aren't counted in writes or deletes.
Expand Down Expand Up @@ -263,6 +267,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
require.Equal(t, 3, usage.ServiceInstances)
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 2, usage.BillableServiceInstances)
})

testutil.RunStep(t, "deletes", func(t *testing.T) {
Expand All @@ -275,6 +280,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
require.Equal(t, 0, usage.ServiceInstances)
require.Equal(t, 0, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 0, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 0, usage.BillableServiceInstances)
})
}

Expand Down Expand Up @@ -311,6 +317,7 @@ func TestStateStore_Usage_Restore(t *testing.T) {
require.Equal(t, idx, uint64(9))
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 2)
require.Equal(t, usage.BillableServiceInstances, 2)
}

func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
Expand Down Expand Up @@ -411,6 +418,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, idx, uint64(2))
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 1)
require.Equal(t, usage.BillableServiceInstances, 1)
})

t.Run("update service to be connect native", func(t *testing.T) {
Expand All @@ -432,6 +440,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 1)
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 1, usage.BillableServiceInstances)
})

t.Run("update service to not be connect native", func(t *testing.T) {
Expand All @@ -453,6 +462,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 1)
require.Equal(t, 0, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 1, usage.BillableServiceInstances)
})

t.Run("rename service with a multiple instances", func(t *testing.T) {
Expand Down Expand Up @@ -484,6 +494,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, usage.Services, 2)
require.Equal(t, usage.ServiceInstances, 3)
require.Equal(t, 2, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 3, usage.BillableServiceInstances)

update := &structs.NodeService{
ID: "service2",
Expand All @@ -502,6 +513,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, usage.Services, 3)
require.Equal(t, usage.ServiceInstances, 3)
require.Equal(t, 2, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 3, usage.BillableServiceInstances)

})
}
Expand All @@ -528,6 +540,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 1)
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 0, usage.BillableServiceInstances)
})

t.Run("rename service with a multiple instances", func(t *testing.T) {
Expand All @@ -554,6 +567,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
require.Equal(t, usage.Services, 2)
require.Equal(t, usage.ServiceInstances, 3)
require.Equal(t, 2, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 1, usage.BillableServiceInstances)

update := &structs.NodeService{
ID: "service3",
Expand All @@ -569,6 +583,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
require.Equal(t, usage.Services, 3)
require.Equal(t, usage.ServiceInstances, 3)
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 2, usage.BillableServiceInstances)
})
}

Expand Down
4 changes: 4 additions & 0 deletions agent/consul/usagemetrics/usagemetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ var Gauges = []prometheus.GaugeDefinition{
Name: []string{"state", "config_entries"},
Help: "Measures the current number of unique configuration entries registered with Consul, labeled by Kind. It is only emitted by Consul servers. Added in v1.10.4.",
},
{
Name: []string{"state", "billable_service_instances"},
Help: "Total number of billable service instances in the local datacenter.",
},
}

type getMembersFunc func() []serf.Member
Expand Down
8 changes: 7 additions & 1 deletion agent/consul/usagemetrics/usagemetrics_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/hashicorp/serf/serf"

"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
)

func (u *UsageMetricsReporter) emitNodeUsage(nodeUsage state.NodeUsage) {
Expand Down Expand Up @@ -74,7 +75,7 @@ func (u *UsageMetricsReporter) emitMemberUsage(members []serf.Member) {
)
}

func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage) {
func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage structs.ServiceUsage) {
metrics.SetGaugeWithLabels(
[]string{"consul", "state", "services"},
float32(serviceUsage.Services),
Expand All @@ -96,6 +97,11 @@ func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage)
float32(serviceUsage.ServiceInstances),
u.metricLabels,
)
metrics.SetGaugeWithLabels(
[]string{"state", "billable_service_instances"},
float32(serviceUsage.BillableServiceInstances),
u.metricLabels,
)

for k, i := range serviceUsage.ConnectServiceInstances {
metrics.SetGaugeWithLabels(
Expand Down
Loading

0 comments on commit 898e59b

Please sign in to comment.