Skip to content

Commit

Permalink
Threadsafe ring and removed newline character endings
Browse files Browse the repository at this point in the history
  • Loading branch information
baspeti committed Apr 19, 2016
1 parent 29f4649 commit 7891e42
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
20 changes: 10 additions & 10 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ func Fatalln(v ...interface{}) {
}

func InitLog(
debugHandle io.Writer,
infoHandle io.Writer,
warningHandle io.Writer,
errorHandle io.Writer,
fatalHandle io.Writer) {
Debug = log.New(debugHandle, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile)
Info = log.New(infoHandle, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
Warning = log.New(warningHandle, "WARNING: ", log.Ldate|log.Ltime|log.Lshortfile)
Error = log.New(errorHandle, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile)
fatal = log.New(fatalHandle, "FATAL: ", log.Ldate|log.Ltime|log.Lshortfile)
debugHandle io.Writer,
infoHandle io.Writer,
warningHandle io.Writer,
errorHandle io.Writer,
fatalHandle io.Writer) {
Debug = log.New(debugHandle, "DEBUG: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds)
Info = log.New(infoHandle, "INFO: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds)
Warning = log.New(warningHandle, "WARNING: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds)
Error = log.New(errorHandle, "ERROR: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds)
fatal = log.New(fatalHandle, "FATAL: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds)
}

func Init() {
Expand Down
25 changes: 20 additions & 5 deletions splunk_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/palette-software/insight-server"
Expand All @@ -30,6 +31,7 @@ type SplunkTarget struct {
Ticker *time.Ticker
TickInterval int
Capacity int
ringMutex sync.Mutex
}

type Message struct {
Expand All @@ -47,14 +49,20 @@ func formatSplunkMessage(p string) []byte {
return jsonObject
}

func (t SplunkTarget) Write(p []byte) (n int, err error) {
func (t *SplunkTarget) Write(p []byte) (n int, err error) {
// Remove the newline characters from the end of the stream, if there are any, as Splunk does not need them.
p = bytes.TrimSuffix(p, []byte("\n"))

// This conversion is needed as otherwise we overwrite the enqueued items.
message := fmt.Sprintf("[OW:%s] %s", t.Owner, p)
fmt.Println("Enqueued message: ", message)
t.ringMutex.Lock()
defer t.ringMutex.Unlock()
t.Ring.Enqueue(message)
return n, nil
}

func (t SplunkTarget) Start() {
func (t *SplunkTarget) Start() {
t.Ring.SetCapacity(t.Capacity)
t.Ticker = time.NewTicker(time.Duration(t.TickInterval) * time.Millisecond)
go func(t *SplunkTarget) {
Expand All @@ -64,8 +72,9 @@ func (t SplunkTarget) Start() {
}(&t)
}

func (t SplunkTarget) SendLogs() {
var b bytes.Buffer
func (t *SplunkTarget) DequeueLines() (b bytes.Buffer) {
t.ringMutex.Lock()
defer t.ringMutex.Unlock()
for {
next := t.Ring.Dequeue()
if next == nil {
Expand All @@ -74,9 +83,15 @@ func (t SplunkTarget) SendLogs() {
line := fmt.Sprintf("%s", next)
formattedMessage := formatSplunkMessage(line)
if formattedMessage != nil {
b.Write(formatSplunkMessage(line))
b.Write(formattedMessage)
}
}

return b
}

func (t SplunkTarget) SendLogs() {
b := t.DequeueLines()
// Return if there's no new records
if b.Len() == 0 {
return
Expand Down

0 comments on commit 7891e42

Please sign in to comment.