forked from NREL/api-umbrella
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reprocess-cloudwatch-logs.lua
160 lines (131 loc) · 4.26 KB
/
reprocess-cloudwatch-logs.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
local argparse = require "argparse"
local config = require("api-umbrella.utils.load_config")()
local icu_date = require "icu-date-ffi"
local json_decode = require("cjson").decode
local log_utils = require "api-umbrella.proxy.log_utils"
local shell_blocking_capture_combined = require("shell-games").capture_combined
local split = require("ngx.re").split
local table_new = require "table.new"
local format_iso8601 = icu_date.formats.iso8601()
local buffer_size = 100
local buffer_index = 0
local buffer = table_new(buffer_size, 0)
local function parse_time(string)
local date
if string then
date = icu_date.new()
local ok = pcall(date.parse, date, format_iso8601, string)
if not ok then
date = nil
end
end
return date
end
local function parse_args()
local parser = argparse("api-umbrella", "Open source API management")
parser:option("--profile", "AWS Profile"):count(1)
parser:option("--group", "CloudWatch group"):count(1)
parser:option("--start", "Migrate data starting at this time (YYYY-MM-DDThh:mm:ss.sssZ format)."):count(1)
parser:option("--stop", "Migrate data ending on this time (YYYY-MM-DDThh:mm:ss.sssZ format)."):count(1)
local parsed_args = parser:parse()
parsed_args["_start"] = parse_time(parsed_args["start"])
if not parsed_args["_start"] then
print("--start-date could not be parsed. YYYY-MM-DD format expected.")
os.exit(1)
end
parsed_args["_stop"] = parse_time(parsed_args["stop"])
if not parsed_args["_stop"] then
print("--start-date could not be parsed. YYYY-MM-DD format expected.")
os.exit(1)
end
return parsed_args
end
local function flush_buffer()
if buffer_index == 0 then
return
end
print("\nFlushing buffer to rsyslog (" .. buffer_index .. " records)...")
local packet = table.concat(buffer, "", 1, buffer_index)
local sock = ngx.socket.tcp()
local _, connect_err = sock:connect(config["rsyslog"]["host"], config["rsyslog"]["port"], {
pool_size = 10,
})
if connect_err then
ngx.log(ngx.ERR, "rsyslog connect error: ", connect_err)
os.exit(1)
end
sock:settimeouts(1000, 5000, 5000)
local _, send_err = sock:send(packet)
if send_err then
ngx.log(ngx.ERR, "rsyslog send error: ", send_err)
os.exit(1)
end
local _, keepalive_err = sock:setkeepalive(0)
if keepalive_err then
ngx.log(ngx.ERR, "rsyslog keepalive error: ", keepalive_err)
os.exit(1)
end
buffer_index = 0
buffer = table_new(buffer_size, 0)
end
local function process_minute(args, start, stop)
start = start:format(format_iso8601)
stop = stop:format(format_iso8601)
print("Fetching logs from " .. start .. " to " .. stop .. "...")
local saw_args = {
"saw",
"get",
args["group"],
"--start", start,
"--stop", stop,
"--filter", [[{ $.log = "*[rsyslog] {*" }]],
"--rawString",
}
if args["profile"] then
table.insert(saw_args, "--profile")
table.insert(saw_args, args["profile"])
end
local result, saw_err = shell_blocking_capture_combined(saw_args)
if saw_err then
ngx.log(ngx.ERR, saw_err)
os.exit(1)
end
local lines = split(result["output"], "[\r\n]+")
print("Processing logs from " .. start .. " to " .. stop .. " (" .. #lines .. " records)...")
for _, line in ipairs(lines) do
io.write(".")
local line_data = json_decode(line)
local match, match_err = ngx.re.match(line_data["log"], [[\[rsyslog\] (\{.+\})$]])
if match_err then
ngx.log(ngx.ERR, "regex error: ", match_err)
os.exit(1)
elseif not match then
ngx.log(ngx.ERR, "no matches found in line: ", line_data["log"])
else
local log_data = json_decode(match[1])
local syslog_message = log_utils.build_syslog_message(log_data)
buffer_index = buffer_index + 1
buffer[buffer_index] = syslog_message
end
if buffer_index >= buffer_size then
flush_buffer()
end
end
flush_buffer()
io.write("\n")
end
local function process(args)
local date = args["_start"]
while date:get_millis() <= args["_stop"]:get_millis() do
local next_minute = icu_date.new()
next_minute:set_millis(date:get_millis())
next_minute:add(icu_date.fields.MINUTE, 1)
process_minute(args, date, next_minute)
date = next_minute
end
end
local function run()
local args = parse_args()
process(args)
end
run()