Skip to content

Commit

Permalink
Resolve log out of order error
Browse files Browse the repository at this point in the history
Use datadog sender as intended
  • Loading branch information
glinton committed May 8, 2018
1 parent 69ce5b4 commit f25fcc1
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 117 deletions.
4 changes: 2 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (l *Logvac) addDrain(tag string, drain DrainFunc) {
case <-channels.done:
return
case msg := <-channels.send:
// todo: ensure mist plays nice with goroutine
go drain(msg)
// don't goroutine to preserve log order
drain(msg)
}
}
}()
Expand Down
129 changes: 14 additions & 115 deletions drain/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package drain

import (
"fmt"
"io"
"net"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/logs/sender"

"github.com/nanopack/logvac/core"
)

// Datadog drain implements the publisher interface for publishing logs to datadog.
type Datadog struct {
connManager *ConnectionManager
Key string // api key
connManager *sender.ConnectionManager
Conn net.Conn
Key string // datadog api key
}

// NewDatadogClient creates a new datadog publisher
Expand All @@ -23,11 +24,10 @@ func NewDatadogClient(key string) (*Datadog, error) {
return nil, fmt.Errorf("Failed to resolve datadog address - %s", err.Error())
}

cm := NewConnectionManager("intake.logs.datadoghq.com", 10514)
cm := sender.NewConnectionManager("intake.logs.datadoghq.com", 10514, true)
conn := cm.NewConnection()
cm.conn = conn

return &Datadog{Key: key, connManager: cm}, nil
return &Datadog{Key: key, connManager: cm, Conn: conn}, nil
}

// Init initializes a connection to mist
Expand All @@ -39,27 +39,13 @@ func (p Datadog) Init() error {
return nil
}

// handleServerClose keeps reading from conn, one byte at a time, until a read
// fails. When the failure is io.EOF the connection is closed on the client
// side as well; any other read error just ends the loop without closing.
// Bytes that are read successfully are discarded.
func handleServerClose(conn net.Conn) {
	var probe [1]byte
	for {
		if _, readErr := conn.Read(probe[:]); readErr != nil {
			if readErr == io.EOF {
				conn.Close()
			}
			return
		}
	}
}

// Publish "drains" a log message by writing it to the datadog connection
func (p *Datadog) Publish(msg logvac.Message) {
msg.PubTries++

if p.connManager.conn == nil {
if p.Conn == nil {
fmt.Println("Redialing datadog")
p.connManager.conn = p.connManager.NewConnection() // blocks until a new conn is ready
p.Conn = p.connManager.NewConnection() // doesn't block (don't goroutine call to Publish)
}

var ms []byte
Expand All @@ -69,11 +55,11 @@ func (p *Datadog) Publish(msg logvac.Message) {
ms = append(append([]byte(p.Key+" "), msg.Raw...), []byte("\n")...)
}

_, err := p.connManager.conn.Write(ms)
_, err := p.Conn.Write(ms)
if err != nil {
fmt.Printf("Failed writing log - %s %d\n", err.Error(), msg.PubTries)
p.connManager.CloseConnection(p.connManager.conn)
p.connManager.conn = nil
p.connManager.CloseConnection(p.Conn)
p.Conn = nil
if msg.PubTries <= 3 {
time.Sleep(2 * time.Second)
p.Publish(msg)
Expand All @@ -83,93 +69,6 @@ func (p *Datadog) Publish(msg logvac.Message) {

// Close closes the connection to datadog.
func (p *Datadog) Close() error {
if p.connManager.conn == nil {
return nil
}
return p.connManager.conn.Close()
}

// Adapted from datadog-log-agent (github.com/DataDog/datadog-agent/pkg/logs)
//
// Tuning values for dialing the intake and for backing off between failed
// dial attempts.
const (
// backoffSleepTimeUnit is multiplied by the retry count in backoff to get the
// sleep duration.
backoffSleepTimeUnit = 2 // in seconds
// maxBackoffSleepTime is the upper bound backoff applies to that duration.
maxBackoffSleepTime = 30 // in seconds
// timeout is the dial timeout NewConnection passes to net.DialTimeout.
timeout = 20 * time.Second
)

// A ConnectionManager manages connections: it dials the configured address
// (retrying with a backoff on failure) and closes connections on request.
type ConnectionManager struct {
// connectionString is the "host:port" address built by NewConnectionManager
// and dialed by NewConnection.
connectionString string
// serverName is the host passed to NewConnectionManager; it is not read by
// any of the methods visible in this file.
serverName string

// mutex is held for the whole of NewConnection, including its retry loop.
mutex sync.Mutex
// retries counts dial attempts since the last successful dial; it is reset
// to 0 on success and scales the sleep in backoff.
retries int

// conn, when non-nil, is returned by NewConnection instead of dialing and is
// closed by CloseConnection. NewConnection itself never assigns it.
conn net.Conn
}

// NewConnectionManager builds a ConnectionManager for the given intake host
// and port. The host is stored as the server name and combined with the port
// into the "host:port" address used when dialing. No connection is opened
// here.
func NewConnectionManager(ddUrl string, ddPort int) *ConnectionManager {
	manager := new(ConnectionManager) // zero-value mutex is ready to use
	manager.serverName = ddUrl
	manager.connectionString = fmt.Sprintf("%s:%d", ddUrl, ddPort)
	return manager
}

// NewConnection returns a connection to the intake, blocking until one is
// available. If the manager already holds a connection it is returned as is.
// Otherwise the intake is dialed over TCP, backing off after every failed
// attempt, and a goroutine is started on the fresh connection to watch for
// the server closing it. The manager's mutex is held for the entire call.
func (cm *ConnectionManager) NewConnection() net.Conn {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	for cm.conn == nil {
		fmt.Println("Connecting to the backend:", cm.connectionString)
		cm.retries++

		dialed, dialErr := net.DialTimeout("tcp", cm.connectionString, timeout)
		if dialErr == nil {
			cm.retries = 0
			go cm.handleServerClose(dialed)
			return dialed
		}

		// Dial failed: report it, sleep, then try again.
		fmt.Println(dialErr)
		cm.backoff()
	}
	return cm.conn
}

// CloseConnection closes a connection on the client side.
//
// It closes the connection currently tracked by the manager (if any) and the
// supplied conn (if any). Either may be nil: NewConnection does not assign
// cm.conn, and callers reset it to nil after a failed write, so the
// server-close watcher goroutine can reach this method while cm.conn is nil.
// Calling Close on a nil interface value would panic, hence the guards.
//
// Close errors are deliberately ignored: this is best-effort cleanup, and
// when conn and cm.conn are the same connection the second Close is expected
// to fail.
func (cm *ConnectionManager) CloseConnection(conn io.Closer) {
	if cm.conn != nil {
		cm.conn.Close()
	}
	if conn != nil {
		conn.Close()
	}
}

// handleServerClose watches conn for the server hanging up. It reads one byte
// at a time, discarding whatever arrives. On io.EOF it asks the manager to
// close the connection for the client; on any other read error it prints the
// error. In both cases the watcher then stops.
func (cm *ConnectionManager) handleServerClose(conn net.Conn) {
	for {
		probe := make([]byte, 1)
		_, readErr := conn.Read(probe)
		switch {
		case readErr == nil:
			continue // data received; keep watching
		case readErr == io.EOF:
			cm.CloseConnection(conn)
		default:
			fmt.Println(readErr)
		}
		return
	}
}

// backoff lets the connection manager sleep a bit
func (cm *ConnectionManager) backoff() {
backoffDuration := backoffSleepTimeUnit * cm.retries
if backoffDuration > maxBackoffSleepTime {
backoffDuration = maxBackoffSleepTime
}
timer := time.NewTimer(time.Second * time.Duration(backoffDuration))
<-timer.C
p.connManager.CloseConnection(p.Conn)
return nil
}

0 comments on commit f25fcc1

Please sign in to comment.