Skip to content

Commit

Permalink
add syslog_rfc3164 format for parse_log processor (redpanda-data#369)
Browse files Browse the repository at this point in the history
* add syslog_rfc3164 format for parse_log processor

* parse_log refactor parameters
  • Loading branch information
Maksim Kuvshinov authored Feb 22, 2020
1 parent 1584584 commit 50a83a6
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 44 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/influxdata/go-syslog v1.0.1
github.com/influxdata/go-syslog/v3 v3.0.0
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/jtolds/gls v4.20.0+incompatible // indirect
Expand Down
196 changes: 168 additions & 28 deletions lib/processor/parse_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package processor

import (
"fmt"
"strconv"
"time"

"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/Jeffail/benthos/v3/lib/x/docs"
"github.com/influxdata/go-syslog/rfc5424"
syslog "github.com/influxdata/go-syslog/v3"
"github.com/influxdata/go-syslog/v3/rfc3164"
"github.com/influxdata/go-syslog/v3/rfc5424"

"github.com/opentracing/opentracing-go"
)

Expand All @@ -20,11 +24,21 @@ Parses common log [formats](#formats) into [structured data](#codecs). This is
easier and often much faster than ` + "[`grok`](/docs/components/processors/grok)" + `.`,
FieldSpecs: docs.FieldSpecs{
docs.FieldCommon("format", "A common log [format](#formats) to parse.").HasOptions(
"syslog_rfc5424",
"syslog_rfc5424", "syslog_rfc3164",
),
docs.FieldCommon("codec", "Specifies the structured format to parse a log into.").HasOptions(
"json",
),
docs.FieldAdvanced("best_effort", "Still returns parsed message if an error occurred."+
"Applied to `syslog_rfc3164` and `syslog_rfc5424` formats."),
docs.FieldAdvanced("allow_rfc3339", "Allows syslog parser to expect timestamp to be rfc3339-formatted"+
"Applied to `syslog_rfc3164`."),
docs.FieldAdvanced("default_year", "Sets the strategy to decide the year for the Stamp timestamp of RFC 3164"+
"Applied to `syslog_rfc3164`. Could be `current` to set the system's current year or specific year."+
"Leave an empty string to not use such option at all."),
docs.FieldAdvanced("default_timezone", "Sets the strategy to decide the timezone to apply to the Stamp timestamp of RFC 3164"+
"Applied to `syslog_rfc3164`. Given value handled by [time.LoadLocation](https://golang.org/pkg/time/#LoadLocation) method."),

partsFieldSpec,
},
Footnotes: `
Expand All @@ -36,17 +50,41 @@ Currently the only supported structured data codec is ` + "`json`" + `.
### ` + "`syslog_rfc5424`" + `
Makes a best effort to parses a log following the
Makes a best effort(default behaviour) to parse a log following the
[Syslog rfc5424](https://tools.ietf.org/html/rfc5424) spec. The resulting
structured document may contain any of the following fields:
- ` + "`message`" + ` (string)
- ` + "`timestamp`" + ` (string, RFC3339)
- ` + "`facility`" + ` (int)
- ` + "`severity`" + ` (int)
- ` + "`priority`" + ` (int)
- ` + "`version`" + ` (int)
- ` + "`hostname`" + ` (string)
- ` + "`procid`" + ` (string)
- ` + "`appname`" + ` (string)
- ` + "`msgid`" + ` (string)
- ` + "`structureddata`" + ` (object)
### ` + "`syslog_rfc3164`" + `
Makes a best effort(default behaviour) to parse a log following the
[Syslog rfc3164](https://tools.ietf.org/html/rfc3164) spec. Since
transfered information could be omitted by some vendors, parameters
` + "`allow_rfc3339`" + `,` + "`default_year`" + `,
` + "`default_timezone`" + `,` + "`best_effort`" + `
should be applied. The resulting structured document may contain any of
the following fields:
- ` + "`message`" + ` (string)
- ` + "`timestamp`" + ` (string, RFC3339)
- ` + "`facility`" + ` (int)
- ` + "`severity`" + ` (int)
- ` + "`priority`" + ` (int)
- ` + "`hostname`" + ` (string)
- ` + "`procid`" + ` (string)
- ` + "`appname`" + ` (string)
- ` + "`msgid`" + ` (string)
`,
}
}
Expand All @@ -55,9 +93,13 @@ structured document may contain any of the following fields:

// ParseLogConfig contains configuration fields for the ParseLog processor.
type ParseLogConfig struct {
	// Parts holds the message part indexes configured for the processor.
	Parts []int `json:"parts" yaml:"parts"`
	// Format names the log format to parse; getParseFormat recognises
	// "syslog_rfc5424" and "syslog_rfc3164".
	Format string `json:"format" yaml:"format"`
	// Codec names the structured format a parsed log is written as.
	Codec string `json:"codec" yaml:"codec"`
	// BestEffort enables the best-effort option of the syslog parsers.
	BestEffort bool `json:"best_effort" yaml:"best_effort"`
	// WithRFC3339 enables the RFC 3339 timestamp option of the
	// syslog_rfc3164 parser.
	WithRFC3339 bool `json:"allow_rfc3339" yaml:"allow_rfc3339"`
	// WithYear selects the year applied by the syslog_rfc3164 parser:
	// "current", an explicit integer year, or "" to set no year option.
	WithYear string `json:"default_year" yaml:"default_year"`
	// WithTimezone is a location name resolved with time.LoadLocation for
	// the syslog_rfc3164 parser; "" sets no timezone option.
	WithTimezone string `json:"default_timezone" yaml:"default_timezone"`
}

// NewParseLogConfig returns a ParseLogConfig with default values.
Expand All @@ -66,53 +108,149 @@ func NewParseLogConfig() ParseLogConfig {
Parts: []int{},
Format: "syslog_rfc5424",
Codec: "json",

BestEffort: true,
WithRFC3339: true,
WithYear: "current",
WithTimezone: "UTC",
}
}

//------------------------------------------------------------------------------

type parserFormat func(body []byte) (map[string]interface{}, error)

func parserRFC5424() parserFormat {
func parserRFC5424(bestEffort bool) parserFormat {
return func(body []byte) (map[string]interface{}, error) {
var opts []syslog.MachineOption
if bestEffort {
opts = append(opts, rfc5424.WithBestEffort())
}

p := rfc5424.NewParser(opts...)
resGen, err := p.Parse(body)
if err != nil {
return nil, err
}
res := resGen.(*rfc5424.SyslogMessage)

resMap := make(map[string]interface{})
if res.Message != nil {
resMap["message"] = *res.Message
}
if res.Timestamp != nil {
resMap["timestamp"] = res.Timestamp.Format(time.RFC3339Nano)
// resMap["timestamp_unix"] = res.Timestamp().Unix()
}
if res.Facility != nil {
resMap["facility"] = *res.Facility
}
if res.Severity != nil {
resMap["severity"] = *res.Severity
}
if res.Priority != nil {
resMap["priority"] = *res.Priority
}
if res.Version != 0 {
resMap["version"] = res.Version
}
if res.Hostname != nil {
resMap["hostname"] = *res.Hostname
}
if res.ProcID != nil {
resMap["procid"] = *res.ProcID
}
if res.Appname != nil {
resMap["appname"] = *res.Appname
}
if res.MsgID != nil {
resMap["msgid"] = *res.MsgID
}
if res.StructuredData != nil {
resMap["structureddata"] = *res.StructuredData
}

return resMap, nil
}
}

func parserRFC3164(bestEffort, wrfc3339 bool, year, tz string) parserFormat {
return func(body []byte) (map[string]interface{}, error) {
p := rfc5424.NewParser()
// TODO: Potentially expose this as a config field later.
be := true
res, err := p.Parse(body, &be)
var opts []syslog.MachineOption
if bestEffort {
opts = append(opts, rfc3164.WithBestEffort())
}
if wrfc3339 {
opts = append(opts, rfc3164.WithRFC3339())
}
switch year {
case "current":
opts = append(opts, rfc3164.WithYear(rfc3164.CurrentYear{}))
case "":
// do nothing
default:
iYear, err := strconv.Atoi(year)
if err != nil {
return nil, fmt.Errorf("failed to convert year %s into integer: %v", year, err)
}
opts = append(opts, rfc3164.WithYear(rfc3164.Year{YYYY: iYear}))
}
if tz != "" {
loc, err := time.LoadLocation(tz)
if err != nil {
return nil, fmt.Errorf("failed to lookup timezone %s - %v", loc, err)
}
opts = append(opts, rfc3164.WithTimezone(loc))
}

p := rfc3164.NewParser(opts...)

resGen, err := p.Parse(body)
if err != nil {
return nil, err
}
res := resGen.(*rfc3164.SyslogMessage)

resMap := make(map[string]interface{})
if res.Message() != nil {
resMap["message"] = *res.Message()
if res.Message != nil {
resMap["message"] = *res.Message
}
if res.Timestamp() != nil {
resMap["timestamp"] = res.Timestamp().Format(time.RFC3339Nano)
if res.Timestamp != nil {
resMap["timestamp"] = res.Timestamp.Format(time.RFC3339Nano)
// resMap["timestamp_unix"] = res.Timestamp().Unix()
}
if res.Hostname() != nil {
resMap["hostname"] = *res.Hostname()
if res.Facility != nil {
resMap["facility"] = *res.Facility
}
if res.Severity != nil {
resMap["severity"] = *res.Severity
}
if res.ProcID() != nil {
resMap["procid"] = *res.ProcID()
if res.Priority != nil {
resMap["priority"] = *res.Priority
}
if res.Appname() != nil {
resMap["appname"] = *res.Appname()
if res.Hostname != nil {
resMap["hostname"] = *res.Hostname
}
if res.MsgID() != nil {
resMap["msgid"] = *res.MsgID()
if res.ProcID != nil {
resMap["procid"] = *res.ProcID
}
if res.StructuredData() != nil {
resMap["structureddata"] = *res.StructuredData()
if res.Appname != nil {
resMap["appname"] = *res.Appname
}
if res.MsgID != nil {
resMap["msgid"] = *res.MsgID
}

return resMap, nil
}
}

func getParseFormat(parser string) (parserFormat, error) {
func getParseFormat(parser string, bestEffort, rfc3339 bool, defYear, defTZ string) (parserFormat, error) {
switch parser {
case "syslog_rfc5424":
return parserRFC5424(), nil
return parserRFC5424(bestEffort), nil
case "syslog_rfc3164":
return parserRFC3164(bestEffort, rfc3339, defYear, defTZ), nil
}
return nil, fmt.Errorf("format not recognised: %s", parser)
}
Expand Down Expand Up @@ -151,7 +289,8 @@ func NewParseLog(
mBatchSent: stats.GetCounter("batch.sent"),
}
var err error
if s.format, err = getParseFormat(conf.ParseLog.Format); err != nil {
if s.format, err = getParseFormat(conf.ParseLog.Format, conf.ParseLog.BestEffort, conf.ParseLog.WithRFC3339,
conf.ParseLog.WithYear, conf.ParseLog.WithTimezone); err != nil {
return nil, err
}
return s, nil
Expand All @@ -172,6 +311,7 @@ func (s *ParseLog) ProcessMessage(msg types.Message) ([]types.Message, types.Res
s.log.Debugf("Failed to parse message as %s: %v\n", s.conf.ParseLog.Format, err)
return err
}

if err := newMsg.Get(index).SetJSON(dataMap); err != nil {
s.mErrJSONS.Incr(1)
s.mErr.Incr(1)
Expand Down
42 changes: 27 additions & 15 deletions lib/processor/parse_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,45 @@ import (

func TestParseLogCases(t *testing.T) {
type testCase struct {
name string
input string
output string
format string
codec string
name string
input string
output string
format string
codec string
bestEff bool
}
tests := []testCase{
{
name: "valid input, valid json output",
format: "syslog_rfc5424",
codec: "json",
input: `<42>4 2049-10-11T22:14:15.003Z toaster.smarthome myapp - 2 [home01 device_id="43"] failed to make a toast.`,
output: `{"appname":"myapp","hostname":"toaster.smarthome","message":"failed to make a toast.","msgid":"2","structureddata":{"home01":{"device_id":"43"}},"timestamp":"2049-10-11T22:14:15.003Z"}`,
name: "valid syslog_rfc5424 input, valid json output",
format: "syslog_rfc5424",
codec: "json",
bestEff: true,
input: `<42>4 2049-10-11T22:14:15.003Z toaster.smarthome myapp - 2 [home01 device_id="43"] failed to make a toast.`,
output: `{"appname":"myapp","facility":5,"hostname":"toaster.smarthome","message":"failed to make a toast.","msgid":"2","priority":42,"severity":2,"structureddata":{"home01":{"device_id":"43"}},"timestamp":"2049-10-11T22:14:15.003Z","version":4}`,
},
{
name: "invalid input, invalid json output",
format: "syslog_rfc5424",
codec: "json",
input: `not a syslog at all.`,
output: `not a syslog at all.`,
name: "invalid syslog_rfc5424 input, invalid json output",
format: "syslog_rfc5424",
codec: "json",
bestEff: true,
input: `not a syslog at all.`,
output: `not a syslog at all.`,
},
{
name: "valid syslog_rfc3164 input, valid json output",
format: "syslog_rfc3164",
codec: "json",
bestEff: true,
input: `<28>Dec 2 16:49:23 host app[23410]: Test`,
output: `{"appname":"app","facility":3,"hostname":"host","message":"Test","priority":28,"procid":"23410","severity":4,"timestamp":"2020-12-02T16:49:23Z"}`,
},
}

for _, test := range tests {
conf := NewConfig()
conf.ParseLog.Format = test.format
conf.ParseLog.Codec = test.codec
conf.ParseLog.BestEffort = test.bestEff
proc, err := NewParseLog(conf, nil, log.Noop(), metrics.Noop())
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 50a83a6

Please sign in to comment.