Skip to content

Commit

Permalink
enhance: Remove unnecessary collection and partition label from the metrics (#39536)
Browse files Browse the repository at this point in the history

/kind improvement

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Feb 5, 2025
1 parent b1cee78 commit f0b7446
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 40 deletions.
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (t *clusteringCompactionTask) Process() bool {
lastStateDuration := ts - t.GetTaskProto().GetLastStateStartTime()
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
Observe(float64(lastStateDuration * 1000))
updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)}

Expand All @@ -117,7 +117,7 @@ func (t *clusteringCompactionTask) Process() bool {
elapse := ts - t.GetTaskProto().StartTime
log.Info("clustering compaction task total elapse", zap.Duration("costs", time.Duration(elapse)*time.Second))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
Observe(float64(elapse * 1000))
}
err = t.updateAndSaveTaskMeta(updateOps...)
Expand Down
21 changes: 19 additions & 2 deletions internal/datacoord/import_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func (c *importChecker) Start() {
for collID, collJobs := range jobsByColl {
c.checkCollection(collID, collJobs)
}
c.LogStats()
c.LogJobStats(jobs)
c.LogTaskStats()
}
}
}
Expand All @@ -125,7 +126,23 @@ func (c *importChecker) Close() {
})
}

func (c *importChecker) LogStats() {
// LogJobStats updates the ImportJobs gauge with the number of jobs in each
// known import-job state (skipping the None placeholder state) and logs the
// resulting state-to-count distribution for observability.
func (c *importChecker) LogJobStats(jobs []ImportJob) {
	grouped := lo.GroupBy(jobs, func(job ImportJob) string {
		return job.GetState().String()
	})
	counts := make(map[string]int)
	for state := range internalpb.ImportJobState_value {
		// None is a zero-value placeholder, not a real job state; do not export it.
		if state == internalpb.ImportJobState_None.String() {
			continue
		}
		n := len(grouped[state])
		counts[state] = n
		metrics.ImportJobs.WithLabelValues(state).Set(float64(n))
	}
	log.Info("import job stats", zap.Any("stateNum", counts))
}

func (c *importChecker) LogTaskStats() {
logFunc := func(tasks []ImportTask, taskType TaskType) {
byState := lo.GroupBy(tasks, func(t ImportTask) datapb.ImportTaskStateV2 {
return t.GetState()
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/import_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
err = s.imeta.AddTask(context.TODO(), it1)
s.NoError(err)

s.checker.LogStats()
s.checker.LogTaskStats()
}

func (s *ImportCheckerSuite) TestCheckJob() {
Expand Down
2 changes: 0 additions & 2 deletions internal/querynodev2/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
collections[segment.Collection()],
nodeID,
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segments.SegmentTypeGrowing.String(),
).Set(float64(numEntities))
}
Expand Down Expand Up @@ -136,7 +135,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
collections[segment.Collection()],
nodeID,
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segments.SegmentTypeSealed.String(),
).Set(float64(numEntities))
}
Expand Down
4 changes: 0 additions & 4 deletions internal/querynodev2/segments/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,7 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg
metrics.QueryNodeNumSegments.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
segment.Level().String(),
).Inc()
}
Expand Down Expand Up @@ -707,9 +705,7 @@ func (mgr *segmentManager) release(ctx context.Context, segment Segment) {
metrics.QueryNodeNumSegments.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
segment.Level().String(),
).Dec()

Expand Down
2 changes: 0 additions & 2 deletions internal/querynodev2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,8 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
l0Segment := segments.NewMockSegment(suite.T())
l0Segment.EXPECT().ID().Return(10000)
l0Segment.EXPECT().Collection().Return(suite.collectionID)
l0Segment.EXPECT().Partition().Return(common.AllPartitionsID)
l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0)
l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed)
l0Segment.EXPECT().Indexes().Return(nil)
l0Segment.EXPECT().Shard().Return(suite.channel)
l0Segment.EXPECT().Release(ctx).Return()

Expand Down
9 changes: 8 additions & 1 deletion pkg/metrics/datacoord_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ var (
Buckets: longTaskBuckets,
}, []string{
isVectorFieldLabelName,
collectionIDLabelName,
channelNameLabelName,
compactionTypeLabelName,
stageLabelName,
Expand Down Expand Up @@ -327,6 +326,14 @@ var (
Help: "number of IndexNodes managed by IndexCoord",
}, []string{})

ImportJobs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "import_jobs",
Help: "the import jobs grouping by state",
}, []string{"import_state"})

ImportTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Expand Down
21 changes: 0 additions & 21 deletions pkg/metrics/datanode_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,6 @@ var (
collectionIDLabelName,
})

DataNodeProduceTimeTickLag = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "produce_tt_lag_ms",
Help: "now time minus tt pts per physical channel",
}, []string{
nodeIDLabelName,
collectionIDLabelName,
channelNameLabelName,
})

DataNodeConsumeMsgCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Expand Down Expand Up @@ -269,7 +257,6 @@ func RegisterDataNode(registry *prometheus.Registry) {
// deprecated metrics
registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken)
registry.MustRegister(DataNodeNumProducers)
registry.MustRegister(DataNodeProduceTimeTickLag)
}

func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) {
Expand All @@ -281,14 +268,6 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel
collectionIDLabelName: fmt.Sprint(collectionID),
})

DataNodeProduceTimeTickLag.
Delete(
prometheus.Labels{
nodeIDLabelName: fmt.Sprint(nodeID),
collectionIDLabelName: fmt.Sprint(collectionID),
channelNameLabelName: channel,
})

for _, label := range []string{AllLabel, DeleteLabel, InsertLabel} {
DataNodeConsumeMsgCount.
Delete(
Expand Down
2 changes: 0 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ const (
indexTaskStatusLabelName = "index_task_status"
msgTypeLabelName = "msg_type"
collectionIDLabelName = "collection_id"
partitionIDLabelName = "partition_id"
channelNameLabelName = "channel_name"
functionLabelName = "function_name"
queryTypeLabelName = "query_type"
Expand All @@ -109,7 +108,6 @@ const (
roleNameLabelName = "role_name"
cacheNameLabelName = "cache_name"
cacheStateLabelName = "cache_state"
indexCountLabelName = "indexed_field_count"
dataSourceLabelName = "data_source"
importStageLabelName = "import_stage"
requestScope = "scope"
Expand Down
3 changes: 0 additions & 3 deletions pkg/metrics/querynode_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ var (
}, []string{
nodeIDLabelName,
collectionIDLabelName,
partitionIDLabelName,
segmentStateLabelName,
indexCountLabelName,
segmentLevelLabelName,
})

Expand Down Expand Up @@ -455,7 +453,6 @@ var (
collectionName,
nodeIDLabelName,
collectionIDLabelName,
partitionIDLabelName,
segmentStateLabelName,
})

Expand Down

0 comments on commit f0b7446

Please sign in to comment.