Skip to content

Commit

Permalink
feat: Add worker event listener
Browse files Browse the repository at this point in the history
- add worker event listener
- add worker event handler
- start event listener when a worker starts
  • Loading branch information
elimt committed Apr 5, 2024
1 parent 8506ed3 commit 3f94c90
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1366,10 +1366,11 @@ func TestWorkerLocalStorageStateStatus(t *testing.T) {
kms := kms.TestKms(t, conn, wrapper)

serverRepo, _ := server.NewRepository(ctx, rw, rw, kms)
serverRepo.UpsertController(ctx, &store.Controller{
_, err := serverRepo.UpsertController(ctx, &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
require.NoError(t, err)
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
Expand Down
24 changes: 24 additions & 0 deletions internal/daemon/worker/recording/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package recording

// pauseReason type is used by the pause and resume methods of the
// RecordingManager interface to determine when to pause and resume recording.
// The pause() method should be called with a pauseReason type that accurately
// describes the reason why the recording manager is being paused.
// The resume() method should be called with the same pauseReason type only
// when the caller can definitely state that all places that called pause() no
// longer need the recorder manager to be paused.
type pauseReason uint8

const (
// unknown is the default value for PauseReason
// This should not be used as a reason to pause the recording manager.
// This is solely used to for testing purposes.
unknown pauseReason = iota

// localStorageException is used to pause the recording manager when
// there is an exception with the local storage.
localStorageException pauseReason = iota
)
6 changes: 6 additions & 0 deletions internal/daemon/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
event.WriteError(cancelCtx, op, errors.New("worker name and keyId are both empty; at least one is needed to identify a worker"),
event.WithInfoMsg("error making status request to controller"))
}
// If the local storage state is unknown, and we have recording storage set, get the state from the recording storage
// and set it on the worker. This is done once to ensure that the worker has the correct state for the first status
// call.
if w.localStorageState.Load() == server.UnknownLocalStorageState && w.RecordingStorage != nil {
w.localStorageState.Store(w.RecordingStorage.GetLocalStorageState(cancelCtx))
}
versionInfo := version.Get()
connectionState := w.downstreamConnManager.Connected()
result, err := client.Status(statusCtx, &pbs.StatusRequest{
Expand Down
40 changes: 35 additions & 5 deletions internal/daemon/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ var recordingStorageFactory func(

var recorderManagerFactory func(*Worker) (recorderManager, error)

var eventListenerFactory func(*Worker) (event.EventListener, error)

var initializeReverseGrpcClientCollectors = noopInitializePromCollectors

func noopInitializePromCollectors(r prometheus.Registerer) {}
Expand Down Expand Up @@ -142,11 +144,15 @@ type Worker struct {

recorderManager recorderManager

everAuthenticated *ua.Uint32
lastStatusSuccess *atomic.Value
workerStartTime time.Time
operationalState *atomic.Value
localStorageState *atomic.Value
everAuthenticated *ua.Uint32
lastStatusSuccess *atomic.Value
workerStartTime time.Time
operationalState *atomic.Value
// localStorageState is the current state of the local storage.
// The local storage state is updated based on the local storage events.
localStorageState *atomic.Value

storageEventListener event.EventListener
upstreamConnectionState *atomic.Value

controllerMultihopConn *atomic.Value
Expand Down Expand Up @@ -329,6 +335,14 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
// FIXME: This is really ugly, but works.
session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())

if eventListenerFactory != nil {
var err error
w.storageEventListener, err = eventListenerFactory(w)
if err != nil {
return nil, fmt.Errorf("error calling eventListenerFactory: %w", err)
}
}

if recorderManagerFactory != nil {
var err error
w.recorderManager, err = recorderManagerFactory(w)
Expand Down Expand Up @@ -547,6 +561,16 @@ func (w *Worker) Start() error {
return errors.Wrap(w.baseContext, err, op, errors.WithMsg("error starting worker listeners"))
}

if w.storageEventListener != nil {
if err := w.storageEventListener.Start(w.baseContext); err != nil {
return errors.Wrap(w.baseContext, err, op, errors.WithMsg("error starting worker event listener"))
}

if w.RecordingStorage != nil {
w.localStorageState.Store(w.RecordingStorage.GetLocalStorageState(w.baseContext))
}
}

w.operationalState.Store(server.ActiveOperationalState)

// Rather than deal with some of the potential error conditions for Add on
Expand Down Expand Up @@ -688,9 +712,15 @@ func (w *Worker) Shutdown() error {
ar.SetAddresses(nil)
}

err := w.storageEventListener.Shutdown(w.baseContext)
if err != nil {
return fmt.Errorf("error shutting down worker event listener: %w", err)
}

w.started.Store(false)
w.tickerWg.Wait()
recManWg.Wait()

if w.conf.Eventer != nil {
if err := w.conf.Eventer.FlushNodes(context.Background()); err != nil {
return fmt.Errorf("error flushing worker eventer nodes: %w", err)
Expand Down
14 changes: 12 additions & 2 deletions internal/daemon/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/rand"
"crypto/tls"
"crypto/x509"
"sync"
"testing"
"time"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/hashicorp/boundary/internal/event"
"github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/hashicorp/boundary/internal/server"
"github.com/hashicorp/boundary/internal/util"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/configutil/v2"
"github.com/hashicorp/go-secure-stdlib/listenerutil"
Expand Down Expand Up @@ -152,7 +154,6 @@ func TestWorkerNew(t *testing.T) {
Listeners: []*base.ServerListener{
{Config: &listenerutil.ListenerConfig{Purpose: []string{"proxy"}}},
},
Eventer: &event.Eventer{},
},
RawConfig: &config.Config{
Worker: &config.Worker{
Expand All @@ -178,6 +179,11 @@ func TestWorkerNew(t *testing.T) {
if tt.in.RawConfig == nil {
tt.in.RawConfig = &config.Config{SharedConfig: &configutil.SharedConfig{DisableMlock: true}}
}
if util.IsNil(tt.in.Eventer) {
require.NoError(t, event.InitSysEventer(hclog.Default(), &sync.Mutex{}, "worker_test", event.WithEventerConfig(&event.EventerConfig{})))
defer event.TestResetSystEventer(t)
tt.in.Eventer = event.SysEventer()
}

w, err := New(context.Background(), tt.in)
if tt.expErr {
Expand Down Expand Up @@ -325,14 +331,18 @@ func TestSetupWorkerAuthStorage(t *testing.T) {
}

func Test_Worker_getSessionTls(t *testing.T) {
require.NoError(t, event.InitSysEventer(hclog.Default(), &sync.Mutex{}, "worker_test", event.WithEventerConfig(&event.EventerConfig{})))
defer event.TestResetSystEventer(t)

conf := &Config{
Server: &base.Server{
Listeners: []*base.ServerListener{
{Config: &listenerutil.ListenerConfig{Purpose: []string{"api"}}},
{Config: &listenerutil.ListenerConfig{Purpose: []string{"proxy"}}},
{Config: &listenerutil.ListenerConfig{Purpose: []string{"cluster"}}},
},
Logger: hclog.Default(),
Eventer: event.SysEventer(),
Logger: hclog.Default(),
},
}
conf.RawConfig = &config.Config{SharedConfig: &configutil.SharedConfig{DisableMlock: true}}
Expand Down
1 change: 1 addition & 0 deletions internal/errors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const (
WorkerNotFoundForRequest = 133 // WorkerNotFoundForRequest represents an error when no appropriate worker is found which meets the conditions required to handle a request
Closed = 134 // Closed represents an error when an operation cannot be completed because the thing being operated on is closed
ChecksumMismatch = 135 // ChecksumMismatch represents an error when a checksum is mismatched
Paused = 136 // Paused represents an error when an operation cannot be completed because the thing being operated on is paused

InvalidListToken Code = 136 // InvalidListToken represents an error where the provided list token is invalid

Expand Down
3 changes: 3 additions & 0 deletions internal/event/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ var (
ErrMaxRetries = errors.New("too many retries")
ErrIo = errors.New("error during io operation")
ErrRecordNotFound = errors.New("record not found")
// ErrInvalidOperation represents an error when an operation cannot be completed
// because the thing being operated on is in an invalid state
ErrInvalidOperation = errors.New("invalid operation")
)
24 changes: 24 additions & 0 deletions internal/event/event_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package event

import (
"context"

"github.com/hashicorp/eventlogger"
)

// EventHandlerFunc is a function that handles an event.
type EventHandlerFunc func(ctx context.Context, e *eventlogger.Event)

// EventListener is an interface for listening to events.
type EventListener interface {
// RegisterEventHandlerFunc registers an event handler function for the given event type.
// A given event type can have multiple event handler functions registered.
RegisterEventHandlerFunc(ctx context.Context, ev Type, ehf EventHandlerFunc) error
// Start starts the event listener.
Start(ctx context.Context) error
// Shutdown stops the event listener.
Shutdown(ctx context.Context) error
}
29 changes: 3 additions & 26 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/fs"

"github.com/hashicorp/boundary/internal/boundary"
"github.com/hashicorp/boundary/internal/server"
"github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/storagebuckets"
plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin"
)
Expand All @@ -33,6 +34,8 @@ type RecordingStorage interface {
// CreateTemp creates a temporary file that is cleaned up when closed. All temp files
// are also removed when storage is initialized.
CreateTemp(ctx context.Context, p string) (TempFile, error)
// GetLocalStorageState returns the current local storage state of the storage instance.
GetLocalStorageState(ctx context.Context) server.LocalStorageState
}

// Bucket is a resource that represents a bucket in an external object store
Expand Down Expand Up @@ -78,29 +81,3 @@ type Writer interface {
io.Writer
WriteAndClose([]byte) (int, error)
}

// LocalStorageState is represents the state of local storage.
type LocalStorageState string

const (
// AvailableLocalStorageState indicates local storage is
// (minimumAvailableDiskSpace * 1.25) above the minimum disk space threshold.
// This could indicates recovery from a low, critical, or out of disk space
// local storage state.
AvailableLocalStorageState LocalStorageState = "available"
// LowStorageLocalStorageState indicates local storage is below the minimum
// disk space threshold.
LowStorageLocalStorageState LocalStorageState = "low storage"
// CriticallyLowStorageLocalStorageState indicates local storage is below the
// half the minimum disk space threshold.
CriticallyLowStorageLocalStorageState LocalStorageState = "critically low storage"
// OutOfStorageLocalStorageState indicates local storage is below 1MB.
OutOfStorageLocalStorageState LocalStorageState = "out of storage"
// NotConfiguredLocalStorageState indicates the local storage path is not
// configured. Intervention from an admin might be necessary to help resolve
// the issue.
NotConfiguredLocalStorageState LocalStorageState = "not configured"
// UnknownLocalStorageState is the default state for local storage. It indicates
// the local storage state is unknown.
UnknownLocalStorageState LocalStorageState = "unknown"
)

0 comments on commit 3f94c90

Please sign in to comment.