forked from argoproj/argo-cd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlogs.go
151 lines (132 loc) · 3.55 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package application
import (
"bufio"
"io"
"strings"
"sync"
"sync/atomic"
"time"
)
type logEntry struct {
line string
timeStamp time.Time
podName string
err error
}
// parseLogsStream converts given ReadCloser into channel that emits log entries
func parseLogsStream(podName string, stream io.ReadCloser, ch chan logEntry) {
bufReader := bufio.NewReader(stream)
eof := false
for !eof {
line, err := bufReader.ReadString('\n')
if err == io.EOF {
eof = true
// stop if we reached end of stream and the next line is empty
if line == "" {
break
}
} else if err != nil && err != io.EOF {
ch <- logEntry{err: err}
break
}
line = strings.TrimSpace(line) // Remove trailing line ending
parts := strings.Split(line, " ")
timeStampStr := parts[0]
logTime, err := time.Parse(time.RFC3339Nano, timeStampStr)
if err != nil {
ch <- logEntry{err: err}
break
}
lines := strings.Join(parts[1:], " ")
for _, line := range strings.Split(lines, "\r") {
ch <- logEntry{line: line, timeStamp: logTime, podName: podName}
}
}
}
// mergeLogStreams merges multiple streams of logs and ensures that merged logs
// are sorted by timestamp.
// The implementation uses merge sort: the next entry of every stream is kept
// buffered and only the oldest one is emitted; if some stream has nothing
// buffered yet, the merger waits no longer than bufferingDuration before
// flushing whatever has accumulated.
// The returned channel is closed once all input streams are closed and drained.
func mergeLogStreams(streams []chan logEntry, bufferingDuration time.Duration) chan logEntry {
	merged := make(chan logEntry)

	// buffer of received log entries for each stream
	entriesPerStream := make([][]logEntry, len(streams))

	// process is signaled whenever a new entry is buffered and closed once
	// every input stream has ended.
	process := make(chan struct{})
	var lock sync.Mutex

	streamsCount := int32(len(streams))

	// start a goroutine per stream that continuously puts new log entries into
	// the buffer and triggers processing
	for i := range streams {
		go func(index int) {
			for next := range streams[index] {
				lock.Lock()
				entriesPerStream[index] = append(entriesPerStream[index], next)
				lock.Unlock()
				process <- struct{}{}
			}
			// stop processing after all streams got closed
			if atomic.AddInt32(&streamsCount, -1) == 0 {
				close(process)
			}
		}(i)
	}

	// send moves log entries from the buffer into the merged stream.
	// If flush=true, entries are sent even while the buffers of some streams
	// are still empty. Reports whether anything was emitted.
	send := func(flush bool) bool {
		var entries []logEntry
		lock.Lock()
		for {
			oldest := -1
			someEmpty := false
			allEmpty := true
			for i := range entriesPerStream {
				buffered := entriesPerStream[i]
				if len(buffered) > 0 {
					if oldest == -1 || entriesPerStream[oldest][0].timeStamp.After(buffered[0].timeStamp) {
						oldest = i
					}
					allEmpty = false
				} else {
					someEmpty = true
				}
			}
			// Stop when drained, or when an empty stream should be awaited.
			if allEmpty || someEmpty && !flush {
				break
			}
			if oldest > -1 {
				entries = append(entries, entriesPerStream[oldest][0])
				entriesPerStream[oldest] = entriesPerStream[oldest][1:]
			}
		}
		lock.Unlock()
		// Send outside the lock so slow consumers don't block producers.
		for i := range entries {
			merged <- entries[i]
		}
		return len(entries) > 0
	}

	var sentAtLock sync.Mutex
	var sentAt time.Time

	// done tells the periodic flusher to exit. This is required because
	// time.Ticker.Stop does not close ticker.C, so a bare
	// `for range ticker.C` would block forever and leak the goroutine.
	done := make(chan struct{})
	ticker := time.NewTicker(bufferingDuration)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				sentAtLock.Lock()
				// waited long enough for logs from each stream, send everything accumulated
				if sentAt.Add(bufferingDuration).Before(time.Now()) {
					_ = send(true)
					sentAt = time.Now()
				}
				sentAtLock.Unlock()
			}
		}
	}()

	go func() {
		for range process {
			if send(false) {
				sentAtLock.Lock()
				sentAt = time.Now()
				sentAtLock.Unlock()
			}
		}
		// all streams closed: flush the remainder, close the output and shut
		// down the periodic flusher.
		_ = send(true)
		close(merged)
		close(done)
	}()
	return merged
}