Skip to content

Commit

Permalink
[ingester] fix application log level wrong
Browse files Browse the repository at this point in the history
also fill universal tags with agent_id
  • Loading branch information
lzf575 committed May 31, 2024
1 parent 9fd5b4c commit 441a552
Showing 1 changed file with 53 additions and 2 deletions.
55 changes: 53 additions & 2 deletions server/ingester/app_log/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package decoder

import (
"bytes"
"encoding/json"
"fmt"
"net"
"strconv"
"strings"
"time"

json "github.com/goccy/go-json"
logging "github.com/op/go-logging"

"github.com/deepflowio/deepflow/server/ingester/app_log/config"
Expand Down Expand Up @@ -231,6 +231,55 @@ func (d *Decoder) WriteAgentLog(agentId uint16, bs []byte) error {
s.AttributeValues = append(s.AttributeValues, host)
s.AppService = host

// If it is an old version of the Agent, or when the Agent is just started, the value of agentId will be 0.
if agentId != 0 {
var info *grpc.Info
s.L3EpcID = d.platformData.QueryVtapEpc0(agentId)
// if platformInfo cannot be obtained from PodId, finally fill with Vtap's platformInfo
vtapInfo := d.platformData.QueryVtapInfo(agentId)
if vtapInfo != nil {
ip := net.ParseIP(vtapInfo.Ip)
if ip != nil {
if ip4 := ip.To4(); ip4 != nil {
s.IsIPv4 = true
s.IP4 = utils.IpToUint32(ip4)
info = d.platformData.QueryIPV4Infos(s.L3EpcID, s.IP4)
} else {
s.IsIPv4 = false
s.IP6 = ip
info = d.platformData.QueryIPV6Infos(s.L3EpcID, s.IP6)
}
}
}

podGroupType := uint8(0)
if info != nil {
s.RegionID = uint16(info.RegionID)
s.AZID = uint16(info.AZID)
s.L3EpcID = info.EpcID
s.HostID = uint16(info.HostID)
s.PodID = info.PodID
s.PodNodeID = info.PodNodeID
s.PodNSID = uint16(info.PodNSID)
s.PodClusterID = uint16(info.PodClusterID)
s.PodGroupID = info.PodGroupID
podGroupType = info.PodGroupType
s.L3DeviceType = uint8(info.DeviceType)
s.L3DeviceID = info.DeviceID
s.SubnetID = uint16(info.SubnetID)
// if it is just Pod Node, there is no need to match the service
if ingestercommon.IsPodServiceIP(flow_metrics.DeviceType(s.L3DeviceType), s.PodID, 0) {
s.ServiceID = d.platformData.QueryService(
s.PodID, s.PodNodeID, uint32(s.PodClusterID), s.PodGroupID, s.L3EpcID, !s.IsIPv4, s.IP4, s.IP6, 0, 0)
}
} else if baseInfo := d.platformData.QueryEpcIDBaseInfo(s.L3EpcID); baseInfo != nil {
s.RegionID = uint16(baseInfo.RegionID)
}

s.AutoInstanceID, s.AutoInstanceType = ingestercommon.GetAutoInstance(s.PodID, 0, s.PodNodeID, s.L3DeviceID, uint8(s.L3DeviceType), s.L3EpcID)
s.AutoServiceID, s.AutoServiceType = ingestercommon.GetAutoService(s.ServiceID, s.PodGroupID, 0, s.PodNodeID, s.L3DeviceID, uint8(s.L3DeviceType), podGroupType, s.L3EpcID)
}

severityText := ""
switch string(columns[3]) {
case "[INFO]":
Expand Down Expand Up @@ -439,13 +488,15 @@ func (d *Decoder) handleAppLog(agentId uint16, decoder *codec.SimpleDecoder) {
d.counter.ErrorCount++
return
}
for _, appLogEntry := range d.appLogEntrysCache {
for i, appLogEntry := range d.appLogEntrysCache {
if err := d.WriteAppLog(agentId, &appLogEntry); err != nil {
if d.counter.ErrorCount == 0 {
log.Warningf("application log decode failed: %s", err)
}
d.counter.ErrorCount++
}
// need to reset, otherwise json parsing will use the last value as the default value
d.appLogEntrysCache[i] = AppLogEntry{}
}
}
}

0 comments on commit 441a552

Please sign in to comment.