-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcheck_health_probes.py
131 lines (94 loc) · 4.21 KB
/
check_health_probes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import argparse
import datetime
import email
import logging
import os
import re
from email.message import Message
from imaplib import IMAP4, IMAP4_SSL
from types import ModuleType
from collections.abc import Sequence
from conf import constants, settings
logger = logging.getLogger()
CHECK_METHODS = ('dkim', 'spf', 'dmarc')
AuthHeaders = list[str]
AuthResults = dict[str, AuthHeaders]
def enrich_settings_from_env(settings: ModuleType) -> None:
params = ('IMAP_USERNAME', 'IMAP_PASSWORD')
for param in params:
setattr(settings, param, os.environ.get(param, getattr(settings, param)))
def failed_methods(from_address: str, auth_headers: AuthHeaders) -> set[str]:
methods: set[str] = set()
for header in auth_headers:
_, results = header.split(';', 1)
check_methods = '|'.join(CHECK_METHODS)
pattern = f'({check_methods})=(?!pass)'
for method in re.findall(pattern, results):
methods.add(method)
return methods
def headers_summary(message: Message) -> str:
summary = 'From: {sender}, Subject: {subject}'
return summary.format(sender=message['From'], subject=message['Subject'])
def imap_recent_health_messages(imap: IMAP4, current_date_time: datetime.datetime) -> tuple[list[Message], list[str]]:
rc, msgnums = imap.uid('search', '', 'UNSEEN')
if rc != 'OK':
raise Exception(f'Unexpected response while searching: {rc}')
messages: list[Message] = []
uids = msgnums[0].split()
for uid in uids:
rc, mail_data = imap.uid('fetch', uid, '(BODY[HEADER])')
if rc != 'OK':
raise Exception(f'Unexpected response while fetching message {uid}: {rc}')
message = email.message_from_bytes(mail_data[0][1])
if constants.HEADER_DATE_TIME not in message:
logger.info(f'Ignoring (no header): {headers_summary(message)}')
continue
sent_date_time = datetime.datetime.fromisoformat(message[constants.HEADER_DATE_TIME])
delta = current_date_time - sent_date_time
if delta > datetime.timedelta(seconds=settings.CHECK_ACCEPT_AGE_SECONDS):
logger.info(f'Ignoring (too old): {headers_summary(message)}')
continue
messages.append(message)
return messages, uids
def auth_results(messages: list[Message]) -> AuthResults:
headers: AuthResults = {}
for message in messages:
headers[message['From']] = message.get_all('Authentication-Results')
return headers
def log_failed_messages(auth_results: AuthResults, addresses: Sequence[str]) -> int:
address_failed = len(addresses)
for expected_address in addresses:
if expected_address not in auth_results:
logger.error(f'Did not find {expected_address} in emails, did something go wrong there?')
continue
methods = failed_methods(expected_address, auth_results[expected_address])
if methods:
logger.error(f'{expected_address} failed methods: {", ".join(methods)}')
headers = '\n'.join(auth_results[expected_address])
logger.error(f'Headers:\n{headers}')
continue
address_failed -= 1
return address_failed
def setup_logging(args: argparse.Namespace):
level = logging.WARNING
if args.verbose:
level = logging.DEBUG
logging.basicConfig(format='%(asctime)s %(levelname)8s: %(message)s', level=level)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--verbose', '-v', action='count', default=0)
args = parser.parse_args()
setup_logging(args)
enrich_settings_from_env(settings)
current_date_time = datetime.datetime.utcnow()
with IMAP4_SSL(host=settings.IMAP_HOSTNAME) as imap:
imap.login(settings.IMAP_USERNAME, settings.IMAP_PASSWORD)
imap.select(settings.IMAP_LIST_FOLDER)
messages, uids = imap_recent_health_messages(imap, current_date_time)
addresses_failed = log_failed_messages(auth_results(messages), settings.SMTP_FROM_ADDRESSES)
# If there were problems, we leave emails for debugging, otherwise we delete the successful ones
if not addresses_failed:
for uid in uids:
imap.uid('store', uid, '+FLAGS', '\\Deleted')
imap.expunge()
main()