Skip to content
This repository has been archived by the owner on Mar 19, 2024. It is now read-only.

WIP GRPC rate limiting #533

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
checkpoint, update registration to use catalog api
  • Loading branch information
sarahalsmiller authored and mikemorris committed Apr 5, 2023
commit 0f35a9f046131edfd64710073f795b4358ea4da8
111 changes: 70 additions & 41 deletions internal/consul/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (

const (
serviceCheckName = "consul-api-gateway Gateway Listener"
serviceCheckInterval = "10s"
serviceCheckTTL = "20s"
serviceDeregistrationTime = "1m"
serviceCheckInterval = time.Second * 10
serviceCheckTTL = time.Second * 20
serviceDeregistrationTime = time.Minute
)

// ServiceRegistry handles the logic for registering a consul-api-gateway service in Consul.
Expand Down Expand Up @@ -76,51 +76,78 @@ func (s *ServiceRegistry) WithTries(tries uint64) *ServiceRegistry {

// Register registers a Gateway service with Consul.
func (s *ServiceRegistry) RegisterGateway(ctx context.Context, ttl bool) error {
serviceChecks := api.AgentServiceChecks{{
Name: fmt.Sprintf("%s - Ready", serviceCheckName),
TCP: fmt.Sprintf("%s:%d", s.host, 20000),
Interval: serviceCheckInterval,
DeregisterCriticalServiceAfter: serviceDeregistrationTime,
//serviceCheck := api.AgentServiceCheck{
// Name: fmt.Sprintf("%s - Ready", serviceCheckName),
// TCP: fmt.Sprintf("%s:%d", s.host, 20000),
// Interval: serviceCheckInterval,
// DeregisterCriticalServiceAfter: serviceDeregistrationTime,
//}
//if ttl {
// serviceCheck = api.AgentServiceCheck{
// CheckID: s.id,
// Name: fmt.Sprintf("%s - Health", s.name),
// TTL: serviceCheckTTL,
// DeregisterCriticalServiceAfter: serviceDeregistrationTime,
// }
//}

serviceChecks := api.HealthChecks{{
Name: fmt.Sprintf("%s - Ready", serviceCheckName),
Definition: api.HealthCheckDefinition{
TCP: fmt.Sprintf("%s:%d", s.host, 20000),
IntervalDuration: serviceCheckInterval,
DeregisterCriticalServiceAfterDuration: serviceDeregistrationTime,
},
}}
if ttl {
serviceChecks = api.AgentServiceChecks{{
CheckID: s.id,
Name: fmt.Sprintf("%s - Health", s.name),
TTL: serviceCheckTTL,
DeregisterCriticalServiceAfter: serviceDeregistrationTime,
serviceChecks = api.HealthChecks{{
CheckID: s.id,
Name: fmt.Sprintf("%s - Health", s.name),
Definition: api.HealthCheckDefinition{
TCP: fmt.Sprintf("%s:%d", s.host, 20000),
TimeoutDuration: serviceCheckTTL,
DeregisterCriticalServiceAfterDuration: serviceDeregistrationTime,
},
}}
}

return s.register(ctx, &api.AgentServiceRegistration{
Kind: api.ServiceKind(api.IngressGateway),
ID: s.id,
Name: s.name,
Namespace: s.namespace,
Partition: s.partition,
Address: s.host,
Tags: s.tags,
Meta: map[string]string{
"external-source": "consul-api-gateway",
return s.register(ctx, &api.CatalogRegistration{
Service: &api.AgentService{
Kind: api.ServiceKind(api.IngressGateway),
ID: s.id,
Service: s.name,
Namespace: s.namespace,
Partition: s.partition,
Address: s.host,
Tags: s.tags,
Meta: map[string]string{
"external-source": "consul-api-gateway",
},
},
Checks: serviceChecks,
}, ttl)
}

// Register registers a service with Consul.
func (s *ServiceRegistry) Register(ctx context.Context) error {
return s.register(ctx, &api.AgentServiceRegistration{
Kind: api.ServiceKindTypical,
ID: s.id,
Name: s.name,
Namespace: s.namespace,
Partition: s.partition,
Address: s.host,
Tags: s.tags,
Checks: api.AgentServiceChecks{{
CheckID: s.id,
Name: fmt.Sprintf("%s - Health", s.name),
TTL: serviceCheckTTL,
DeregisterCriticalServiceAfter: serviceDeregistrationTime,
return s.register(ctx, &api.CatalogRegistration{
Service: &api.AgentService{
Kind: api.ServiceKindTypical,
ID: s.id,
Service: s.name,
Namespace: s.namespace,
Partition: s.partition,
Address: s.host,
Tags: s.tags,
},

Checks: api.HealthChecks{{
CheckID: s.id,
Name: fmt.Sprintf("%s - Health", s.name),
Definition: api.HealthCheckDefinition{
TimeoutDuration: serviceCheckTTL,
DeregisterCriticalServiceAfterDuration: serviceDeregistrationTime,
},
}},
}, true)
}
Expand All @@ -130,7 +157,7 @@ func (s *ServiceRegistry) updateTTL(ctx context.Context) error {
return s.client.Agent().UpdateTTLOpts(s.id, "service healthy", "pass", opts.WithContext(ctx))
}

func (s *ServiceRegistry) register(ctx context.Context, registration *api.AgentServiceRegistration, ttl bool) error {
func (s *ServiceRegistry) register(ctx context.Context, registration *api.CatalogRegistration, ttl bool) error {
if s.cancel != nil {
return nil
}
Expand Down Expand Up @@ -168,7 +195,7 @@ func (s *ServiceRegistry) register(ctx context.Context, registration *api.AgentS
return nil
}

func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration *api.AgentServiceRegistration) {
func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration *api.CatalogRegistration) {
_, _, err := s.client.Agent().Service(s.id, &api.QueryOptions{
Namespace: s.namespace,
})
Expand All @@ -192,7 +219,7 @@ func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration *
s.logger.Error("error fetching service", "error", err)
}

func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *api.AgentServiceRegistration) error {
func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *api.CatalogRegistration) error {
return backoff.Retry(func() error {
err := s.registerService(ctx, registration)
if err != nil {
Expand All @@ -202,8 +229,10 @@ func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *a
}, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.tries), ctx))
}

func (s *ServiceRegistry) registerService(ctx context.Context, registration *api.AgentServiceRegistration) error {
return s.client.Agent().ServiceRegisterOpts(registration, (&api.ServiceRegisterOpts{}).WithContext(ctx))
func (s *ServiceRegistry) registerService(ctx context.Context, registration *api.CatalogRegistration) error {
_, err := s.client.Catalog().Register(registration, nil)
return err
//return s.client.Agent().ServiceRegisterOpts(registration, (&api.ServiceRegisterOpts{}).WithContext(ctx))
}

// Deregister de-registers a service from Consul.
Expand Down