forked from TURROKS/CVE_Prioritizer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcve_prioritizer.py
120 lines (105 loc) · 4.5 KB
/
cve_prioritizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python3
__author__ = "Mario Rojas"
__license__ = "BSD 3-clause"
__version__ = "1.3.0"
__maintainer__ = "Mario Rojas"
__status__ = "Production"
import argparse
import json
import os
import re
import threading
import time
from threading import Semaphore
from dotenv import load_dotenv
from scripts.constants import LOGO
from scripts.constants import SIMPLE_HEADER
from scripts.constants import VERBOSE_HEADER
from scripts.helpers import worker
from scripts.helpers import cve_trends
load_dotenv()
Throttle_msg = ""
# argparse setup
parser = argparse.ArgumentParser(description="CVE Prioritizer", epilog='Happy Patching',
usage='cve_prioritizer.py -c CVE-XXXX-XXXX')
parser.add_argument('-c', '--cve', type=str, help='Unique CVE-ID', required=False, metavar='')
parser.add_argument('-d', '--demo', help='Top 10 CVEs of the last 7days from cvetrends.com', action='store_true')
parser.add_argument('-e', '--epss', type=float, help='EPSS threshold (Default 0.2)', default=0.2, metavar='')
parser.add_argument('-f', '--file', type=argparse.FileType('r'), help='TXT file with CVEs (One per Line)',
required=False, metavar='')
parser.add_argument('-n', '--cvss', type=float, help='CVSS threshold (Default 6.0)', default=6.0, metavar='')
parser.add_argument('-o', '--output', type=str, help='Output filename', required=False, metavar='')
parser.add_argument('-t', '--threads', type=int, help='Number of concurrent threads', required=False, metavar='',
default=100)
parser.add_argument('-v', '--verbose', help='Verbose mode', action='store_true')
parser.add_argument('-l', '--list', help='Space separated list of CVEs', nargs='+', required=False, metavar='')
# Global Arguments
args = parser.parse_args()
if __name__ == '__main__':
# standard args
header = SIMPLE_HEADER
epss_threshold = args.epss
cvss_threshold = args.cvss
sem = Semaphore(args.threads)
# Temporal lists
cve_list = []
threads = []
if args.verbose:
header = VERBOSE_HEADER
if args.cve:
cve_list.append(args.cve)
# print(LOGO+header)
if not os.getenv('NIST_API'):
print(LOGO + 'Warning: Using this tool without specifying a NIST API may result in errors'
+ '\n\n' + header)
else:
print(LOGO + header)
elif args.list:
cve_list = args.list
if not os.getenv('NIST_API'):
if len(cve_list) > 75:
Throttle_msg = "Large number of CVEs detected, requests will be throttle to avoid API issues"
print(LOGO + Throttle_msg + '\n'
+ 'Warning: Using this tool without specifying a NIST API may result in errors' + '\n\n' + header)
else:
print(LOGO + header)
elif args.file:
cve_list = [line.rstrip() for line in args.file]
if not os.getenv('NIST_API'):
if len(cve_list) > 75:
Throttle_msg = "Large number of CVEs detected, requests will be throttle to avoid API issues"
print(LOGO + Throttle_msg + '\n'
+ 'Warning: Using this tool without specifying a NIST API may result in errors' + '\n\n' + header)
else:
print(LOGO + header)
elif args.demo:
try:
trends = cve_trends()
if trends:
cve_list = trends
if not os.getenv('NIST_API'):
print(
LOGO + 'Warning: Using this tool without specifying a NIST API may result in errors'
+ '\n\n' + header)
else:
print(LOGO + header)
except json.JSONDecodeError:
print(f"Unable to connect to CVE Trends")
if args.output:
with open(args.output, 'w') as output_file:
output_file.write("cve_id,priority,epss,cvss,cvss_version,cvss_severity,cisa_kev"+"\n")
for cve in cve_list:
throttle = 1
if len(cve_list) > 75 and not os.getenv('NIST_API'):
throttle = 6
if not re.match("(CVE|cve-\d{4}-\d+$)", cve):
print(f"{cve} Error: CVEs should be provided in the standard format CVE-0000-0000*")
else:
sem.acquire()
t = threading.Thread(target=worker, args=(cve.upper().strip(), cvss_threshold, epss_threshold, args.verbose,
sem, args.output))
threads.append(t)
t.start()
time.sleep(throttle)
for t in threads:
t.join()