
Commit a57586c

Merge branch 'master' into fix_standalone_validation_branch
2 parents f865a37 + 9cd87e2 commit a57586c

8 files changed (+672, -596 lines)


pkg/actors/actors.go (+40, -19)
@@ -34,13 +34,13 @@ import (
 
 	"github.com/dapr/components-contrib/state"
 	actorerrors "github.com/dapr/dapr/pkg/actors/errors"
+	"github.com/dapr/dapr/pkg/actors/health"
 	"github.com/dapr/dapr/pkg/actors/internal"
 	"github.com/dapr/dapr/pkg/actors/timers"
 	"github.com/dapr/dapr/pkg/channel"
 	configuration "github.com/dapr/dapr/pkg/config"
 	diag "github.com/dapr/dapr/pkg/diagnostics"
 	diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
-	"github.com/dapr/dapr/pkg/health"
 	invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
 	"github.com/dapr/dapr/pkg/modes"
 	commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
@@ -122,6 +122,7 @@ type actorsRuntime struct {
 	internalActors       map[string]InternalActor
 	internalActorChannel *internalActorChannel
 	sec                  security.Handler
+	checker              *health.Checker
 	wg                   sync.WaitGroup
 	closed               atomic.Bool
 	closeCh              chan struct{}
@@ -178,12 +179,17 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) (ActorRuntime,
 
 	// Init reminders and placement
 	providerOpts := internal.ActorsProviderOptions{
-		Config:      a.actorsConfig.Config,
-		Security:    a.sec,
-		AppHealthFn: a.getAppHealthCheckChan,
-		Clock:       a.clock,
-		APILevel:    &a.apiLevel,
-		Resiliency:  a.resiliency,
+		Config:   a.actorsConfig.Config,
+		Security: a.sec,
+		AppHealthFn: func(ctx context.Context) <-chan bool {
+			if a.checker == nil {
+				return nil
+			}
+			return a.checker.HealthChannel()
+		},
+		Clock:      a.clock,
+		APILevel:   &a.apiLevel,
+		Resiliency: a.resiliency,
 	}
 
 	// Initialize the placement client if we don't have a mocked one already
@@ -269,6 +275,19 @@ func (a *actorsRuntime) Init(ctx context.Context) (err error) {
 		a.actorsReminders.OnPlacementTablesUpdated(ctx)
 	})
 
+	a.checker, err = a.getAppHealthChecker()
+	if err != nil {
+		return fmt.Errorf("actors: couldn't create health check: %w", err)
+	}
+
+	if a.checker != nil {
+		a.wg.Add(1)
+		go func() {
+			defer a.wg.Done()
+			a.checker.Run(ctx)
+		}()
+	}
+
 	for _, actorType := range hat {
 		err = a.placement.AddHostedActorType(actorType, a.actorsConfig.GetIdleTimeoutForType(actorType))
 		if err != nil {
@@ -294,26 +313,27 @@ func (a *actorsRuntime) Init(ctx context.Context) (err error) {
 	return nil
 }
 
-func (a *actorsRuntime) getAppHealthCheckChan(ctx context.Context) <-chan bool {
+func (a *actorsRuntime) getAppHealthChecker() (*health.Checker, error) {
 	if len(a.actorsConfig.Config.HostedActorTypes.ListActorTypes()) == 0 || a.appChannel == nil {
-		return nil
+		return nil, nil
	}
 
 	// Be careful to configure healthz endpoint option. If app healthz returns unhealthy status, Dapr will
 	// disconnect from placement to remove the node from consistent hashing ring.
 	// i.e if app is busy state, the healthz status would be flaky, which leads to frequent
 	// actor rebalancing. It will impact the entire service.
-	return a.getAppHealthCheckChanWithOptions(ctx,
+	return a.getAppHealthCheckerWithOptions(
 		health.WithFailureThreshold(4),
-		health.WithInterval(5*time.Second),
+		health.WithHealthyStateInterval(5*time.Second),
+		health.WithUnHealthyStateInterval(time.Second/2),
 		health.WithRequestTimeout(2*time.Second),
 		health.WithHTTPClient(a.actorsConfig.HealthHTTPClient),
 	)
 }
 
-func (a *actorsRuntime) getAppHealthCheckChanWithOptions(ctx context.Context, opts ...health.Option) <-chan bool {
+func (a *actorsRuntime) getAppHealthCheckerWithOptions(opts ...health.Option) (*health.Checker, error) {
 	opts = append(opts, health.WithAddress(a.actorsConfig.HealthEndpoint+"/healthz"))
-	return health.StartEndpointHealthCheck(ctx, opts...)
+	return health.New(opts...)
 }
 
 func constructCompositeKey(keys ...string) string {
@@ -1100,19 +1120,20 @@ func isInternalActor(actorType string) bool {
 func (a *actorsRuntime) Close() error {
 	defer a.wg.Wait()
 
+	var errs []error
 	if a.closed.CompareAndSwap(false, true) {
-		defer close(a.closeCh)
-		errs := []error{}
+		defer func() { close(a.closeCh) }()
+		if a.checker != nil {
+			a.checker.Close()
+		}
 		if a.placement != nil {
-			err := a.placement.Close()
-			if err != nil {
+			if err := a.placement.Close(); err != nil {
 				errs = append(errs, fmt.Errorf("failed to close placement service: %w", err))
 			}
 		}
-		return errors.Join(errs...)
 	}
 
-	return nil
+	return errors.Join(errs...)
 }
 
 // ValidateHostEnvironment validates that actors can be initialized properly given a set of parameters
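
Note: the new pkg/actors/health package is used in two steps here: build a *health.Checker with health.New(opts...), then drive it with Run(ctx) and read state changes from HealthChannel(). Below is a minimal sketch of that lifecycle, based only on the calls visible in this diff; the endpoint URL and option values are illustrative, not taken from the commit.

// healthcheck_sketch.go: illustrative only; uses the calls shown in this diff
// (health.New, Run, HealthChannel, Close). Not part of the commit.
package main

import (
	"context"
	"log"
	"time"

	"github.com/dapr/dapr/pkg/actors/health"
)

func main() {
	// health.New replaces health.StartEndpointHealthCheck: construction is now
	// separate from starting the probe loop.
	checker, err := health.New(
		health.WithAddress("http://127.0.0.1:3000/healthz"), // hypothetical app healthz endpoint
		health.WithFailureThreshold(4),
		health.WithHealthyStateInterval(5*time.Second),
		health.WithUnHealthyStateInterval(time.Second/2),
		health.WithRequestTimeout(2*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Consume health transitions, as the placement provider does via AppHealthFn.
	go func() {
		for healthy := range checker.HealthChannel() {
			log.Printf("app healthy: %v", healthy)
		}
	}()

	// Run probes until the context is cancelled (the runtime's Close path and the
	// tests in this commit rely on Run returning after cancellation), then release
	// the checker.
	checker.Run(ctx)
	checker.Close()
}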

pkg/actors/actors_test.go (+55, -12)
@@ -17,8 +17,11 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"net/http"
+	"net/http/httptest"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -31,11 +34,11 @@ import (
 	clocktesting "k8s.io/utils/clock/testing"
 
 	"github.com/dapr/components-contrib/state"
+	"github.com/dapr/dapr/pkg/actors/health"
 	"github.com/dapr/dapr/pkg/actors/internal"
 	"github.com/dapr/dapr/pkg/apis/resiliency/v1alpha1"
 	"github.com/dapr/dapr/pkg/channel"
 	"github.com/dapr/dapr/pkg/config"
-	"github.com/dapr/dapr/pkg/health"
 	invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
 	"github.com/dapr/dapr/pkg/modes"
 	internalsv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
@@ -1031,22 +1034,45 @@ func TestActorsAppHealthCheck(t *testing.T) {
 			defer testActorsRuntime.Close()
 			clock := testActorsRuntime.clock.(*clocktesting.FakeClock)
 
+			var i int
+			testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				if i == 0 {
+					w.WriteHeader(http.StatusOK)
+				} else {
+					w.WriteHeader(http.StatusInternalServerError)
+				}
+				i++
+			}))
+			t.Cleanup(testServer.Close)
+
 			testActorsRuntime.actorsConfig.Config.HostedActorTypes = internal.NewHostedActors([]string{"actor1"})
+			testActorsRuntime.actorsConfig.HealthEndpoint = testServer.URL
 			ctx, cancel := context.WithCancel(context.Background())
-			defer cancel()
+			t.Cleanup(cancel)
 
 			opts := []health.Option{
 				health.WithClock(clock),
 				health.WithFailureThreshold(1),
-				health.WithInterval(1 * time.Second),
+				health.WithHealthyStateInterval(2 * time.Second),
+				health.WithUnHealthyStateInterval(2 * time.Second),
 				health.WithRequestTimeout(100 * time.Millisecond),
 			}
-			closingCh := make(chan struct{})
+			var wg sync.WaitGroup
+
+			checker, err := testActorsRuntime.getAppHealthCheckerWithOptions(opts...)
+			require.NoError(t, err)
+
+			wg.Add(2)
+			go func() {
+				defer wg.Done()
+				checker.Run(ctx)
+			}()
+
 			healthy := atomic.Bool{}
 			healthy.Store(true)
 			go func() {
-				defer close(closingCh)
-				for v := range testActorsRuntime.getAppHealthCheckChanWithOptions(ctx, opts...) {
+				defer wg.Done()
+				for v := range checker.HealthChannel() {
 					healthy.Store(v)
 				}
 			}()
@@ -1060,7 +1086,7 @@ func TestActorsAppHealthCheck(t *testing.T) {
 
 			// Cancel now which should cause the shutdown
 			cancel()
-			<-closingCh
+			wg.Wait()
 		}
 	}
 
@@ -1073,23 +1099,40 @@ func TestActorsAppHealthCheck(t *testing.T) {
 		defer testActorsRuntime.Close()
 		clock := testActorsRuntime.clock.(*clocktesting.FakeClock)
 
+		testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(http.StatusOK)
+		}))
+		t.Cleanup(testServer.Close)
+
 		testActorsRuntime.actorsConfig.HostedActorTypes = internal.NewHostedActors([]string{})
+		testActorsRuntime.actorsConfig.HealthEndpoint = testServer.URL
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
 
 		opts := []health.Option{
 			health.WithClock(clock),
 			health.WithFailureThreshold(1),
-			health.WithInterval(1 * time.Second),
+			health.WithHealthyStateInterval(2 * time.Second),
+			health.WithUnHealthyStateInterval(2 * time.Second),
 			health.WithRequestTimeout(100 * time.Millisecond),
 		}
 
-		closingCh := make(chan struct{})
+		var wg sync.WaitGroup
+		checker, err := testActorsRuntime.getAppHealthCheckerWithOptions(opts...)
+		require.NoError(t, err)
+
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			checker.Run(ctx)
+		}()
+
 		healthy := atomic.Bool{}
 		healthy.Store(true)
+
 		go func() {
-			defer close(closingCh)
-			for v := range testActorsRuntime.getAppHealthCheckChanWithOptions(ctx, opts...) {
+			defer wg.Done()
+			for v := range checker.HealthChannel() {
 				healthy.Store(v)
 			}
 		}()
@@ -1100,7 +1143,7 @@ func TestActorsAppHealthCheck(t *testing.T) {
 
 		// Cancel now which should cause the shutdown
 		cancel()
-		<-closingCh
+		wg.Wait()
 	})
 }
 
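The updated tests stand up an httptest server as the app's healthz endpoint and use a sync.WaitGroup so that both the Run(ctx) probe loop and the HealthChannel() reader exit after cancel(). A condensed, self-contained sketch of that pattern follows; the test name, package, server behaviour, and the commented assertion are illustrative, not part of the diff.

package health_test

import (
	"context"
	"net/http"
	"net/http/httptest"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/dapr/dapr/pkg/actors/health"
)

// TestCheckerReportsUnhealthy is a hypothetical test mirroring the pattern above.
func TestCheckerReportsUnhealthy(t *testing.T) {
	// Fake app healthz endpoint: 200 on the first probe, 500 afterwards.
	var calls atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if calls.Add(1) > 1 {
			w.WriteHeader(http.StatusInternalServerError)
		}
	}))
	t.Cleanup(srv.Close)

	checker, err := health.New(
		health.WithAddress(srv.URL+"/healthz"),
		health.WithFailureThreshold(1),
		health.WithHealthyStateInterval(2*time.Second),
		health.WithUnHealthyStateInterval(2*time.Second),
		health.WithRequestTimeout(100*time.Millisecond),
	)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		// Probe loop; returns once ctx is cancelled.
		defer wg.Done()
		checker.Run(ctx)
	}()

	var healthy atomic.Bool
	healthy.Store(true)
	go func() {
		// Channel reader; exits when the checker closes HealthChannel on shutdown.
		defer wg.Done()
		for v := range checker.HealthChannel() {
			healthy.Store(v)
		}
	}()

	// ...advance the clock past the probe interval and assert on healthy.Load()
	// here, as the real tests do with a fake clock...

	cancel()
	wg.Wait()
}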