Skip to content

Commit

Permalink
fix: ignore invalid NTP responses
Browse files Browse the repository at this point in the history
Due to the bug introduced when refactoring for PTP devices, invalid NTP
responses (including for example NTP kiss of death), were incorrectly
handled when only a single NTP server was used.

The error was logged, but the response was used to adjust the time which
leads to unexpected time jumps.

Properly ignore any invalid NTP response.

Signed-off-by: Andrey Smirnov <[email protected]>
(cherry picked from commit d4a6d01)
  • Loading branch information
smira committed Sep 25, 2024
1 parent 28b81b2 commit e53eff9
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 35 deletions.
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ require (
sigs.k8s.io/yaml v1.4.0
)

require github.com/containerd/containerd/api v1.7.19
require (
github.com/containerd/containerd/api v1.7.19
github.com/containerd/errdefs v0.1.0
github.com/containerd/platforms v0.2.1
)

require (
cloud.google.com/go/compute v1.24.0 // indirect
Expand Down Expand Up @@ -215,11 +219,9 @@ require (
github.com/cilium/ebpf v0.12.3 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
github.com/containerd/continuity v0.4.2 // indirect
github.com/containerd/errdefs v0.1.0 // indirect
github.com/containerd/fifo v1.1.0 // indirect
github.com/containerd/go-cni v1.1.9 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/platforms v0.2.1 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.14.3 // indirect
github.com/containerd/ttrpc v1.2.5 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"context"

containerdapi "github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
criconstants "github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/platforms"
"github.com/containerd/errdefs"
"github.com/containerd/platforms"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down
18 changes: 9 additions & 9 deletions internal/app/machined/pkg/controllers/runtime/cri_image_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/reference/docker"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/distribution/reference"
"github.com/siderolabs/gen/optional"
"github.com/siderolabs/gen/xslices"
"go.uber.org/zap"
Expand Down Expand Up @@ -195,8 +195,8 @@ func (ctrl *CRIImageGCController) Run(ctx context.Context, r controller.Runtime,
func buildExpectedImageNames(logger *zap.Logger, actualImages []images.Image, expectedImages []string) (map[string]struct{}, error) {
var parseErrors []error

expectedReferences := xslices.Map(expectedImages, func(ref string) docker.Named {
res, parseErr := docker.ParseNamed(ref)
expectedReferences := xslices.Map(expectedImages, func(ref string) reference.Named {
res, parseErr := reference.ParseNamed(ref)

parseErrors = append(parseErrors, parseErr)

Expand All @@ -210,7 +210,7 @@ func buildExpectedImageNames(logger *zap.Logger, actualImages []images.Image, ex
expectedImageNames := map[string]struct{}{}

for _, image := range actualImages {
imageRef, err := docker.ParseAnyReference(image.Name)
imageRef, err := reference.ParseAnyReference(image.Name)
if err != nil {
logger.Debug("failed to parse image reference", zap.Error(err), zap.String("image", image.Name))

Expand All @@ -220,32 +220,32 @@ func buildExpectedImageNames(logger *zap.Logger, actualImages []images.Image, ex
digest := image.Target.Digest.String()

switch ref := imageRef.(type) {
case docker.NamedTagged:
case reference.NamedTagged:
for _, expectedRef := range expectedReferences {
if expectedRef.Name() != ref.Name() {
continue
}

if expectedTagged, ok := expectedRef.(docker.Tagged); ok && ref.Tag() == expectedTagged.Tag() {
if expectedTagged, ok := expectedRef.(reference.Tagged); ok && ref.Tag() == expectedTagged.Tag() {
// this is expected image by tag, inject other forms of the ref
expectedImageNames[digest] = struct{}{}
expectedImageNames[expectedRef.Name()+":"+expectedTagged.Tag()] = struct{}{}
expectedImageNames[expectedRef.Name()+"@"+digest] = struct{}{}
}
}
case docker.Canonical:
case reference.Canonical:
for _, expectedRef := range expectedReferences {
if expectedRef.Name() != ref.Name() {
continue
}

if expectedDigested, ok := expectedRef.(docker.Digested); ok && ref.Digest() == expectedDigested.Digest() {
if expectedDigested, ok := expectedRef.(reference.Digested); ok && ref.Digest() == expectedDigested.Digest() {
// this is expected image by digest, inject other forms of the ref
expectedImageNames[digest] = struct{}{}
expectedImageNames[expectedRef.Name()+"@"+digest] = struct{}{}

// if the image is also tagged, inject the tagged version of it
if expectedTagged, ok := expectedRef.(docker.Tagged); ok {
if expectedTagged, ok := expectedRef.(reference.Tagged); ok {
expectedImageNames[expectedRef.Name()+":"+expectedTagged.Tag()] = struct{}{}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/contrib/seccomp"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/containerd/errdefs"

"github.com/siderolabs/talos/internal/app/machined/pkg/system/events"
"github.com/siderolabs/talos/internal/app/machined/pkg/system/runner"
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/containers/containerd/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
v2 "github.com/containerd/cgroups/v3/cgroup2/stats"
"github.com/containerd/containerd"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/errdefs"
"github.com/containerd/typeurl/v2"
"github.com/hashicorp/go-multierror"

Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/containers/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/reference/docker"
"github.com/containerd/errdefs"
"github.com/distribution/reference"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/siderolabs/go-retry/retry"

Expand Down Expand Up @@ -64,7 +64,7 @@ func Pull(ctx context.Context, reg config.Registries, client *containerd.Client,
o(&opts)
}

namedRef, err := docker.ParseDockerRef(ref)
namedRef, err := reference.ParseDockerRef(ref)
if err != nil {
return nil, fmt.Errorf("failed to parse image reference %q: %w", ref, err)
}
Expand Down Expand Up @@ -123,17 +123,17 @@ func Pull(ctx context.Context, reg config.Registries, client *containerd.Client,
return img, nil
}

func manageAliases(ctx context.Context, client *containerd.Client, namedRef docker.Named, img containerd.Image) error {
func manageAliases(ctx context.Context, client *containerd.Client, namedRef reference.Named, img containerd.Image) error {
// re-tag pulled image
imageDigest := img.Target().Digest.String()

refs := []string{imageDigest}

if _, ok := namedRef.(docker.NamedTagged); ok {
if _, ok := namedRef.(reference.NamedTagged); ok {
refs = append(refs, namedRef.String())
}

if _, ok := namedRef.(docker.Canonical); ok {
if _, ok := namedRef.(reference.Canonical); ok {
refs = append(refs, namedRef.String())
} else {
refs = append(refs, namedRef.Name()+"@"+imageDigest)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (

"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/containerd/errdefs"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/siderolabs/go-kmsg"
"github.com/siderolabs/go-procfs/procfs"
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/install/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/containerd/errdefs"

"github.com/siderolabs/talos/internal/pkg/containers/image"
"github.com/siderolabs/talos/pkg/machinery/config/config"
Expand Down
15 changes: 6 additions & 9 deletions internal/pkg/ntp/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,18 +392,15 @@ func (syncer *Syncer) queryNTP(server string) (*Measurement, error) {
)

validationError := resp.Validate()
if validationError != nil {
return nil, validationError
}

measurement := &Measurement{
return &Measurement{
ClockOffset: resp.ClockOffset,
Leap: resp.Leap,
Spike: false,
}

if validationError == nil {
measurement.Spike = syncer.isSpike(resp)
}

return measurement, validationError
Spike: syncer.isSpike(resp),
}, nil
}

// log2i returns 0 for v == 0 and v == 1.
Expand Down
82 changes: 80 additions & 2 deletions internal/pkg/ntp/ntp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type NTPSuite struct {
systemClock time.Time
clockAdjustments []time.Duration

failingServer int
spikyServer int
failingServer int
spikyServer int
kissOfDeathServer int
}

func TestNTPSuite(t *testing.T) {
Expand All @@ -49,6 +50,8 @@ func (suite *NTPSuite) SetupTest() {
suite.systemClock = time.Now().UTC()
suite.clockAdjustments = nil
suite.failingServer = 0
suite.spikyServer = 0
suite.kissOfDeathServer = 0
}

func (suite *NTPSuite) getSystemClock() time.Time {
Expand All @@ -73,6 +76,7 @@ func (suite *NTPSuite) adjustSystemClock(val *unix.Timex) (status timex.State, e
return
}

//nolint:gocyclo
func (suite *NTPSuite) fakeQuery(host string) (resp *beevikntp.Response, err error) {
switch host {
case "127.0.0.1": // error
Expand Down Expand Up @@ -161,6 +165,26 @@ func (suite *NTPSuite) fakeQuery(host string) (resp *beevikntp.Response, err err
suite.Require().NoError(resp.Validate())

return resp, nil
case "127.0.0.8": // kiss of death alternating
suite.kissOfDeathServer++

if suite.kissOfDeathServer%2 == 1 {
return &beevikntp.Response{ // kiss of death
Stratum: 0,
Time: suite.systemClock,
ReferenceTime: suite.systemClock,
ClockOffset: 2 * time.Millisecond,
RTT: time.Millisecond / 2,
}, nil
} else {
return &beevikntp.Response{ // normal response
Stratum: 1,
Time: suite.systemClock,
ReferenceTime: suite.systemClock,
ClockOffset: time.Millisecond,
RTT: time.Millisecond / 2,
}, nil
}
default:
return nil, fmt.Errorf("unknown host %q", host)
}
Expand Down Expand Up @@ -243,6 +267,60 @@ func (suite *NTPSuite) TestSyncContinuous() {
wg.Wait()
}

//nolint:dupl
func (suite *NTPSuite) TestSyncKissOfDeath() {
syncer := ntp.NewSyncer(logging.Wrap(log.Writer()).With(zap.String("controller", "ntp")), []string{"127.0.0.8"})

syncer.AdjustTime = suite.adjustSystemClock
syncer.CurrentTime = suite.getSystemClock
syncer.NTPQuery = suite.fakeQuery

syncer.MinPoll = time.Second
syncer.MaxPoll = time.Second

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup

wg.Add(1)

go func() {
defer wg.Done()

syncer.Run(ctx)
}()

select {
case <-syncer.Synced():
case <-time.After(10 * time.Second):
suite.Assert().Fail("time sync timeout")
}

suite.Assert().NoError(
retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
suite.clockLock.Lock()
defer suite.clockLock.Unlock()

if len(suite.clockAdjustments) < 2 {
return retry.ExpectedErrorf("not enough syncs")
}

for _, adj := range suite.clockAdjustments {
// kiss of death syncs should be ignored
suite.Assert().Equal(time.Millisecond, adj)
}

return nil
}),
)

cancel()

wg.Wait()
}

//nolint:dupl
func (suite *NTPSuite) TestSyncWithSpikes() {
syncer := ntp.NewSyncer(logging.Wrap(log.Writer()).With(zap.String("controller", "ntp")), []string{"127.0.0.7"})

Expand Down

0 comments on commit e53eff9

Please sign in to comment.