forked from pganalyze/collector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstate.go
357 lines (294 loc) · 10.5 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
package state
import (
"strings"
"sync"
"sync/atomic"
"time"
raven "github.com/getsentry/raven-go"
"github.com/pganalyze/collector/config"
)
// SchemaStats - Groups the relation, column, extended relation, index and
// function statistics maps into one value. PersistedState keeps one
// *SchemaStats per Oid key in its SchemaStats field.
//
// NOTE(review): the Oid key is presumably the database Oid - confirm against
// the code that populates PersistedState.SchemaStats.
type SchemaStats struct {
RelationStats PostgresRelationStatsMap
ColumnStats PostgresColumnStatsMap
RelationStatsExtended PostgresRelationStatsExtendedMap
IndexStats PostgresIndexStatsMap
FunctionStats PostgresFunctionStatsMap
}
// PersistedState - State that's kept across collector runs to be used for diffs
//
// StateOnDisk stores one PersistedState per server identifier, and Server
// holds one in its PrevState field.
type PersistedState struct {
CollectedAt time.Time
DatabaseStats PostgresDatabaseStatsMap
StatementStats PostgresStatementStatsMap
// Schema statistics keyed by Oid (presumably the database Oid - TODO confirm)
SchemaStats map[Oid]*SchemaStats
Relations []PostgresRelation
Functions []PostgresFunction
System SystemState
CollectorStats CollectorStats
// Incremented every run, indicates whether we should run a pg_stat_statements_reset()
// on behalf of the user. Only activates once it reaches
// GrantFeatures.StatementResetFrequency, and is reset afterwards.
StatementResetCounter int
// Keep track of when we last collected statement stats, to calculate time distance
LastStatementStatsAt time.Time
// All statement stats that have not been identified (will be cleared by the next full snapshot)
UnidentifiedStatementStats HistoricStatementStatsMap
}
// TransientState - State that's only used within a collector run (and not needed for diffs)
type TransientState struct {
// Databases we connected to and fetched local catalog data (e.g. schema)
DatabaseOidsWithLocalCatalog []Oid
Roles []PostgresRole
Databases []PostgresDatabase
Types []PostgresType
Statements PostgresStatementMap
StatementTexts PostgresStatementTextMap
HistoricStatementStats HistoricStatementStatsMap
// This is a new zero value that was recorded after a pg_stat_statements_reset(),
// in order to enable the next snapshot to be able to diff against something
ResetStatementStats PostgresStatementStatsMap
ServerStats PostgresServerStats
Replication PostgresReplication
Settings []PostgresSetting
BackendCounts []PostgresBackendCount
Extensions []PostgresExtension
Version PostgresVersion
// Client from the raven-go (Sentry) package imported by this file
SentryClient *raven.Client
// Values of the CollectorConfig and CollectorPlatform struct types
// declared in this file
CollectorConfig CollectorConfig
CollectorPlatform CollectorPlatform
}
// CollectorConfig - Flat record of collector settings, carried in
// TransientState.CollectorConfig. All fields are plain strings, booleans,
// int32 values or string slices.
//
// NOTE(review): several fields are booleans named *Has* (e.g. DbHasSslkey,
// AwsHasAccessKeyId, GcpHasCredentialsFile) and the database URL is stored as
// DbURLRedacted, which suggests this is a sanitized summary that records
// whether a sensitive value is configured rather than the value itself -
// confirm against the code that fills this struct (not in this file).
type CollectorConfig struct {
SectionName string
DisableLogs bool
DisableActivity bool
EnableLogExplain bool
// Db*: database connection related settings (grouping inferred from the prefix)
DbName string
DbUsername string
DbHost string
DbPort int32
DbSslmode string
DbHasSslrootcert bool
DbHasSslcert bool
DbHasSslkey bool
DbExtraNames []string
DbAllNames bool
DbURLRedacted string
// Aws*, Azure*, Gcp*, CrunchyBridge*, Aiven*: cloud provider specific
// settings (grouping inferred from the prefixes)
AwsRegion string
AwsDbInstanceId string
AwsDbClusterID string
AwsDbClusterReadonly bool
AwsHasAccessKeyId bool
AwsHasAssumeRole bool
AwsHasAccountId bool
AwsHasWebIdentityTokenFile bool
AwsHasRoleArn bool
AzureDbServerName string
AzureEventhubNamespace string
AzureEventhubName string
AzureAdTenantId string
AzureAdClientId string
AzureHasAdCertificate bool
GcpCloudsqlInstanceId string
GcpAlloyDBClusterID string
GcpAlloyDBInstanceID string
GcpPubsubSubscription string
GcpHasCredentialsFile bool
GcpProjectId string
CrunchyBridgeClusterId string
AivenProjectId string
AivenServiceId string
// ApiSystem*: system id/type/scope values, each with a *Fallback variant
ApiSystemId string
ApiSystemType string
ApiSystemScope string
ApiSystemIdFallback string
ApiSystemTypeFallback string
ApiSystemScopeFallback string
// DbLog*: log source settings (grouping inferred from the prefix)
DbLogLocation string
DbLogDockerTail string
DbLogSyslogServer string
DbLogPgReadFile bool
IgnoreTablePattern string
IgnoreSchemaRegexp string
QueryStatsInterval int32
MaxCollectorConnections int32
SkipIfReplica bool
FilterLogSecret string
FilterQuerySample string
FilterQueryText string
HasProxy bool
ConfigFromEnv bool
OtelExporterOtlpEndpoint string
}
// CollectorPlatform - A StartedAt timestamp plus string descriptors
// (architecture, hostname, operating system, platform, virtualization system,
// kernel version), carried in TransientState.CollectorPlatform.
//
// NOTE(review): presumably describes the host the collector process runs on;
// how the values are gathered is not visible in this file.
type CollectorPlatform struct {
StartedAt time.Time
Architecture string
Hostname string
OperatingSystem string
Platform string
PlatformFamily string
PlatformVersion string
VirtualizationSystem string
KernelVersion string
}
// DiffedSchemaStats - Diffed counterpart of SchemaStats, holding the diffed
// relation, index and function statistics maps. Unlike SchemaStats it has no
// column or extended relation statistics fields. DiffState keeps one
// *DiffedSchemaStats per Oid key.
type DiffedSchemaStats struct {
RelationStats DiffedPostgresRelationStatsMap
IndexStats DiffedPostgresIndexStatsMap
FunctionStats DiffedPostgresFunctionStatsMap
}
// DiffState - Result of diff-ing two persistent state structs
type DiffState struct {
StatementStats DiffedPostgresStatementStatsMap
// Keyed by Oid, the same key type as PersistedState.SchemaStats
SchemaStats map[Oid]*DiffedSchemaStats
SystemCPUStats DiffedSystemCPUStatsMap
SystemNetworkStats DiffedNetworkStatsMap
SystemDiskStats DiffedDiskStatsMap
CollectorStats DiffedCollectorStats
DatabaseStats DiffedPostgresDatabaseStatsMap
}
// StateOnDiskFormatVersion - Increment this when an old state preserved to disk should be ignored
//
// NOTE(review): presumably compared against StateOnDisk.FormatVersion when a
// state file is loaded - the comparison itself is not in this file.
const StateOnDiskFormatVersion = 6
// StateOnDisk - A format version together with the PersistedState of each
// server, keyed by config.ServerIdentifier.
//
// NOTE(review): as the name suggests this is presumably the structure that is
// serialized to the state file; the read/write code is not in this file.
type StateOnDisk struct {
// See StateOnDiskFormatVersion for the current version number
FormatVersion uint
PrevStateByServer map[config.ServerIdentifier]PersistedState
}
// CollectionOpts - Options for a collector invocation: a StartedAt timestamp,
// Collect* switches, test-run and debug switches, and state file settings.
//
// NOTE(review): none of these fields are read in this file, so the group
// comments below are inferred from the field names only - confirm against
// the callers.
type CollectionOpts struct {
StartedAt time.Time
// Collect*: switches for individual categories of collected data
CollectPostgresRelations bool
CollectPostgresSettings bool
CollectPostgresLocks bool
CollectPostgresFunctions bool
CollectPostgresBloat bool
CollectPostgresViews bool
CollectLogs bool
CollectExplain bool
CollectSystemInformation bool
CollectorApplicationName string
DiffStatements bool
SubmitCollectedData bool
// Test*: switches and section name for test runs
TestRun bool
TestRunLogs bool
TestExplain bool
TestSection string
GenerateStatsHelperSql string
DebugLogs bool
DiscoverLogLocation bool
// State file name and whether to write state updates
StateFilename string
WriteStateUpdate bool
ForceEmptyGrant bool
OutputAsJson bool
}
// GrantConfig - The "config" part of a Grant (see Grant.Config), mapped
// from/to JSON using the snake_case keys given in the struct tags.
type GrantConfig struct {
ServerID string `json:"server_id"`
ServerURL string `json:"server_url"`
SentryDsn string `json:"sentry_dsn"`
Features GrantFeatures `json:"features"`
EnableActivity bool `json:"enable_activity"`
EnableLogs bool `json:"enable_logs"`
SchemaTableLimit int `json:"schema_table_limit"` // Maximum number of tables that can be monitored per server
}
// GrantFeatures - Feature settings nested under GrantConfig.Features, mapped
// from/to JSON using the keys given in the struct tags.
type GrantFeatures struct {
Logs bool `json:"logs"`
// Referenced by the comment on PersistedState.StatementResetCounter as the
// threshold for running pg_stat_statements_reset() (the check itself is not
// in this file)
StatementResetFrequency int `json:"statement_reset_frequency"`
StatementTimeoutMs int32 `json:"statement_timeout_ms"` // Statement timeout for all SQL statements sent to the database (defaults to 30s)
StatementTimeoutMsQueryText int32 `json:"statement_timeout_ms_query_text"` // Statement timeout for pg_stat_statements query text requests (defaults to 120s)
}
// Grant - A grant held per server (see Server.Grant): its config plus S3 URL,
// S3 fields and a local directory, mapped from/to JSON using the keys given in
// the struct tags.
type Grant struct {
// Valid carries no json tag, unlike the other fields.
// NOTE(review): presumably set by the code that obtains the grant rather
// than taken from the JSON payload - confirm.
Valid bool
Config GrantConfig `json:"config"`
S3URL string `json:"s3_url"`
S3Fields map[string]string `json:"s3_fields"`
LocalDir string `json:"local_dir"`
}
// S3 returns the grant's S3URL and S3Fields as a standalone GrantS3 value.
// The S3Fields map is shared with the grant, not copied.
func (g Grant) S3() GrantS3 {
	var s3 GrantS3
	s3.S3URL = g.S3URL
	s3.S3Fields = g.S3Fields
	return s3
}
// GrantS3 - The S3 subset of a Grant (URL and fields), as returned by
// Grant.S3. It uses the same JSON keys as the corresponding Grant fields.
type GrantS3 struct {
S3URL string `json:"s3_url"`
S3Fields map[string]string `json:"s3_fields"`
}
// CollectionStatus - Two disabled flags, one for collection and one for log
// snapshots, each paired with a reason string. Stored per server in
// Server.CollectionStatus.
type CollectionStatus struct {
CollectionDisabled bool
CollectionDisabledReason string
LogSnapshotDisabled bool
LogSnapshotDisabledReason string
}
// Server - Per-server state: the server config and grant, the previous-run
// states (general, log, activity) and collection status, each followed by a
// mutex of matching name, plus log parsing and compact snapshot bookkeeping.
//
// Use MakeServer to create one: it allocates all of the mutex pointers, which
// are nil in a zero-value Server.
type Server struct {
Config config.ServerConfig
RequestedSslMode string
Grant Grant
PGAnalyzeURL string
PrevState PersistedState
StateMutex *sync.Mutex
LogPrevState PersistedLogState
LogStateMutex *sync.Mutex
ActivityPrevState PersistedActivityState
ActivityStateMutex *sync.Mutex
CollectionStatus CollectionStatus
CollectionStatusMutex *sync.Mutex
// Only set by MakeServer when it is called with testRun = true, nil otherwise
SelfTest *SelfTestResult
// The LogParser for this server, updated as necessary whenever relevant
// settings (log_line_prefix and log_timezone) change
// The LogParseMutex should be held while updating this (GetLogParser takes
// its read lock when reading)
LogParser LogParser
LogParseMutex *sync.RWMutex
// Boolean flags for which log lines should be ignored for processing
// (bitmask of the LOG_IGNORE_* constants, written by SetLogIgnoreFlags and
// read by IgnoreLogLine)
//
// Internally this uses atomics (not a mutex) due to noticeable performance
// differences (see https://groups.google.com/g/golang-nuts/c/eIqkhXh9PLg),
// as we access this in high frequency log-related code paths.
LogIgnoreFlags uint32
// State to track compact snapshot submissions, and log them routinely
CompactLogStats map[string]uint8
CompactLogTime time.Time
}
// MakeServer builds a Server for the given config with all of its mutexes
// allocated. When testRun is set, a self-test result created by MakeSelfTest
// is attached as well; otherwise SelfTest stays nil.
func MakeServer(config config.ServerConfig, testRun bool) *Server {
	server := new(Server)
	server.Config = config

	// The mutexes are stored as pointers, so each one needs its own allocation.
	server.StateMutex = new(sync.Mutex)
	server.LogStateMutex = new(sync.Mutex)
	server.ActivityStateMutex = new(sync.Mutex)
	server.CollectionStatusMutex = new(sync.Mutex)
	server.LogParseMutex = new(sync.RWMutex)

	if !testRun {
		return server
	}
	server.SelfTest = MakeSelfTest()
	return server
}
// Bit flags stored in Server.LogIgnoreFlags. They are combined by
// SetLogIgnoreFlags and tested by IgnoreLogLine.
const (
// Bit 0 (value 1): IgnoreLogLine filters lines whose content starts with
// "statement: ", "execute " or "parameters: "
LOG_IGNORE_STATEMENT uint32 = 1 << iota
// Bit 1 (value 2): IgnoreLogLine filters lines whose content starts with
// "duration: ", unless the content contains " ms plan:\n"
LOG_IGNORE_DURATION
)
// LogParser - Interface of the per-server log line parser held in
// Server.LogParser and returned by Server.GetLogParser.
//
// NOTE(review): no implementation is visible in this file, so the notes on
// the methods below are derived from the signatures only - confirm against
// the implementing type.
type LogParser interface {
// Takes a prefix, a time zone and a syslog flag and returns a boolean;
// presumably reports whether this parser was built for those settings
Matches(prefix string, tz *time.Location, isSyslog bool) bool
// Turns the time portion of a log line into a time.Time
GetOccurredAt(timePart string) time.Time
// Parses a single line into a LogLine; ok presumably reports whether the
// line could be parsed
ParseLine(line string) (logLine LogLine, ok bool)
// Returns an error value; presumably non-nil when the parser's prefix is
// not usable
ValidatePrefix() error
// Splits a line into its prefix and content parts; ok presumably reports
// whether the split succeeded
GetPrefixAndContent(line string) (prefix string, content string, ok bool)
}
// SetLogIgnoreFlags replaces the server's log ignore flags with the given
// combination. Each true argument sets the corresponding LOG_IGNORE_* bit and
// each false argument leaves it cleared. The new value is published with a
// single atomic store.
func (s *Server) SetLogIgnoreFlags(ignoreStatement bool, ignoreDuration bool) {
	flags := uint32(0)
	if ignoreDuration {
		flags |= LOG_IGNORE_DURATION
	}
	if ignoreStatement {
		flags |= LOG_IGNORE_STATEMENT
	}
	atomic.StoreUint32(&s.LogIgnoreFlags, flags)
}
// GetLogParser returns the server's current LogParser, reading it while
// holding the read lock of LogParseMutex.
func (s *Server) GetLogParser() LogParser {
	s.LogParseMutex.RLock()
	parser := s.LogParser
	s.LogParseMutex.RUnlock()
	return parser
}
// IgnoreLogLine - reports whether a log line should be dropped early, before
// any analysis is done on it, based on the server's log ignore flags.
//
// The main use case is Log Insights on servers with very high log volume
// caused by log_statement=all or log_duration=on (output that today's
// regexp-based log parsing cannot handle effectively): skipping those lines
// lets other, less frequent log events still be analyzed.
func (s *Server) IgnoreLogLine(content string) bool {
	flags := atomic.LoadUint32(&s.LogIgnoreFlags)

	if flags&LOG_IGNORE_STATEMENT != 0 {
		switch {
		case strings.HasPrefix(content, "statement: "),
			strings.HasPrefix(content, "execute "),
			strings.HasPrefix(content, "parameters: "):
			return true
		}
	}

	if flags&LOG_IGNORE_DURATION == 0 {
		return false
	}
	// Duration lines that carry a plan (" ms plan:\n") are kept.
	return strings.HasPrefix(content, "duration: ") && !strings.Contains(content, " ms plan:\n")
}