Skip to content

Commit

Permalink
Add debug option to prevent dropping of logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
fasaxc committed Apr 12, 2017
1 parent b6103e6 commit dddbb9a
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 6 deletions.
3 changes: 2 additions & 1 deletion config/config_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ type Config struct {
ClusterGUID string `config:"string;baddecaf"`
ClusterType string `config:"string;"`

DebugMemoryProfilePath string `config:"file;;"`
DebugMemoryProfilePath string `config:"file;;"`
DebugDisableLogDropping bool `config:"bool;false"`

// State tracking.

Expand Down
43 changes: 38 additions & 5 deletions logutils/logutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ func ConfigureLogging(configParams *config.Config) {
var dests []*Destination
if configParams.LogSeverityScreen != "" {
screenDest := NewStreamDestination(
logLevelScreen, os.Stderr, make(chan QueuedLog, logQueueSize))
logLevelScreen,
os.Stderr,
make(chan QueuedLog, logQueueSize),
configParams.DebugDisableLogDropping,
)
dests = append(dests, screenDest)
}

Expand All @@ -127,7 +131,11 @@ func ConfigureLogging(configParams *config.Config) {
rotAwareFile, fileOpenErr = rfw.Open(configParams.LogFilePath, 0644)
if fileDirErr == nil && fileOpenErr == nil {
fileDest := NewStreamDestination(
logLevelFile, rotAwareFile, make(chan QueuedLog, logQueueSize))
logLevelFile,
rotAwareFile,
make(chan QueuedLog, logQueueSize),
configParams.DebugDisableLogDropping,
)
dests = append(dests, fileDest)
}
}
Expand All @@ -148,7 +156,11 @@ func ConfigureLogging(configParams *config.Config) {
w, sysErr := syslog.Dial(net, addr, priority, tag)
if sysErr == nil {
syslogDest := NewSyslogDestination(
logLevelSyslog, w, make(chan QueuedLog, logQueueSize))
logLevelSyslog,
w,
make(chan QueuedLog, logQueueSize),
configParams.DebugDisableLogDropping,
)
dests = append(dests, syslogDest)
}
}
Expand Down Expand Up @@ -327,7 +339,12 @@ func (ql QueuedLog) OnLogDone() {
}
}

func NewStreamDestination(level log.Level, writer io.Writer, c chan QueuedLog) *Destination {
func NewStreamDestination(
level log.Level,
writer io.Writer,
c chan QueuedLog,
disableLogDropping bool,
) *Destination {
return &Destination{
Level: level,
channel: c,
Expand All @@ -339,10 +356,16 @@ func NewStreamDestination(level log.Level, writer io.Writer, c chan QueuedLog) *
_, err := writer.Write(ql.Message)
return err
},
disableLogDropping: disableLogDropping,
}
}

func NewSyslogDestination(level log.Level, writer syslogWriter, c chan QueuedLog) *Destination {
func NewSyslogDestination(
level log.Level,
writer syslogWriter,
c chan QueuedLog,
disableLogDropping bool,
) *Destination {
return &Destination{
Level: level,
channel: c,
Expand All @@ -354,6 +377,7 @@ func NewSyslogDestination(level log.Level, writer syslogWriter, c chan QueuedLog
err := writeToSyslog(writer, ql)
return err
},
disableLogDropping: disableLogDropping,
}
}

Expand All @@ -367,6 +391,9 @@ type Destination struct {
// with a function that logs to a stream or to syslog, for example.
writeLog func(ql QueuedLog) error

// disableLogDropping forces all logs to be queued even if the destination blocks.
disableLogDropping bool

// lock protects the numDroppedLogs count.
lock sync.Mutex
numDroppedLogs uint
Expand All @@ -375,6 +402,12 @@ type Destination struct {
// Send sends a log to the background thread. It returns true on success or false if the channel
// is blocked.
func (d *Destination) Send(ql QueuedLog) (ok bool) {
if d.disableLogDropping {
d.channel <- ql
ok = true
return
}

d.lock.Lock()
ql.NumSkippedLogs = d.numDroppedLogs
select {
Expand Down
67 changes: 67 additions & 0 deletions logutils/logutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ var (
Message: []byte("Message"),
SyslogMessage: "syslog message",
}
message2 = QueuedLog{
Level: log.InfoLevel,
Message: []byte("Message2"),
SyslogMessage: "syslog message2",
}
)

var _ = Describe("Stream Destination", func() {
Expand All @@ -145,6 +150,7 @@ var _ = Describe("Stream Destination", func() {
log.InfoLevel,
pw,
c,
false,
)
})

Expand Down Expand Up @@ -172,6 +178,37 @@ var _ = Describe("Stream Destination", func() {
Expect(<-c).To(Equal(message1))
})

Describe("with dropping disabled", func() {
BeforeEach(func() {
c = make(chan QueuedLog, 1)
pr, pw = io.Pipe()
s = NewStreamDestination(
log.InfoLevel,
pw,
c,
true,
)
})

It("should not drop logs", func() {
// First message should be queued on the channel.
ok := s.Send(message1)
Expect(ok).To(BeTrue())
done := make(chan bool)
go func() {
// Second message should block so we do it in a goroutine.
ok = s.Send(message2)
done <- ok
}()
// Sleep so that the background goroutine has a chance to try to write.
time.Sleep(10 * time.Millisecond)
// Drain the queue.
Expect(<-c).To(Equal(message1))
Expect(<-c).To(Equal(message2))
Expect(<-done).To(BeTrue())
})
})

Describe("With real background thread", func() {
BeforeEach(func() {
go s.LoopWritingLogs()
Expand Down Expand Up @@ -239,9 +276,39 @@ var _ = Describe("Syslog Destination", func() {
log.InfoLevel,
(*mockSyslogWriter)(pw),
c,
false,
)
})

Describe("with dropping disabled", func() {
BeforeEach(func() {
s = NewSyslogDestination(
log.InfoLevel,
(*mockSyslogWriter)(pw),
c,
true,
)
})

It("should not drop logs", func() {
// First message should be queued on the channel.
ok := s.Send(message1)
Expect(ok).To(BeTrue())
done := make(chan bool)
go func() {
// Second message should block so we do it in a goroutine.
ok = s.Send(message2)
done <- ok
}()
// Sleep so that the background goroutine has a chance to try to write.
time.Sleep(10 * time.Millisecond)
// Drain the queue.
Expect(<-c).To(Equal(message1))
Expect(<-c).To(Equal(message2))
Expect(<-done).To(BeTrue())
})
})

Describe("With real background thread", func() {
BeforeEach(func() {
go s.LoopWritingLogs()
Expand Down

0 comments on commit dddbb9a

Please sign in to comment.