Skip to content

Commit

Permalink
The flag ForceStopAsyncSend was added to fluent logger lib in v1.9.0
Browse files Browse the repository at this point in the history
 * When async is enabled, this option defines the interval (ms) at which the connection
to the fluentd-address is re-established. This option is useful if the address
may resolve to one or more IP addresses, e.g. a Consul service address.

While the change in moby#42979 resolves the issue where a Docker container can be stuck
if the fluentd-address is unavailable, this functionality adds an additional benefit
in that a new and healthy fluentd-address can be resolved, allowing logs to flow once again.

This adds a `fluentd-async-reconnect-interval` log-opt for the fluentd logging driver.

Signed-off-by: Sebastiaan van Stijn <[email protected]>
Signed-off-by: Conor Evans <[email protected]>

Co-authored-by: Sebastiaan van Stijn <[email protected]>
Co-authored-by: Conor Evans <[email protected]>
  • Loading branch information
conorevans and thaJeztah committed Dec 24, 2021
1 parent 3500d7e commit 5cbc08c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 20 deletions.
59 changes: 39 additions & 20 deletions daemon/logger/fluentd/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@ const (
defaultMaxRetries = math.MaxInt32
defaultRetryWait = 1000

addressKey = "fluentd-address"
asyncKey = "fluentd-async"
asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead)
bufferLimitKey = "fluentd-buffer-limit"
maxRetriesKey = "fluentd-max-retries"
requestAckKey = "fluentd-request-ack"
retryWaitKey = "fluentd-retry-wait"
subSecondPrecisionKey = "fluentd-sub-second-precision"
minReconnectInterval = 100 * time.Millisecond
maxReconnectInterval = 10 * time.Second

addressKey = "fluentd-address"
asyncKey = "fluentd-async"
asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead)
asyncReconnectIntervalKey = "fluentd-async-reconnect-interval"
bufferLimitKey = "fluentd-buffer-limit"
maxRetriesKey = "fluentd-max-retries"
requestAckKey = "fluentd-request-ack"
retryWaitKey = "fluentd-retry-wait"
subSecondPrecisionKey = "fluentd-sub-second-precision"
)

func init() {
Expand Down Expand Up @@ -147,6 +151,7 @@ func ValidateLogOpt(cfg map[string]string) error {
case addressKey:
case asyncKey:
case asyncConnectKey:
case asyncReconnectIntervalKey:
case bufferLimitKey:
case maxRetriesKey:
case requestAckKey:
Expand Down Expand Up @@ -216,6 +221,19 @@ func parseConfig(cfg map[string]string) (fluent.Config, error) {
}
}

asyncReconnectInterval := 0
if cfg[asyncReconnectIntervalKey] != "" {
interval, err := time.ParseDuration(cfg[asyncReconnectIntervalKey])
if err != nil {
return config, errors.Wrapf(err, "invalid value for %s", asyncReconnectIntervalKey)
}
if interval != 0 && (interval < minReconnectInterval || interval > maxReconnectInterval) {
return config, errors.Errorf("invalid value for %s: value (%q) must be between %s and %s",
asyncReconnectIntervalKey, interval, minReconnectInterval, maxReconnectInterval)
}
asyncReconnectInterval = int(interval.Milliseconds())
}

subSecondPrecision := false
if cfg[subSecondPrecisionKey] != "" {
if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil {
Expand All @@ -231,18 +249,19 @@ func parseConfig(cfg map[string]string) (fluent.Config, error) {
}

config = fluent.Config{
FluentPort: loc.port,
FluentHost: loc.host,
FluentNetwork: loc.protocol,
FluentSocketPath: loc.path,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
Async: async,
AsyncConnect: asyncConnect,
SubSecondPrecision: subSecondPrecision,
RequestAck: requestAck,
ForceStopAsyncSend: async || asyncConnect,
FluentPort: loc.port,
FluentHost: loc.host,
FluentNetwork: loc.protocol,
FluentSocketPath: loc.path,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
Async: async,
AsyncConnect: asyncConnect,
AsyncReconnectInterval: asyncReconnectInterval,
SubSecondPrecision: subSecondPrecision,
RequestAck: requestAck,
ForceStopAsyncSend: async || asyncConnect,
}

return config, nil
Expand Down
24 changes: 24 additions & 0 deletions daemon/logger/fluentd/fluentd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
import (
"testing"

"gotest.tools/v3/assert"
)

func TestValidateLogOptReconnectInterval(t *testing.T) {
invalidIntervals := []string{"-1", "1", "-1s", "99ms", "11s"}
for _, v := range invalidIntervals {
t.Run("invalid "+v, func(t *testing.T) {
err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v})
assert.ErrorContains(t, err, "invalid value for fluentd-async-reconnect-interval:")
})
}

validIntervals := []string{"100ms", "10s"}
for _, v := range validIntervals {
t.Run("valid "+v, func(t *testing.T) {
err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v})
assert.NilError(t, err)
})
}
}

0 comments on commit 5cbc08c

Please sign in to comment.