Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Remove QueryCoord's scheduling of L0 segments #39552

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions internal/querycoordv2/checkers/index_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/indexpb"
"github.com/milvus-io/milvus/pkg/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -120,12 +119,6 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec
continue
}

// skip update index for l0 segment
segmentInTarget := c.targetMgr.GetSealedSegment(ctx, collection.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst)
if segmentInTarget == nil || segmentInTarget.GetLevel() == datapb.SegmentLevel_L0 {
continue
}

missing := c.checkSegment(segment, indexInfos)
if len(missing) > 0 {
targets[segment.GetID()] = missing
Expand Down
9 changes: 2 additions & 7 deletions internal/querycoordv2/checkers/leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
)

var _ Checker = (*LeaderChecker)(nil)
Expand Down Expand Up @@ -164,10 +163,7 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
latestNodeDist := utils.FindMaxVersionSegments(dist)
for _, s := range latestNodeDist {
segment := c.target.GetSealedSegment(ctx, leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
// don't set the l0 segment's location on the delegator; l0 segments should be reloaded in the delegator
if !existInTarget || isL0Segment {
if segment == nil {
continue
}

Expand Down Expand Up @@ -218,8 +214,7 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica *me
_, ok := distMap[sid]
segment := c.target.GetSealedSegment(ctx, leaderView.CollectionID, sid, meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
if ok || existInTarget || isL0Segment {
if ok || existInTarget {
continue
}
log.Debug("leader checker append a segment to remove",
Expand Down
47 changes: 0 additions & 47 deletions internal/querycoordv2/checkers/leader_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,29 +162,6 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).GetLeaderID(), node2)
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)

// test skip sync l0 segment
segments = []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
Level: datapb.SegmentLevel_L0,
},
}
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(ctx, int64(1))
observer.target.UpdateCollectionCurrentTarget(ctx, 1)
// mock l0 segment exist on non delegator node, doesn't set to leader view
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, loadVersion, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
}

func (suite *LeaderCheckerTestSuite) TestActivation() {
Expand Down Expand Up @@ -423,30 +400,6 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0))
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)

// skip sync l0 segments
segments := []*datapb.SegmentInfo{
{
ID: 3,
PartitionID: 1,
InsertChannel: "test-insert-channel",
Level: datapb.SegmentLevel_L0,
},
}
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)

observer.target.UpdateCollectionNextTarget(ctx, int64(1))
observer.target.UpdateCollectionCurrentTarget(ctx, 1)

observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 1}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)

tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
}

func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() {
Expand Down
68 changes: 1 addition & 67 deletions internal/querycoordv2/checkers/segment_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sort"
"time"

"github.com/blang/semver/v4"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -245,37 +244,8 @@ func (c *SegmentChecker) getSealedSegmentDiff(
distMap[s.GetID()] = s.Node
}

versionRangeFilter := semver.MustParseRange(">2.3.x")
checkLeaderVersion := func(leader *meta.LeaderView, segmentID int64) bool {
// if the current shard leader's node version is < 2.4, skip loading the L0 segment
info := c.nodeMgr.Get(leader.ID)
if info != nil && !versionRangeFilter(info.Version()) {
log.Warn("l0 segment is not supported in current node version, skip it",
zap.Int64("collection", replica.GetCollectionID()),
zap.Int64("segmentID", segmentID),
zap.String("channel", leader.Channel),
zap.Int64("leaderID", leader.ID),
zap.String("nodeVersion", info.Version().String()))
return false
}
return true
}

isSegmentLack := func(segment *datapb.SegmentInfo) bool {
node, existInDist := distMap[segment.ID]

if segment.GetLevel() == datapb.SegmentLevel_L0 {
// the L0 segments have to be on the same node that watches the channel
leader := c.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))

// if the leader node's version doesn't match load l0 segment's requirement, skip it
if leader != nil && checkLeaderVersion(leader, segment.ID) {
l0WithWrongLocation := node != leader.ID
return !existInDist || l0WithWrongLocation
}
return false
}

_, existInDist := distMap[segment.ID]
return !existInDist
}

Expand All @@ -290,18 +260,6 @@ func (c *SegmentChecker) getSealedSegmentDiff(
}
}

// l0 segments which exist on the current target, but not on dist
for _, segment := range currentTargetMap {
// to avoid generating duplicate segment tasks
if nextTargetMap[segment.ID] != nil {
continue
}

if isSegmentLack(segment) {
toLoad = append(toLoad, segment)
}
}

// get segments which exist on dist, but not on the current target or the next target
for _, segment := range dist {
_, existOnCurrent := currentTargetMap[segment.GetID()]
Expand All @@ -313,16 +271,6 @@ func (c *SegmentChecker) getSealedSegmentDiff(
}
}

level0Segments := lo.Filter(toLoad, func(segment *datapb.SegmentInfo, _ int) bool {
return segment.GetLevel() == datapb.SegmentLevel_L0
})
// L0 segment found,
// QueryCoord loads the L0 segments first,
// to make sure all L0 delta logs will be delivered to the other segments.
if len(level0Segments) > 0 {
toLoad = level0Segments
}

return
}

Expand All @@ -336,14 +284,6 @@ func (c *SegmentChecker) findRepeatedSealedSegments(ctx context.Context, replica
dist := c.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithReplica(replica))
versions := make(map[int64]*meta.Segment)
for _, s := range dist {
// l0 segments should be released together with the channel
segment := c.targetMgr.GetSealedSegment(ctx, s.GetCollectionID(), s.GetID(), meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
if isL0Segment {
continue
}

maxVer, ok := versions[s.GetID()]
if !ok {
versions[s.GetID()] = s
Expand Down Expand Up @@ -408,7 +348,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
return nil
}

isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string {
return s.GetInsertChannel()
})
Expand All @@ -426,11 +365,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
rwNodes = replica.GetRWNodes()
}

// L0 segments can only be assigned to the shard leader's node
if isLevel0 {
rwNodes = []int64{leader.ID}
}

segmentInfos := lo.Map(segments, func(s *datapb.SegmentInfo, _ int) *meta.Segment {
return &meta.Segment{
SegmentInfo: s,
Expand Down
Loading
Loading