enhance: Remove QueryCoord's scheduling of L0 segments
Signed-off-by: Wei Liu <[email protected]>
weiliu1031 committed Feb 5, 2025
1 parent f0b7446 commit 96156dc
Showing 27 changed files with 459 additions and 593 deletions.
7 changes: 0 additions & 7 deletions internal/querycoordv2/checkers/index_checker.go
@@ -29,7 +29,6 @@ import (
     "github.com/milvus-io/milvus/internal/querycoordv2/task"
     "github.com/milvus-io/milvus/internal/querycoordv2/utils"
     "github.com/milvus-io/milvus/pkg/log"
-    "github.com/milvus-io/milvus/pkg/proto/datapb"
     "github.com/milvus-io/milvus/pkg/proto/indexpb"
     "github.com/milvus-io/milvus/pkg/proto/querypb"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -120,12 +119,6 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec
             continue
         }
 
-        // skip update index for l0 segment
-        segmentInTarget := c.targetMgr.GetSealedSegment(ctx, collection.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst)
-        if segmentInTarget == nil || segmentInTarget.GetLevel() == datapb.SegmentLevel_L0 {
-            continue
-        }
-
         missing := c.checkSegment(segment, indexInfos)
         if len(missing) > 0 {
             targets[segment.GetID()] = missing
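
The guard deleted above keyed the skip off the segment's compaction level in the target. A minimal sketch of that pattern, reusing only the datapb identifiers visible in the hunk (not the checker's actual code):

    // skipIndexCheck mirrors the removed guard: segments missing from the
    // target, or sitting at level L0 (delete-only deltas), got no index check.
    func skipIndexCheck(seg *datapb.SegmentInfo) bool {
        return seg == nil || seg.GetLevel() == datapb.SegmentLevel_L0
    }
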
9 changes: 2 additions & 7 deletions internal/querycoordv2/checkers/leader_checker.go
@@ -28,7 +28,6 @@ import (
     "github.com/milvus-io/milvus/internal/querycoordv2/utils"
     "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/log"
-    "github.com/milvus-io/milvus/pkg/proto/datapb"
 )
 
 var _ Checker = (*LeaderChecker)(nil)
@@ -164,10 +163,7 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
     latestNodeDist := utils.FindMaxVersionSegments(dist)
     for _, s := range latestNodeDist {
         segment := c.target.GetSealedSegment(ctx, leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst)
-        existInTarget := segment != nil
-        isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
-        // shouldn't set l0 segment location to delegator. l0 segment should be reload in delegator
-        if !existInTarget || isL0Segment {
+        if segment == nil {
             continue
         }
 
@@ -218,8 +214,7 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica *me
         _, ok := distMap[sid]
         segment := c.target.GetSealedSegment(ctx, leaderView.CollectionID, sid, meta.CurrentTargetFirst)
         existInTarget := segment != nil
-        isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
-        if ok || existInTarget || isL0Segment {
+        if ok || existInTarget {
             continue
         }
         log.Debug("leader checker append a segment to remove",
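
With the L0 special case gone, findNeedRemovedSegments reduces to a plain set difference: anything the delegator still serves that is absent from both the distribution map and the target gets queued for removal. A self-contained sketch of that rule, with hypothetical stand-in types rather than the checker's real ones:

    package main

    import "fmt"

    // findNeedRemoved returns the IDs in served that appear neither in dist
    // nor in target; hypothetical signature, for illustration only.
    func findNeedRemoved(served []int64, dist, target map[int64]bool) []int64 {
        var toRemove []int64
        for _, sid := range served {
            if !dist[sid] && !target[sid] {
                toRemove = append(toRemove, sid)
            }
        }
        return toRemove
    }

    func main() {
        served := []int64{1, 2, 3}
        dist := map[int64]bool{1: true}
        target := map[int64]bool{2: true}
        fmt.Println(findNeedRemoved(served, dist, target)) // [3]
    }
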
47 changes: 0 additions & 47 deletions internal/querycoordv2/checkers/leader_checker_test.go
@@ -162,29 +162,6 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
     suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).GetLeaderID(), node2)
     suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
     suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
-
-    // test skip sync l0 segment
-    segments = []*datapb.SegmentInfo{
-        {
-            ID:            1,
-            PartitionID:   1,
-            InsertChannel: "test-insert-channel",
-            Level:         datapb.SegmentLevel_L0,
-        },
-    }
-    suite.broker.ExpectedCalls = nil
-    suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-        channels, segments, nil)
-    observer.target.UpdateCollectionNextTarget(ctx, int64(1))
-    observer.target.UpdateCollectionCurrentTarget(ctx, 1)
-    // mock l0 segment exist on non delegator node, doesn't set to leader view
-    observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, loadVersion, "test-insert-channel"))
-    observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
-    view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
-    view.TargetVersion = observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget)
-    observer.dist.LeaderViewManager.Update(2, view)
-    tasks = suite.checker.Check(context.TODO())
-    suite.Len(tasks, 0)
 }
 
 func (suite *LeaderCheckerTestSuite) TestActivation() {
@@ -423,30 +400,6 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
     suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3))
     suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0))
     suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
-
-    // skip sync l0 segments
-    segments := []*datapb.SegmentInfo{
-        {
-            ID:            3,
-            PartitionID:   1,
-            InsertChannel: "test-insert-channel",
-            Level:         datapb.SegmentLevel_L0,
-        },
-    }
-    suite.broker.ExpectedCalls = nil
-    suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-        channels, segments, nil)
-
-    observer.target.UpdateCollectionNextTarget(ctx, int64(1))
-    observer.target.UpdateCollectionCurrentTarget(ctx, 1)
-
-    observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
-    view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 1}, map[int64]*meta.Segment{})
-    view.TargetVersion = observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget)
-    observer.dist.LeaderViewManager.Update(2, view)
-
-    tasks = suite.checker.Check(context.TODO())
-    suite.Len(tasks, 0)
 }
 
 func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() {
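
Both deleted test cases lean on the same mockery idiom for swapping a stub mid-test; for readers unfamiliar with it, the two lines that matter (taken from the removed code, where suite.broker, channels, and segments are the suite's fixtures):

    // Drop all expectations registered so far on the generated mock, then
    // install a fresh stub for the next scenario.
    suite.broker.ExpectedCalls = nil
    suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(channels, segments, nil)
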
68 changes: 1 addition & 67 deletions internal/querycoordv2/checkers/segment_checker.go
@@ -21,7 +21,6 @@ import (
     "sort"
     "time"
 
-    "github.com/blang/semver/v4"
     "github.com/samber/lo"
     "go.opentelemetry.io/otel/trace"
     "go.uber.org/zap"
@@ -245,37 +244,8 @@ func (c *SegmentChecker) getSealedSegmentDiff(
         distMap[s.GetID()] = s.Node
     }
 
-    versionRangeFilter := semver.MustParseRange(">2.3.x")
-    checkLeaderVersion := func(leader *meta.LeaderView, segmentID int64) bool {
-        // if current shard leader's node version < 2.4, skip load L0 segment
-        info := c.nodeMgr.Get(leader.ID)
-        if info != nil && !versionRangeFilter(info.Version()) {
-            log.Warn("l0 segment is not supported in current node version, skip it",
-                zap.Int64("collection", replica.GetCollectionID()),
-                zap.Int64("segmentID", segmentID),
-                zap.String("channel", leader.Channel),
-                zap.Int64("leaderID", leader.ID),
-                zap.String("nodeVersion", info.Version().String()))
-            return false
-        }
-        return true
-    }
-
     isSegmentLack := func(segment *datapb.SegmentInfo) bool {
-        node, existInDist := distMap[segment.ID]
-
-        if segment.GetLevel() == datapb.SegmentLevel_L0 {
-            // the L0 segments have to been in the same node as the channel watched
-            leader := c.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))
-
-            // if the leader node's version doesn't match load l0 segment's requirement, skip it
-            if leader != nil && checkLeaderVersion(leader, segment.ID) {
-                l0WithWrongLocation := node != leader.ID
-                return !existInDist || l0WithWrongLocation
-            }
-            return false
-        }
-
+        _, existInDist := distMap[segment.ID]
         return !existInDist
     }
 
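
The deleted checkLeaderVersion closure gated L0 loads on the shard leader's binary version via a blang/semver range; per its comment, nodes below 2.4 were skipped. A self-contained sketch of just that gating technique, detached from QueryCoord's node manager:

    package main

    import (
        "fmt"

        "github.com/blang/semver/v4"
    )

    // ">2.3.x" is the same range string the removed code used; it admits 2.4.0
    // and later. MustParseRange panics on malformed input, which is acceptable
    // for a constant expression like this.
    var supportsL0 = semver.MustParseRange(">2.3.x")

    func main() {
        for _, raw := range []string{"2.3.9", "2.4.0", "2.5.1"} {
            v := semver.MustParse(raw)
            fmt.Printf("%s supports L0: %v\n", raw, supportsL0(v))
        }
    }
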
@@ -290,18 +260,6 @@ func (c *SegmentChecker) getSealedSegmentDiff(
         }
     }
 
-    // l0 Segment which exist on current target, but not on dist
-    for _, segment := range currentTargetMap {
-        // to avoid generate duplicate segment task
-        if nextTargetMap[segment.ID] != nil {
-            continue
-        }
-
-        if isSegmentLack(segment) {
-            toLoad = append(toLoad, segment)
-        }
-    }
-
     // get segment which exist on dist, but not on current target and next target
     for _, segment := range dist {
         _, existOnCurrent := currentTargetMap[segment.GetID()]
@@ -313,16 +271,6 @@ func (c *SegmentChecker) getSealedSegmentDiff(
         }
     }
 
-    level0Segments := lo.Filter(toLoad, func(segment *datapb.SegmentInfo, _ int) bool {
-        return segment.GetLevel() == datapb.SegmentLevel_L0
-    })
-    // L0 segment found,
-    // QueryCoord loads the L0 segments first,
-    // to make sure all L0 delta logs will be delivered to the other segments.
-    if len(level0Segments) > 0 {
-        toLoad = level0Segments
-    }
-
     return
 }
 
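
The prioritization removed here is worth spelling out: whenever any pending segment was L0, QueryCoord shrank the load list to only those, so delete deltas reached the delegator before the sealed segments they apply to. The deleted logic, condensed (same samber/lo call as in the hunk, operating on the surrounding function's toLoad slice):

    // Load L0 segments first when any are pending; everything else waits
    // for the next checker round.
    level0Segments := lo.Filter(toLoad, func(s *datapb.SegmentInfo, _ int) bool {
        return s.GetLevel() == datapb.SegmentLevel_L0
    })
    if len(level0Segments) > 0 {
        toLoad = level0Segments
    }
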
@@ -336,14 +284,6 @@ func (c *SegmentChecker) findRepeatedSealedSegments(ctx context.Context, replica
     dist := c.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithReplica(replica))
     versions := make(map[int64]*meta.Segment)
     for _, s := range dist {
-        // l0 segment should be release with channel together
-        segment := c.targetMgr.GetSealedSegment(ctx, s.GetCollectionID(), s.GetID(), meta.CurrentTargetFirst)
-        existInTarget := segment != nil
-        isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
-        if isL0Segment {
-            continue
-        }
-
         maxVer, ok := versions[s.GetID()]
         if !ok {
             versions[s.GetID()] = s
@@ -408,7 +348,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
         return nil
     }
 
-    isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
     shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string {
         return s.GetInsertChannel()
     })
@@ -426,11 +365,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
             rwNodes = replica.GetRWNodes()
         }
 
-        // L0 segment can only be assign to shard leader's node
-        if isLevel0 {
-            rwNodes = []int64{leader.ID}
-        }
-
         segmentInfos := lo.Map(segments, func(s *datapb.SegmentInfo, _ int) *meta.Segment {
            return &meta.Segment{
                SegmentInfo: s,
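
The final hunk removes the leader pinning: L0 segments could previously land only on the delegator's own node, while every other segment could go to any read-write node in the replica. A hypothetical before/after sketch of that assignment rule (not QueryCoord's actual API):

    // candidateNodes shows the assignment rule this commit deletes: with the
    // L0 branch gone, all loads draw from the replica's RW node set.
    func candidateNodes(rwNodes []int64, leaderID int64, isLevel0 bool) []int64 {
        if isLevel0 {
            return []int64{leaderID} // pre-commit: pin to the shard leader's node
        }
        return rwNodes // post-commit: any RW node qualifies
    }
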