From 7891e42bd09ec0b9d3ae482993851b4a3c146520 Mon Sep 17 00:00:00 2001 From: Peter Bastyi Date: Tue, 19 Apr 2016 17:41:08 +0200 Subject: [PATCH] Threadsafe ring and removed newline character endings --- logger.go | 20 ++++++++++---------- splunk_target.go | 25 ++++++++++++++++++++----- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/logger.go b/logger.go index ddb6bcd..5a9809e 100644 --- a/logger.go +++ b/logger.go @@ -28,16 +28,16 @@ func Fatalln(v ...interface{}) { } func InitLog( - debugHandle io.Writer, - infoHandle io.Writer, - warningHandle io.Writer, - errorHandle io.Writer, - fatalHandle io.Writer) { - Debug = log.New(debugHandle, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile) - Info = log.New(infoHandle, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile) - Warning = log.New(warningHandle, "WARNING: ", log.Ldate|log.Ltime|log.Lshortfile) - Error = log.New(errorHandle, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile) - fatal = log.New(fatalHandle, "FATAL: ", log.Ldate|log.Ltime|log.Lshortfile) + debugHandle io.Writer, + infoHandle io.Writer, + warningHandle io.Writer, + errorHandle io.Writer, + fatalHandle io.Writer) { + Debug = log.New(debugHandle, "DEBUG: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds) + Info = log.New(infoHandle, "INFO: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds) + Warning = log.New(warningHandle, "WARNING: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds) + Error = log.New(errorHandle, "ERROR: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds) + fatal = log.New(fatalHandle, "FATAL: ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile|log.Lmicroseconds) } func Init() { diff --git a/splunk_target.go b/splunk_target.go index fd94a38..2814442 100644 --- a/splunk_target.go +++ b/splunk_target.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/palette-software/insight-server" @@ -30,6 +31,7 @@ type SplunkTarget struct { Ticker *time.Ticker TickInterval int Capacity int + ringMutex sync.Mutex } type Message struct { @@ -47,14 +49,20 @@ func formatSplunkMessage(p string) []byte { return jsonObject } -func (t SplunkTarget) Write(p []byte) (n int, err error) { +func (t *SplunkTarget) Write(p []byte) (n int, err error) { + // Remove the newline characters from the end of the stream, if there are any, as Splunk does not need them. + p = bytes.TrimSuffix(p, []byte("\n")) + // This conversion is needed as otherwise we overwrite the enqueued items. message := fmt.Sprintf("[OW:%s] %s", t.Owner, p) + fmt.Println("Enqueued message: ", message) + t.ringMutex.Lock() + defer t.ringMutex.Unlock() t.Ring.Enqueue(message) return n, nil } -func (t SplunkTarget) Start() { +func (t *SplunkTarget) Start() { t.Ring.SetCapacity(t.Capacity) t.Ticker = time.NewTicker(time.Duration(t.TickInterval) * time.Millisecond) go func(t *SplunkTarget) { @@ -64,8 +72,9 @@ func (t SplunkTarget) Start() { }(&t) } -func (t SplunkTarget) SendLogs() { - var b bytes.Buffer +func (t *SplunkTarget) DequeueLines() (b bytes.Buffer) { + t.ringMutex.Lock() + defer t.ringMutex.Unlock() for { next := t.Ring.Dequeue() if next == nil { @@ -74,9 +83,15 @@ func (t SplunkTarget) SendLogs() { line := fmt.Sprintf("%s", next) formattedMessage := formatSplunkMessage(line) if formattedMessage != nil { - b.Write(formatSplunkMessage(line)) + b.Write(formattedMessage) } } + + return b +} + +func (t SplunkTarget) SendLogs() { + b := t.DequeueLines() // Return if there's no new records if b.Len() == 0 { return