Skip to content

Commit

Permalink
Code review markups.
Browse files Browse the repository at this point in the history
  • Loading branch information
fasaxc committed Apr 11, 2017
1 parent f0e102d commit b8aaa5f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 99 deletions.
100 changes: 36 additions & 64 deletions logutils/logutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"time"

log "github.com/Sirupsen/logrus"
"github.com/gavv/monotime"
"github.com/mipearson/rfw"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -113,7 +112,7 @@ func ConfigureLogging(configParams *config.Config) {
// Screen target.
var dests []Destination
if configParams.LogSeverityScreen != "" {
screenDest := NewStreamDestination(logLevelScreen, os.Stderr, realMonotime{})
screenDest := NewStreamDestination(logLevelScreen, os.Stderr)
dests = append(dests, screenDest)
}

Expand All @@ -125,7 +124,7 @@ func ConfigureLogging(configParams *config.Config) {
var rotAwareFile io.Writer
rotAwareFile, fileOpenErr = rfw.Open(configParams.LogFilePath, 0644)
if fileDirErr == nil && fileOpenErr == nil {
fileDest := NewStreamDestination(logLevelFile, rotAwareFile, realMonotime{})
fileDest := NewStreamDestination(logLevelFile, rotAwareFile)
dests = append(dests, fileDest)
}
}
Expand All @@ -145,7 +144,7 @@ func ConfigureLogging(configParams *config.Config) {
tag := "calico-felix"
w, sysErr := syslog.Dial(net, addr, priority, tag)
if sysErr == nil {
syslogDest := NewSyslogDestination(logLevelSyslog, w, realMonotime{})
syslogDest := NewSyslogDestination(logLevelSyslog, w)
dests = append(dests, syslogDest)
}
}
Expand Down Expand Up @@ -173,7 +172,7 @@ func ConfigureLogging(configParams *config.Config) {
}
if sysErr != nil {
// We don't bail out if we can't connect to syslog because our default is to try to
// connect but ti's very common for syslog to be disabled when we're run in a
// connect but it's very common for syslog to be disabled when we're run in a
// container.
log.WithError(sysErr).Error(
"Failed to connect to syslog. To prevent this error, either set config " +
Expand Down Expand Up @@ -260,7 +259,10 @@ func appendKVsAndNewLine(b *bytes.Buffer, entry *log.Entry) {
fmt.Fprintf(b, " %v=%#v", key, value)
continue
}
fmt.Fprintf(b, " %v=%v", key, stringifiedValue)
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(stringifiedValue)
}
b.WriteByte('\n')
}
Expand Down Expand Up @@ -311,13 +313,17 @@ type QueuedLog struct {
WaitGroup *sync.WaitGroup
}

func NewStreamDestination(level log.Level, writer io.Writer, mt monotimeIface) *StreamDestination {
func (ql QueuedLog) OnLogDone() {
if ql.WaitGroup != nil {
ql.WaitGroup.Done()
}
}

func NewStreamDestination(level log.Level, writer io.Writer) *StreamDestination {
return &StreamDestination{
level: level,
writer: writer,
channel: make(chan QueuedLog, 10000),
lastDropLogTime: mt.Now(),
mt: mt,
level: level,
writer: writer,
channel: make(chan QueuedLog, 10000),
}
}

Expand All @@ -328,11 +334,7 @@ type StreamDestination struct {

// Our own copy of the dropped logs counter, used for logging out when we drop logs.
// Must be read/updated using atomic.XXX.
numDroppedLogs uint64
lastDropLogTime time.Duration

// mt is our shim for the monotime package.
mt monotimeIface
numDroppedLogs uint64
}

func (d *StreamDestination) Level() log.Level {
Expand All @@ -350,27 +352,19 @@ func (d *StreamDestination) OnLogDropped() {
func (d *StreamDestination) LoopWritingLogs() {
var numSeenDroppedLogs uint64
for ql := range d.channel {
// If it's been a while since our last check, see if we're dropping logs.
timeSinceLastCheck := d.mt.Since(d.lastDropLogTime)
if timeSinceLastCheck > time.Second {
currentNumDroppedLogs := atomic.LoadUint64(&d.numDroppedLogs)
if currentNumDroppedLogs > numSeenDroppedLogs {
fmt.Fprintf(d.writer, "... dropped %d logs in %v ...\n",
currentNumDroppedLogs-numSeenDroppedLogs,
timeSinceLastCheck)
numSeenDroppedLogs = currentNumDroppedLogs
}
d.lastDropLogTime = d.mt.Now()
currentNumDroppedLogs := atomic.LoadUint64(&d.numDroppedLogs)
if currentNumDroppedLogs > numSeenDroppedLogs {
fmt.Fprintf(d.writer, "... dropped %d logs ...\n",
currentNumDroppedLogs-numSeenDroppedLogs)
}
numSeenDroppedLogs = currentNumDroppedLogs

_, err := d.writer.Write(ql.Message)
if err != nil {
counterLogErrors.Inc()
fmt.Fprintf(os.Stderr, "Failed to write to log: %v", err)
}
if ql.WaitGroup != nil {
ql.WaitGroup.Done()
}
ql.OnLogDone()
}
}

Expand All @@ -379,22 +373,11 @@ type monotimeIface interface {
Since(time.Duration) time.Duration
}

type realMonotime struct{}

func (_ realMonotime) Now() time.Duration {
return monotime.Now()
}
func (_ realMonotime) Since(t time.Duration) time.Duration {
return monotime.Since(t)
}

func NewSyslogDestination(level log.Level, writer syslogWriter, mt monotimeIface) *SyslogDestination {
func NewSyslogDestination(level log.Level, writer syslogWriter) *SyslogDestination {
return &SyslogDestination{
level: level,
writer: writer,
channel: make(chan QueuedLog, 10000),
lastDropLogTime: mt.Now(),
mt: mt,
level: level,
writer: writer,
channel: make(chan QueuedLog, 10000),
}
}

Expand All @@ -413,11 +396,7 @@ type SyslogDestination struct {

// Our own copy of the dropped logs counter, used for logging out when we drop logs.
// Must be read/updated using atomic.XXX.
numDroppedLogs uint64
lastDropLogTime time.Duration

// mt is our shim for the monotime package.
mt monotimeIface
numDroppedLogs uint64
}

func (d *SyslogDestination) Level() log.Level {
Expand All @@ -436,26 +415,19 @@ func (d *SyslogDestination) LoopWritingLogs() {
var numSeenDroppedLogs uint64
for ql := range d.channel {
// If it's been a while since our last check, see if we're dropping logs.
timeSinceLastCheck := d.mt.Since(d.lastDropLogTime)
if timeSinceLastCheck > time.Second {
currentNumDroppedLogs := atomic.LoadUint64(&d.numDroppedLogs)
if currentNumDroppedLogs > numSeenDroppedLogs {
d.writer.Warning(fmt.Sprintf("... dropped %d logs in %v ...\n",
currentNumDroppedLogs-numSeenDroppedLogs,
timeSinceLastCheck))
numSeenDroppedLogs = currentNumDroppedLogs
}
d.lastDropLogTime = d.mt.Now()
currentNumDroppedLogs := atomic.LoadUint64(&d.numDroppedLogs)
if currentNumDroppedLogs > numSeenDroppedLogs {
d.writer.Warning(fmt.Sprintf("... dropped %d logs ...\n",
currentNumDroppedLogs-numSeenDroppedLogs))
}
numSeenDroppedLogs = currentNumDroppedLogs

err := d.write(ql)
if err != nil {
counterLogErrors.Inc()
fmt.Fprintf(os.Stderr, "Failed to write to syslog: %v", err)
}
if ql.WaitGroup != nil {
ql.WaitGroup.Done()
}
ql.OnLogDone()
}
}

Expand Down
49 changes: 14 additions & 35 deletions logutils/logutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ var _ = DescribeTable("Formatter",
Expect(err).NotTo(HaveOccurred())
expectedLog = strings.Replace(expectedLog, "<PID>", fmt.Sprintf("%v", os.Getpid()), 1)
Expect(string(out)).To(Equal(expectedLog))
expectedSyslog = strings.Replace(expectedSyslog, "<PID>", fmt.Sprintf("%v", os.Getpid()), 1)
Expect(FormatForSyslog(&entry)).To(Equal(expectedSyslog))
},
Entry("Empty", log.Entry{},
Expand Down Expand Up @@ -129,14 +128,10 @@ var _ = Describe("StreamDestination", func() {
var pr *io.PipeReader
var pw *io.PipeWriter
var c chan<- QueuedLog
var mt *mockMonotime

BeforeEach(func() {
pr, pw = io.Pipe()
mt = &mockMonotime{
time: 100 * time.Hour,
}
s = NewStreamDestination(log.InfoLevel, pw, mt)
s = NewStreamDestination(log.InfoLevel, pw)
go s.LoopWritingLogs()
c = s.Channel()
})
Expand All @@ -156,25 +151,28 @@ var _ = Describe("StreamDestination", func() {
Expect(string(b[:n])).To(Equal("Message"))
})

It("should log dropped logs after time advances far enough", func() {
It("should log number of dropped logs", func() {
// Increment the dropped logs counter.
s.OnLogDropped()
// Next log should emit a drop warning.
c <- QueuedLog{
Level: log.InfoLevel,
Message: []byte("Message"),
SyslogMessage: "syslog message",
}
// But log after that shouldn't.
c <- QueuedLog{
Level: log.InfoLevel,
Message: []byte("Message 2"),
SyslogMessage: "syslog message 2",
SyslogMessage: "syslog message",
}
b := make([]byte, 1024)
n, err := pr.Read(b)
Expect(err).NotTo(HaveOccurred())
Expect(string(b[:n])).To(Equal("Message"))
Expect(string(b[:n])).To(Equal("... dropped 1 logs ...\n"))
n, err = pr.Read(b)
Expect(err).NotTo(HaveOccurred())
Expect(string(b[:n])).To(Equal("... dropped 1 logs in 1.002s ...\n"))
Expect(string(b[:n])).To(Equal("Message"))
n, err = pr.Read(b)
Expect(err).NotTo(HaveOccurred())
Expect(string(b[:n])).To(Equal("Message 2"))
Expand Down Expand Up @@ -202,14 +200,10 @@ var _ = Describe("SyslogDestination", func() {
var pr *io.PipeReader
var pw *io.PipeWriter
var c chan<- QueuedLog
var mt *mockMonotime

BeforeEach(func() {
pr, pw = io.Pipe()
mt = &mockMonotime{
time: 100 * time.Hour,
}
s = NewSyslogDestination(log.InfoLevel, (*mockSyslogWriter)(pw), mt)
s = NewSyslogDestination(log.InfoLevel, (*mockSyslogWriter)(pw))
go s.LoopWritingLogs()
c = s.Channel()
})
Expand All @@ -229,13 +223,15 @@ var _ = Describe("SyslogDestination", func() {
Expect(string(b[:n])).To(Equal("INFO syslog message"))
})

It("should log dropped logs after time advances far enough", func() {
It("should log number of dropped logs", func() {
s.OnLogDropped()
// Log after that should emit a drop warning.
c <- QueuedLog{
Level: log.InfoLevel,
Message: []byte("Message"),
SyslogMessage: "syslog message",
}
// But log after that shouldn't.
c <- QueuedLog{
Level: log.InfoLevel,
Message: []byte("Message"),
Expand All @@ -244,10 +240,10 @@ var _ = Describe("SyslogDestination", func() {
b := make([]byte, 1024)
n, err := pr.Read(b)
Expect(err).NotTo(HaveOccurred())
Expect(string(b[:n])).To(Equal("INFO syslog message"))
Expect(string(b[:n])).To(Equal("WARNING ... dropped 1 logs ...\n"))
n, err = pr.Read(b)
Expect(err).NotTo(HaveOccurred())
Expect(string(b[:n])).To(Equal("WARNING ... dropped 1 logs in 1.002s ...\n"))
Expect(string(b[:n])).To(Equal("INFO syslog message"))
n, err = pr.Read(b)
Expect(err).NotTo(HaveOccurred())
Expect(string(b[:n])).To(Equal("INFO syslog message 2"))
Expand Down Expand Up @@ -292,20 +288,3 @@ func (s *mockSyslogWriter) Crit(m string) error {
_, err := fmt.Fprintf((*io.PipeWriter)(s), "CRITICAL %s", m)
return err
}

type mockMonotime struct {
time time.Duration
}

func (m *mockMonotime) Now() time.Duration {
t := m.time
fmt.Fprintf(GinkgoWriter, "mockMonotime Now() = %v\n", t)
m.time += 501 * time.Millisecond
return t
}

func (m *mockMonotime) Since(t time.Duration) time.Duration {
since := m.Now() - t
fmt.Fprintf(GinkgoWriter, "mockMonotime Since() = %v\n", since)
return since
}

0 comments on commit b8aaa5f

Please sign in to comment.