forked from Yelp/paasta
-
Notifications
You must be signed in to change notification settings - Fork 0
/
firewall_logging.py
154 lines (118 loc) · 4.08 KB
/
firewall_logging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import argparse
import logging
import os
import signal
import socket
import socketserver
import sys
import syslogmp
from paasta_tools.firewall import services_running_here
from paasta_tools.utils import _log
from paasta_tools.utils import configure_log
from paasta_tools.utils import load_system_paasta_config
# Number of listener processes used when --num-workers is not given.
DEFAULT_NUM_WORKERS = 5
# Module-level logger named after this module.
log = logging.getLogger(__name__)
class SyslogUDPHandler(socketserver.BaseRequestHandler):
    """Request handler that relays each received syslog datagram to paasta logs."""

    def setup(self):
        """Configure paasta logging and remember the local cluster name."""
        configure_log()
        system_config = load_system_paasta_config()
        self.cluster = system_config.get_cluster()

    def handle(self):
        """Hand the datagram payload to ``syslog_to_paasta_log``."""
        # self.request unpacks to (payload, socket); only the payload is used
        payload, _sock = self.request
        syslog_to_paasta_log(payload, self.cluster)
def syslog_to_paasta_log(data, cluster):
    """Forward one iptables syslog datagram to the owning service's paasta log.

    The datagram is dropped when ``parse_syslog`` rejects it, when it carries
    no ``SRC`` field, or when the source IP does not match any service
    instance found by ``lookup_service_instance_by_ip``.

    :param data: raw syslog datagram, handed to ``parse_syslog``
    :param cluster: cluster name passed through to ``_log``
    """
    iptables_log = parse_syslog(data)
    if iptables_log is None:
        return

    # The fields are built from whatever tokens the message contained, so a
    # "SRC" key is not guaranteed; skip the message instead of raising KeyError.
    src_ip = iptables_log.get("SRC")
    if src_ip is None:
        return

    service, instance = lookup_service_instance_by_ip(src_ip)
    if service is None or instance is None:
        return

    # prepend hostname
    log_line = iptables_log["hostname"] + ": " + iptables_log["message"]
    _log(
        service=service,
        component="security",
        level="debug",
        cluster=cluster,
        instance=instance,
        line=log_line,
    )
def parse_syslog(data):
    """Parse a raw syslog datagram into a dict of iptables log fields.

    :param data: raw syslog datagram, passed to ``syslogmp.parse``
    :returns: ``None`` when the message body cannot be decoded, does not start
        with ``"kernel: ["``, has no closing ``]``, or does not look like an
        iptables log line. Otherwise a dict mapping each ``KEY=VALUE`` token
        after the log-prefix to its value (a bare ``KEY`` maps to ``""``),
        plus ``"hostname"`` (from the parsed syslog message) and ``"message"``
        (the stripped text following the first ``]``).
    """
    parsed_data = syslogmp.parse(data)
    try:
        full_message = parsed_data.message.decode()
    except UnicodeDecodeError:
        return None

    if not full_message.startswith("kernel: ["):
        # Not a kernel message
        return None

    close_bracket = full_message.find("]")
    if close_bracket == -1:
        return None

    iptables_message = full_message[close_bracket + 1 :].strip()
    parts = iptables_message.split(" ")
    # parts[0] is the log-prefix
    # parts[1..] is either KEY=VALUE or just KEY
    if len(parts) < 2 or not parts[1].startswith("IN="):
        # not an iptables message; the length check covers a message with
        # nothing after the log-prefix, which would otherwise raise IndexError
        return None

    fields = {k: v for k, _, v in (field.partition("=") for field in parts[1:])}
    fields["hostname"] = parsed_data.hostname
    fields["message"] = iptables_message
    return fields
def lookup_service_instance_by_ip(ip_lookup):
    """Find the service instance running here whose IP equals ``ip_lookup``.

    :param ip_lookup: IP address to compare against each entry yielded by
        ``services_running_here``
    :returns: ``(service, instance)`` for the first matching entry, or
        ``(None, None)`` (after logging at info level) when nothing matches
    """
    first_match = next(
        (
            (service, instance)
            for service, instance, _mac, ip in services_running_here()
            if ip == ip_lookup
        ),
        None,
    )
    if first_match is not None:
        return first_match

    log.info(f"Unable to find container for ip {ip_lookup}")
    return (None, None)
def parse_args(argv=None):
    """Parse the command-line options for the syslog adapter.

    :param argv: argument list to parse; ``None`` lets argparse read
        ``sys.argv``
    :returns: namespace with ``verbose``, ``listen_host``, ``listen_port``
        and ``num_workers``
    """
    default_help = "Default %(default)s"
    cli = argparse.ArgumentParser(
        description="Adapts iptables syslog messages into scribe"
    )
    cli.add_argument(
        "-v",
        "--verbose",
        dest="verbose",
        action="store_true",
        default=False,
    )
    cli.add_argument(
        "-l",
        "--listen-host",
        default="127.0.0.1",
        help=default_help,
    )
    cli.add_argument(
        "-p",
        "--listen-port",
        default=1516,
        type=int,
        help=default_help,
    )
    cli.add_argument(
        "-w",
        "--num-workers",
        default=DEFAULT_NUM_WORKERS,
        type=int,
        help=default_help,
    )
    return cli.parse_args(argv)
def setup_logging(verbose):
    """Call ``logging.basicConfig`` with DEBUG when ``verbose`` is truthy, else WARNING.

    :param verbose: truthy to request debug-level logging
    """
    level = logging.DEBUG if verbose else logging.WARNING
    logging.basicConfig(level=level)
class MultiUDPServer(socketserver.UDPServer):
    """UDPServer that sets SO_REUSEPORT on its socket before binding.

    With SO_REUSEPORT enabled, incoming packets are load-balanced across
    listeners.
    """

    def server_bind(self):
        """Set SO_REUSEPORT on the socket, then run the stock UDPServer bind."""
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        super().server_bind()
def run_server(listen_host, listen_port):
    """Create a ``MultiUDPServer`` on the given address and serve requests forever.

    :param listen_host: host/IP to bind
    :param listen_port: UDP port to bind
    """
    address = (listen_host, listen_port)
    MultiUDPServer(address, SyslogUDPHandler).serve_forever()
def main(argv=None):
    """Parse arguments, then run ``--num-workers`` listener processes.

    ``num_workers - 1`` child processes are forked and each calls
    ``run_server``; the current process installs a SIGTERM handler and then
    calls ``run_server`` itself.

    :param argv: argument list forwarded to ``parse_args``
    :raises AssertionError: if ``--num-workers`` is not a positive integer
    """
    args = parse_args(argv)
    setup_logging(args.verbose)

    # Explicit check instead of an ``assert`` statement so the validation is
    # not stripped under ``python -O``; the exception type is kept the same.
    if args.num_workers <= 0:
        raise AssertionError(
            f"--num-workers must be a positive integer, got {args.num_workers}"
        )

    # start n-1 separate processes, then run_server() on this one
    num_forks = args.num_workers - 1
    for _ in range(num_forks):
        if os.fork() == 0:
            # in the child: serve on the shared address
            run_server(args.listen_host, args.listen_port)

    # propagate SIGTERM to all my children then exit
    # NOTE(review): killpg(os.getpid(), ...) presumes this process is its
    # process-group leader - confirm against how the service is launched.
    signal.signal(
        signal.SIGTERM, lambda signum, _: os.killpg(os.getpid(), signum) or sys.exit(1)
    )
    run_server(args.listen_host, args.listen_port)