Skip to content

Commit

Permalink
fix: metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
imneov committed Jun 20, 2022
1 parent fdce9b5 commit 5726eb7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 17 deletions.
9 changes: 8 additions & 1 deletion pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,11 @@ var CollectorDeviceTelemetry = prometheus.NewGaugeVec(
[]string{MetricsLabelTenant, MetricsLabelSchema, MetricsLabelEntity, MetricsLabelValueKey},
)

var Metrics = []prometheus.Collector{CollectorRawDataStorage, CollectorTimeseriesStorage, CollectorMsgCount, CollectorMsgStorageSpace, CollectorMsgStorageSeconds}
// Metrics is the complete set of Prometheus collectors exported by this
// package, intended to be registered in bulk with a prometheus.Registerer.
// CollectorDeviceTelemetry was added here so per-device telemetry gauges
// are actually registered (the omission was the bug this change fixes).
var Metrics = []prometheus.Collector{
	CollectorRawDataStorage,
	CollectorTimeseriesStorage,
	CollectorMsgCount,
	CollectorMsgStorageSpace,
	CollectorMsgStorageSeconds,
	CollectorDeviceTelemetry,
}
40 changes: 24 additions & 16 deletions pkg/runtime/nodeflush.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,20 @@ func (n *Node) FlushEntity(ctx context.Context, en Entity, feed *Feed) error {
if nil != err {
log.L().Warn("make TimeSeries error", logf.Error(err), logf.Eid(en.ID()))
} else {
// 2.3.1 msg count
metrics.CollectorMsgCount.WithLabelValues(tenantID, metrics.MsgTypeTimeseries).Add(float64(tsCount))
if _, err = n.resourceManager.TSDB().Write(ctx, flushData); nil != err {
log.L().Error("flush entity timeseries database", logf.Error(err), logf.Eid(en.ID()))
// return errors.Wrap(err, "flush entity into search engine")
}

// 2.3.2 flush metric
for _, tsData := range flushData.Data {
for key, value := range tsData.Fields {
metrics.CollectorDeviceTelemetry.
WithLabelValues(tenantID, templateID, entityID, key).Set(float64(value))
}
}
}

// 2.4 flush raw data.
Expand All @@ -90,14 +99,6 @@ func (n *Node) FlushEntity(ctx context.Context, en Entity, feed *Feed) error {
}
}

// 2.5 flush metric
for _, tsData := range flushData.Data {
for key, value := range tsData.Fields {
metrics.CollectorDeviceTelemetry.
WithLabelValues(tenantID, templateID, entityID, key).Set(float64(value))
}
}

return nil
}

Expand Down Expand Up @@ -137,18 +138,28 @@ func (n *Node) getTimeSeriesKey(patchs []Patch) []string {

func (n *Node) makeTimeSeriesData(ctx context.Context, en Entity, feed *Feed) (*tseries.TSeriesRequest, int, error) {
tsData := en.GetProp("telemetry")
var flushData []*tseries.TSeriesData
log.Info("tsData: ", tsData)
var res interface{}
var (
flushData []*tseries.TSeriesData
ret = &tseries.TSeriesRequest{
Data: flushData,
Metadata: map[string]string{},
}
res interface{}
tsCount = 0
)

needWriteKeys := n.getTimeSeriesKey(feed.Changes)
if len(needWriteKeys) == 0 {
return ret, tsCount, nil
}

err := json.Unmarshal(tsData.Raw(), &res)
if nil != err {
log.L().Warn("parse json type", logf.Error(err))
return nil, 0, errors.Wrap(err, "write ts db error")
}
tss, ok := res.(map[string]interface{})
needWriteKeys := n.getTimeSeriesKey(feed.Changes)
tsCount := 0
if ok {
for _, k := range needWriteKeys {
if v, ok := tss[k]; ok {
Expand Down Expand Up @@ -183,10 +194,7 @@ func (n *Node) makeTimeSeriesData(ctx context.Context, en Entity, feed *Feed) (*
}
}
}
return &tseries.TSeriesRequest{
Data: flushData,
Metadata: map[string]string{},
}, tsCount, errors.Wrap(err, "write ts db error")
return ret, tsCount, errors.Wrap(err, "write ts db error")
}

func (n *Node) makeSearchData(en Entity, feed *Feed) ([]byte, error) {
Expand Down

0 comments on commit 5726eb7

Please sign in to comment.