diff --git a/README.md b/README.md
index c859cd4..0e61c6a 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,16 @@
# cloud_enum
+
Multi-cloud OSINT tool. Enumerate public resources in AWS, Azure, and Google Cloud.
Currently enumerates the following:
**Amazon Web Services**:
+
- Open / Protected S3 Buckets
- awsapps (WorkMail, WorkDocs, Connect, etc.)
**Microsoft Azure**:
+
- Storage Accounts
- Open Blob Storage Containers
- Hosted Databases
@@ -15,6 +18,7 @@ Currently enumerates the following:
- Web Apps
**Google Cloud Platform**:
+
- Open / Protected GCP Buckets
- Open / Protected Firebase Realtime Databases
- Google App Engine sites
@@ -25,10 +29,10 @@ See it in action in [Codingo](https://github.com/codingo)'s video demo [here](ht
-
# Usage
## Setup
+
Several non-standard libraries are required to support threaded HTTP requests and DNS lookups. You'll need to install the requirements as follows:
```sh
@@ -36,6 +40,7 @@ pip3 install -r ./requirements.txt
```
## Running
+
The only required argument is at least one keyword. You can use the built-in fuzzing strings, but you will get better results if you supply your own with `-m` and/or `-b`.
You can provide multiple keywords by specifying the `-k` argument multiple times.
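+
+For example (the keyword values and file path here are just placeholders):
+
+```sh
+./cloud_enum.py -k somecompany -k somecompany-dev -m /path/to/mutations.txt
+```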
@@ -57,6 +62,7 @@ HTTP scraping and DNS lookups use 5 threads each by default. You can try increas
**IMPORTANT**: Some resources (Azure Containers, GCP Functions) are discovered per-region. To save scanning time, there is a `REGIONS` variable defined in `cloudenum/azure_regions.py` and `cloudenum/gcp_regions.py` that is set by default to use only one region. You may want to look at these files and edit them to be relevant to your own work.
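+
+For example, to check a couple of specific Azure regions, the `REGIONS` variable in `cloudenum/azure_regions.py` might be edited like this (the region names below are only an illustration):
+
+```python
+# Regions to target for the per-region Azure checks
+REGIONS = ['eastus', 'westus2']
+```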
**Complete Usage Details**
+
```
usage: cloud_enum.py [-h] -k KEYWORD [-m MUTATIONS] [-b BRUTE]
@@ -77,7 +83,7 @@ optional arguments:
-ns NAMESERVER, --nameserver NAMESERVER
DNS server to use in brute-force.
-  -l LOGFILE, --logfile LOGFILE
-                        Will APPEND found items to specified file.
-  -f FORMAT, --format FORMAT
-                        Format for log file (text,json,csv - defaults to text)
+  -l LOG_LEVEL, --log-level LOG_LEVEL
+                        Log level (default: info)
--disable-aws Disable Amazon checks.
@@ -87,5 +93,7 @@ optional arguments:
```
# Thanks
+
So far, I have borrowed from:
+
- Some of the permutations from [GCPBucketBrute](https://github.com/RhinoSecurityLabs/GCPBucketBrute/blob/master/permutations.txt)
diff --git a/cloud_enum.py b/cloud_enum.py
index cb63f58..5f9ff69 100755
--- a/cloud_enum.py
+++ b/cloud_enum.py
@@ -13,18 +13,10 @@
import sys
import argparse
import re
-from enum_tools import aws_checks
-from enum_tools import azure_checks
-from enum_tools import gcp_checks
-from enum_tools import utils
-
-BANNER = '''
-##########################
- cloud_enum
- github.com/initstring
-##########################
-
-'''
+from enum_tools import aws_checks as aws
+from enum_tools import azure_checks as azure
+from enum_tools import gcp_checks as gcp
+from logger import logger
+
+# Module-level default logger so parse_arguments() can report errors
+# before main() reconfigures logging with the user-supplied level
+log = logger.Logger('INFO')
def parse_arguments():
@@ -47,32 +39,25 @@ def parse_arguments():
kw_group.add_argument('-kf', '--keyfile', type=str, action='store',
help='Input file with a single keyword per line.')
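+    # Log verbosity (replaces the removed --logfile and --format options)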
+    parser.add_argument('-l', '--log-level', type=str,
+                        action='store', default='info',
+                        help='Log level (default: info)')
+
# Use included mutations file by default, or let the user provide one
- parser.add_argument('-m', '--mutations', type=str, action='store',
- default=script_path + '/enum_tools/fuzz.txt',
- help='Mutations. Default: enum_tools/fuzz.txt')
+ parser.add_argument('-m', '--mutations', type=str, action='store', default=script_path +
+ '/enum_tools/fuzz.txt', help='Mutations. Default: enum_tools/fuzz.txt')
# Use include container brute-force or let the user provide one
- parser.add_argument('-b', '--brute', type=str, action='store',
- default=script_path + '/enum_tools/fuzz.txt',
- help='List to brute-force Azure container names.'
- ' Default: enum_tools/fuzz.txt')
+ parser.add_argument('-b', '--brute', type=str, action='store', default=script_path + '/enum_tools/fuzz.txt',
+ help='List to brute-force Azure container names. Default: enum_tools/fuzz.txt')
parser.add_argument('-t', '--threads', type=int, action='store',
- default=5, help='Threads for HTTP brute-force.'
- ' Default = 5')
+ default=5, help='Threads for HTTP brute-force. Default = 5')
parser.add_argument('-ns', '--nameserver', type=str, action='store',
- default='8.8.8.8',
- help='DNS server to use in brute-force.')
- parser.add_argument('-nsf', '--nameserverfile', type=str,
+ default='8.8.8.8', help='DNS server to use in brute-force.')
+
+ parser.add_argument('-nsf', '--nameserverfile', type=str,
help='Path to the file containing nameserver IPs')
- parser.add_argument('-l', '--logfile', type=str, action='store',
- help='Appends found items to specified file.')
- parser.add_argument('-f', '--format', type=str, action='store',
- default='text',
- help='Format for log file (text,json,csv)'
- ' - default: text')
parser.add_argument('--disable-aws', action='store_true',
help='Disable Amazon checks.')
@@ -86,51 +71,31 @@ def parse_arguments():
parser.add_argument('-qs', '--quickscan', action='store_true',
help='Disable all mutations and second-level scans')
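+    # Optional region override for the per-region checks (e.g. Azure VMs)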
+ parser.add_argument('-r', '--region', type=str,
+ action='store', help='Region to use for checks')
+
args = parser.parse_args()
# Ensure mutations file is readable
if not os.access(args.mutations, os.R_OK):
- print(f"[!] Cannot access mutations file: {args.mutations}")
+ log.new().error(f"Cannot access mutations file: {args.mutations}")
sys.exit()
# Ensure brute file is readable
if not os.access(args.brute, os.R_OK):
- print("[!] Cannot access brute-force file, exiting")
+ log.new().error("Cannot access brute-force file, exiting")
sys.exit()
# Ensure keywords file is readable
if args.keyfile:
if not os.access(args.keyfile, os.R_OK):
- print("[!] Cannot access keyword file, exiting")
+ log.new().error("Cannot access keyword file, exiting")
sys.exit()
# Parse keywords from input file
with open(args.keyfile, encoding='utf-8') as infile:
args.keyword = [keyword.strip() for keyword in infile]
- # Ensure log file is writeable
- if args.logfile:
- if os.path.isdir(args.logfile):
- print("[!] Can't specify a directory as the logfile, exiting.")
- sys.exit()
- if os.path.isfile(args.logfile):
- target = args.logfile
- else:
- target = os.path.dirname(args.logfile)
- if target == '':
- target = '.'
-
- if not os.access(target, os.W_OK):
- print("[!] Cannot write to log file, exiting")
- sys.exit()
-
- # Set up logging format
- if args.format not in ('text', 'json', 'csv'):
- print("[!] Sorry! Allowed log formats: 'text', 'json', or 'csv'")
- sys.exit()
- # Set the global in the utils file, where logging needs to happen
- utils.init_logfile(args.logfile, args.format)
-
return args
@@ -138,13 +103,12 @@ def print_status(args):
"""
Print a short pre-run status message
"""
- print(f"Keywords: {', '.join(args.keyword)}")
+ log.new().debug(f"Keywords: {', '.join(args.keyword)}")
if args.quickscan:
- print("Mutations: NONE! (Using quickscan)")
+ log.new().debug("Mutations: NONE! (Using quickscan)")
else:
- print(f"Mutations: {args.mutations}")
- print(f"Brute-list: {args.brute}")
- print("")
+ log.new().debug(f"Mutations: {args.mutations}")
+ log.new().debug(f"Brute-list: {args.brute}")
def check_windows():
@@ -157,8 +121,8 @@ def check_windows():
import colorama
colorama.init()
except ModuleNotFoundError:
- print("[!] Yo, Windows user - if you want pretty colors, you can"
- " install the colorama python package.")
+ log.new().debug("Yo, Windows user - if you want pretty colors, you can"
+ " install the colorama python package.")
def read_mutations(mutations_file):
@@ -168,7 +132,7 @@ def read_mutations(mutations_file):
with open(mutations_file, encoding="utf8", errors="ignore") as infile:
mutations = infile.read().splitlines()
- print(f"[+] Mutations list imported: {len(mutations)} items")
+ log.new().debug(f"Mutations list imported: {len(mutations)} items")
return mutations
@@ -218,10 +182,11 @@ def build_names(base_list, mutations):
append_name(f"{mutation}.{base}", names)
append_name(f"{mutation}-{base}", names)
- print(f"[+] Mutated results: {len(names)} items")
+ log.new().debug(f"Mutated results: {len(names)} items")
return names
+
def read_nameservers(file_path):
try:
with open(file_path, 'r') as file:
@@ -230,18 +195,22 @@ def read_nameservers(file_path):
raise ValueError("Nameserver file is empty")
return nameservers
except FileNotFoundError:
- print(f"Error: File '{file_path}' not found.")
+ log.new().error(f"Error: File '{file_path}' not found.")
exit(1)
except ValueError as e:
- print(e)
+ log.new().error(e)
exit(1)
+
def main():
"""
Main program function.
"""
args = parse_arguments()
- print(BANNER)
+
+ # Set up logging
+ global log
+ log = logger.Logger(args.log_level.upper())
# Generate a basic status on targets and parameters
print_status(args)
@@ -254,22 +223,23 @@ def main():
mutations = []
else:
mutations = read_mutations(args.mutations)
+
names = build_names(args.keyword, mutations)
# All the work is done in the individual modules
try:
if not args.disable_aws:
- aws_checks.run_all(names, args)
+ aws.AWSChecks(log, args, names).run_all()
if not args.disable_azure:
- azure_checks.run_all(names, args)
+ azure.AzureChecks(log, args, names).run_all()
if not args.disable_gcp:
- gcp_checks.run_all(names, args)
+ gcp.GCPChecks(log, args, names).run_all()
except KeyboardInterrupt:
- print("Thanks for playing!")
+ log.new().trace("Thanks for playing!")
sys.exit()
# Best of luck to you!
- print("\n[+] All done, happy hacking!\n")
+ log.new().trace("All done, happy hacking!")
sys.exit()
diff --git a/enum_tools/aws_checks.py b/enum_tools/aws_checks.py
index de8b10e..314b5b5 100644
--- a/enum_tools/aws_checks.py
+++ b/enum_tools/aws_checks.py
@@ -4,12 +4,7 @@
"""
from enum_tools import utils
-
-BANNER = '''
-++++++++++++++++++++++++++
- amazon checks
-++++++++++++++++++++++++++
-'''
+from logger import logger
# Known S3 domain names
S3_URL = 's3.amazonaws.com'
@@ -39,108 +34,113 @@
'sa-east-1.amazonaws.com']
-def print_s3_response(reply):
- """
- Parses the HTTP reply of a brute-force attempt
-
- This function is passed into the class object so we can view results
- in real-time.
- """
- data = {'platform': 'aws', 'msg': '', 'target': '', 'access': ''}
-
- if reply.status_code == 404:
- pass
- elif 'Bad Request' in reply.reason:
- pass
- elif reply.status_code == 200:
- data['msg'] = 'OPEN S3 BUCKET'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- utils.list_bucket_contents(reply.url)
- elif reply.status_code == 403:
- data['msg'] = 'Protected S3 Bucket'
- data['target'] = reply.url
- data['access'] = 'protected'
- utils.fmt_output(data)
- elif 'Slow Down' in reply.reason:
- print("[!] You've been rate limited, skipping rest of check...")
- return 'breakout'
- else:
- print(f" Unknown status codes being received from {reply.url}:\n"
- " {reply.status_code}: {reply.reason}")
-
- return None
-
-
-def check_s3_buckets(names, threads):
- """
- Checks for open and restricted Amazon S3 buckets
- """
- print("[+] Checking for S3 buckets")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of correctly formatted urls
- candidates = []
-
- # Take each mutated keyword craft a url with the correct format
- for name in names:
- candidates.append(f'{name}.{S3_URL}')
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(candidates, use_ssl=False,
- callback=print_s3_response,
- threads=threads)
-
- # Stop the time
- utils.stop_timer(start_time)
-
-
-def check_awsapps(names, threads, nameserver, nameserverfile=False):
- """
- Checks for existence of AWS Apps
- (ie. WorkDocs, WorkMail, Connect, etc.)
- """
- data = {'platform': 'aws', 'msg': 'AWS App Found:', 'target': '', 'access': ''}
-
- print("[+] Checking for AWS Apps")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of domain names to look up
- candidates = []
-
- # Initialize the list of valid hostnames
- valid_names = []
-
- # Take each mutated keyword craft a domain name to lookup.
- for name in names:
- candidates.append(f'{name}.{APPS_URL}')
-
- # AWS Apps use DNS sub-domains. First, see which are valid.
- valid_names = utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile, threads=threads)
-
- for name in valid_names:
- data['target'] = f'https://{name}'
- data['access'] = 'protected'
- utils.fmt_output(data)
-
- # Stop the timer
- utils.stop_timer(start_time)
-
-
-def run_all(names, args):
- """
- Function is called by main program
- """
- print(BANNER)
-
- # Use user-supplied AWS region if provided
- # if not regions:
- # regions = AWS_REGIONS
- check_s3_buckets(names, args.threads)
- check_awsapps(names, args.threads, args.nameserver, args.nameserverfile)
+class AWSChecks:
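+    """Amazon Web Services enumeration checks (S3 buckets and AWS Apps)"""
+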
+ def __init__(self, log: logger.Logger, args, names):
+ self.log = log
+ self.args = args
+ self.names = names
+
+ def print_s3_response(self, reply):
+ """
+ Parses the HTTP reply of a brute-force attempt
+
+ This function is passed into the class object so we can view results
+ in real-time.
+ """
+ data = {'platform': 'aws', 'target': '', 'access': '', 'key': ''}
+
+ if reply.status_code == 404:
+ pass
+ elif 'Bad Request' in reply.reason:
+ pass
+ elif reply.status_code == 200:
+ data['key'] = 'bucket_open'
+ data['target'] = reply.url
+ data['access'] = 'public'
+            self.log.new().extra(map=data).info('Open S3 Bucket')
+ utils.list_bucket_contents(self.log, reply.url)
+ elif reply.status_code == 403:
+ data['key'] = 'bucket_protected'
+ data['target'] = reply.url
+ data['access'] = 'protected'
+ self.log.new().extra(map=data).info('Protected S3 Bucket')
+ elif 'Slow Down' in reply.reason:
+ self.log.new().warning("Rate limited by AWS")
+ return 'breakout'
+ else:
+ self.log.new().extra("status_code", reply.status_code).extra(
+ "reason", reply.reason).warning(f"Unknown status code from: {reply.url}")
+
+ return None
+
+ def check_s3_buckets(self):
+ """
+ Checks for open and restricted Amazon S3 buckets
+ """
+ self.log.new().trace("Checking for S3 buckets")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of correctly formatted urls
+ candidates = []
+
+        # Take each mutated keyword and craft a url with the correct format
+ for name in self.names:
+ candidates.append(f'{name}.{S3_URL}')
+
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, candidates, use_ssl=False,
+ callback=self.print_s3_response,
+ threads=self.args.threads)
+
+        # Stop the timer
+ self.log.new().trace(
+ f"Checking for S3 buckets took {utils.stop_timer(start_time)}")
+
+ def check_awsapps(self):
+ """
+ Checks for existence of AWS Apps
+ (ie. WorkDocs, WorkMail, Connect, etc.)
+ """
+ data = {'platform': 'aws', 'target': '', 'access': '', 'key': ''}
+
+ self.log.new().trace("Checking for AWS Apps")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of domain names to look up
+ candidates = []
+
+ # Initialize the list of valid hostnames
+ valid_names = []
+
+        # Take each mutated keyword and craft a domain name to look up.
+ for name in self.names:
+ candidates.append(f'{name}.{APPS_URL}')
+
+ # AWS Apps use DNS sub-domains. First, see which are valid.
+ valid_names = utils.fast_dns_lookup(
+ self.log, candidates, self.args.nameserver, self.args.nameserverfile, threads=self.args.threads)
+
+ for name in valid_names:
+ data['key'] = 'aws_app'
+ data['target'] = f'https://{name}'
+ data['access'] = 'protected'
+ self.log.new().extra(map=data).info('AWS App Found')
+
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for AWS Apps took {utils.stop_timer(start_time)}")
+
+ def run_all(self):
+ """
+ Function is called by main program
+ """
+
+ # Use user-supplied AWS region if provided
+ # if not regions:
+ # regions = AWS_REGIONS
+ self.check_s3_buckets()
+ self.check_awsapps()
diff --git a/enum_tools/azure_checks.py b/enum_tools/azure_checks.py
index 87b325b..f10ac69 100644
--- a/enum_tools/azure_checks.py
+++ b/enum_tools/azure_checks.py
@@ -7,16 +7,11 @@
import requests
from enum_tools import utils
from enum_tools import azure_regions
-
-BANNER = '''
-++++++++++++++++++++++++++
- azure checks
-++++++++++++++++++++++++++
-'''
+from logger import logger
# Known Azure domain names
BLOB_URL = 'blob.core.windows.net'
-FILE_URL= 'file.core.windows.net'
+FILE_URL = 'file.core.windows.net'
QUEUE_URL = 'queue.core.windows.net'
TABLE_URL = 'table.core.windows.net'
MGMT_URL = 'scm.azurewebsites.net'
@@ -29,501 +24,503 @@
VM_URL = 'cloudapp.azure.com'
-def print_account_response(reply):
- """
- Parses the HTTP reply of a brute-force attempt
-
- This function is passed into the class object so we can view results
- in real-time.
- """
- data = {'platform': 'azure', 'msg': '', 'target': '', 'access': ''}
-
- if reply.status_code == 404 or 'The requested URI does not represent' in reply.reason:
- pass
- elif 'Server failed to authenticate the request' in reply.reason:
- data['msg'] = 'Auth-Only Account'
- data['target'] = reply.url
- data['access'] = 'protected'
- utils.fmt_output(data)
- elif 'The specified account is disabled' in reply.reason:
- data['msg'] = 'Disabled Account'
- data['target'] = reply.url
- data['access'] = 'disabled'
- utils.fmt_output(data)
- elif 'Value for one of the query' in reply.reason:
- data['msg'] = 'HTTP-OK Account'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- elif 'The account being accessed' in reply.reason:
- data['msg'] = 'HTTPS-Only Account'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- elif 'Unauthorized' in reply.reason:
- data['msg'] = 'Unathorized Account'
- data['target'] = reply.url
- data['access'] = 'public'
- else:
- print(" Unknown status codes being received from " + reply.url +":\n"
- " "+ str(reply.status_code)+" : "+ reply.reason)
-
-def check_storage_accounts(names, threads, nameserver, nameserverfile=False):
- """
- Checks storage account names
- """
- print("[+] Checking for Azure Storage Accounts")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of domain names to look up
- candidates = []
-
- # Initialize the list of valid hostnames
- valid_names = []
-
- # Take each mutated keyword craft a domain name to lookup.
- # As Azure Storage Accounts can contain only letters and numbers,
- # discard those not matching to save time on the DNS lookups.
- regex = re.compile('[^a-zA-Z0-9]')
- for name in names:
- if not re.search(regex, name):
- candidates.append(f'{name}.{BLOB_URL}')
-
- # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
- valid_names = utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile, threads=threads)
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(valid_names, use_ssl=False,
- callback=print_account_response,
- threads=threads)
-
- # Stop the timer
- utils.stop_timer(start_time)
-
- # de-dupe the results and return
- return list(set(valid_names))
-
-def check_file_accounts(names, threads, nameserver, nameserverfile=False):
- """
- Checks File account names
- """
- print("[+] Checking for Azure File Accounts")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of domain names to look up
- candidates = []
-
- # Initialize the list of valid hostnames
- valid_names = []
-
- # Take each mutated keyword craft a domain name to lookup.
- # As Azure Storage Accounts can contain only letters and numbers,
- # discard those not matching to save time on the DNS lookups.
- regex = re.compile('[^a-zA-Z0-9]')
- for name in names:
- if not re.search(regex, name):
- candidates.append(f'{name}.{FILE_URL}')
-
- # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
- valid_names = utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile, threads=threads)
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(valid_names, use_ssl=False,
- callback=print_account_response,
- threads=threads)
-
- # Stop the timer
- utils.stop_timer(start_time)
-
- # de-dupe the results and return
- return list(set(valid_names))
-
-def check_queue_accounts(names, threads, nameserver, nameserverfile=False):
- """
- Checks Queue account names
- """
- print("[+] Checking for Azure Queue Accounts")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of domain names to look up
- candidates = []
-
- # Initialize the list of valid hostnames
- valid_names = []
-
- # Take each mutated keyword craft a domain name to lookup.
- # As Azure Storage Accounts can contain only letters and numbers,
- # discard those not matching to save time on the DNS lookups.
- regex = re.compile('[^a-zA-Z0-9]')
- for name in names:
- if not re.search(regex, name):
- candidates.append(f'{name}.{QUEUE_URL}')
-
- # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
- valid_names = utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile, threads=threads)
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(valid_names, use_ssl=False,
- callback=print_account_response,
- threads=threads)
-
- # Stop the timer
- utils.stop_timer(start_time)
-
- # de-dupe the results and return
- return list(set(valid_names))
-
-def check_table_accounts(names, threads, nameserver, nameserverfile=False):
- """
- Checks Table account names
- """
- print("[+] Checking for Azure Table Accounts")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of domain names to look up
- candidates = []
-
- # Initialize the list of valid hostnames
- valid_names = []
-
- # Take each mutated keyword craft a domain name to lookup.
- # As Azure Storage Accounts can contain only letters and numbers,
- # discard those not matching to save time on the DNS lookups.
- regex = re.compile('[^a-zA-Z0-9]')
- for name in names:
- if not re.search(regex, name):
- candidates.append(f'{name}.{TABLE_URL}')
-
- # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
- valid_names = utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile, threads=threads)
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(valid_names, use_ssl=False,
- callback=print_account_response,
- threads=threads)
-
- # Stop the timer
- utils.stop_timer(start_time)
-
- # de-dupe the results and return
- return list(set(valid_names))
-
-def check_mgmt_accounts(names, threads, nameserver, nameserverfile=False):
- """
- Checks App Management account names
- """
- print("[+] Checking for Azure App Management Accounts")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of domain names to look up
- candidates = []
-
- # Initialize the list of valid hostnames
- valid_names = []
-
- # Take each mutated keyword craft a domain name to lookup.
- # As Azure Storage Accounts can contain only letters and numbers,
- # discard those not matching to save time on the DNS lookups.
- regex = re.compile('[^a-zA-Z0-9]')
- for name in names:
- if not re.search(regex, name):
- candidates.append(f'{name}.{MGMT_URL}')
-
- # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
- valid_names = utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile, threads=threads)
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(valid_names, use_ssl=False,
- callback=print_account_response,
- threads=threads)
-
- # Stop the timer
- utils.stop_timer(start_time)
-
- # de-dupe the results and return
- return list(set(valid_names))
-
-def check_vault_accounts(names, threads, nameserver, nameserverfile=False):
- """
- Checks Key Vault account names
- """
- print("[+] Checking for Azure Key Vault Accounts")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of domain names to look up
- candidates = []
-
- # Initialize the list of valid hostnames
- valid_names = []
-
- # Take each mutated keyword craft a domain name to lookup.
- # As Azure Storage Accounts can contain only letters and numbers,
- # discard those not matching to save time on the DNS lookups.
- regex = re.compile('[^a-zA-Z0-9]')
- for name in names:
- if not re.search(regex, name):
- candidates.append(f'{name}.{VAULT_URL}')
-
- # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
- valid_names = utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile, threads=threads)
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(valid_names, use_ssl=False,
- callback=print_account_response,
- threads=threads)
-
- # Stop the timer
- utils.stop_timer(start_time)
-
- # de-dupe the results and return
- return list(set(valid_names))
-
-
-def print_container_response(reply):
- """
- Parses the HTTP reply of a brute-force attempt
-
- This function is passed into the class object so we can view results
- in real-time.
- """
- data = {'platform': 'azure', 'msg': '', 'target': '', 'access': ''}
-
- # Stop brute forcing disabled accounts
- if 'The specified account is disabled' in reply.reason:
- print(" [!] Breaking out early, account disabled.")
- return 'breakout'
-
- # Stop brute forcing accounts without permission
- if ('not authorized to perform this operation' in reply.reason or
- 'not have sufficient permissions' in reply.reason or
- 'Public access is not permitted' in reply.reason or
- 'Server failed to authenticate the request' in reply.reason):
- print(" [!] Breaking out early, auth required.")
- return 'breakout'
-
- # Stop brute forcing unsupported accounts
- if 'Blob API is not yet supported' in reply.reason:
- print(" [!] Breaking out early, Hierarchical namespace account")
- return 'breakout'
-
- # Handle other responses
- if reply.status_code == 404:
- pass
- elif reply.status_code == 200:
- data['msg'] = 'OPEN AZURE CONTAINER'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- utils.list_bucket_contents(reply.url)
- elif 'One of the request inputs is out of range' in reply.reason:
- pass
- elif 'The request URI is invalid' in reply.reason:
- pass
- else:
- print(f" Unknown status codes being received from {reply.url}:\n"
- " {reply.status_code}: {reply.reason}")
-
- return None
-
-
-def brute_force_containers(storage_accounts, brute_list, threads):
- """
- Attempts to find public Blob Containers in valid Storage Accounts
-
- Here is the URL format to list Azure Blog Container contents:
- .blob.core.windows.net//?restype=container&comp=list
- """
-
- # We have a list of valid DNS names that might not be worth scraping,
- # such as disabled accounts or authentication required. Let's quickly
- # weed those out.
- print(f"[*] Checking {len(storage_accounts)} accounts for status before brute-forcing")
- valid_accounts = []
- for account in storage_accounts:
- try:
- reply = requests.get(f'https://{account}/')
- if 'Server failed to authenticate the request' in reply.reason:
- storage_accounts.remove(account)
- elif 'The specified account is disabled' in reply.reason:
- storage_accounts.remove(account)
- else:
- valid_accounts.append(account)
- except requests.exceptions.ConnectionError as error_msg:
- print(f" [!] Connection error on https://{account}:")
- print(error_msg)
-
- # Read the brute force file into memory
- clean_names = utils.get_brute(brute_list, mini=3)
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- print(f"[*] Brute-forcing container names in {len(valid_accounts)} storage accounts")
- for account in valid_accounts:
- print(f"[*] Brute-forcing {len(clean_names)} container names in {account}")
-
- # Initialize the list of correctly formatted urls
+class AzureChecks:
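+    """Azure enumeration checks (storage accounts, blob containers, websites, databases, and VMs)"""
+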
+ def __init__(self, log: logger.Logger, args, names):
+ self.log = log
+ self.args = args
+ self.names = names
+
+ def print_account_response(self, reply):
+ """
+ Parses the HTTP reply of a brute-force attempt
+
+ This function is passed into the class object so we can view results
+ in real-time.
+ """
+ data = {'platform': 'azure', 'target': '', 'access': '', 'key': ''}
+
+ if reply.status_code == 404 or 'The requested URI does not represent' in reply.reason:
+ pass
+ elif 'Server failed to authenticate the request' in reply.reason:
+ data['key'] = 'account_auth'
+ data['target'] = reply.url
+ data['access'] = 'protected'
+ self.log.new().extra(map=data).info("Azure Auth-Only Account")
+ elif 'The specified account is disabled' in reply.reason:
+ data['key'] = 'account_disabled'
+ data['target'] = reply.url
+ data['access'] = 'disabled'
+ self.log.new().extra(map=data).info("Azure Disabled Account")
+ elif 'Value for one of the query' in reply.reason:
+ data['key'] = 'account_http_ok'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info("Azure HTTP-OK Account")
+ elif 'The account being accessed' in reply.reason:
+ data['key'] = 'account_https_only'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info("Azure HTTPS-Only Account")
+ elif 'Unauthorized' in reply.reason:
+ data['key'] = 'account_unauthorized'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).debug("Azure Unauthorized Account")
+ else:
+ self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
+ f"Unknown status code from: {reply.url}")
+
+ def check_storage_accounts(self):
+ """
+ Checks storage account names
+ """
+ self.log.new().trace("Checking for Azure Storage Accounts")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of domain names to look up
+ candidates = []
+
+ # Initialize the list of valid hostnames
+ valid_names = []
+
+        # Take each mutated keyword and craft a domain name to look up.
+ # As Azure Storage Accounts can contain only letters and numbers,
+ # discard those not matching to save time on the DNS lookups.
+ regex = re.compile('[^a-zA-Z0-9]')
+ for name in self.names:
+ if not re.search(regex, name):
+ candidates.append(f'{name}.{BLOB_URL}')
+
+ # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
+ valid_names = utils.fast_dns_lookup(
+ self.log, candidates, self.args.nameserver, self.args.nameserverfile, threads=self.args.threads)
+
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, valid_names, use_ssl=False,
+ callback=self.print_account_response, threads=self.args.threads)
+
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for Azure Storage Accounts took {utils.stop_timer(start_time)}")
+
+ # de-dupe the results and return
+ return list(set(valid_names))
+
+ def check_file_accounts(self):
+ """
+ Checks File account names
+ """
+ self.log.new().trace("Checking for Azure File Accounts")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of domain names to look up
+ candidates = []
+
+ # Initialize the list of valid hostnames
+ valid_names = []
+
+        # Take each mutated keyword and craft a domain name to look up.
+ # As Azure Storage Accounts can contain only letters and numbers,
+ # discard those not matching to save time on the DNS lookups.
+ regex = re.compile('[^a-zA-Z0-9]')
+ for name in self.names:
+ if not re.search(regex, name):
+ candidates.append(f'{name}.{FILE_URL}')
+
+ # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
+ valid_names = utils.fast_dns_lookup(
+ self.log, candidates, self.args.nameserver, self.args.nameserverfile, threads=self.args.threads)
+
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, valid_names, use_ssl=False,
+ callback=self.print_account_response, threads=self.args.threads)
+
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for Azure File Accounts took {utils.stop_timer(start_time)}")
+
+ # de-dupe the results and return
+ return list(set(valid_names))
+
+ def check_queue_accounts(self):
+ """
+ Checks Queue account names
+ """
+ self.log.new().trace("Checking for Azure Queue Accounts")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of domain names to look up
candidates = []
- # Take each mutated keyword and craft a url with correct format
- for name in clean_names:
- candidates.append(f'{account}/{name}/?restype=container&comp=list')
+ # Initialize the list of valid hostnames
+ valid_names = []
+
+        # Take each mutated keyword and craft a domain name to look up.
+ # As Azure Storage Accounts can contain only letters and numbers,
+ # discard those not matching to save time on the DNS lookups.
+ regex = re.compile('[^a-zA-Z0-9]')
+ for name in self.names:
+ if not re.search(regex, name):
+ candidates.append(f'{name}.{QUEUE_URL}')
+
+ # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
+ valid_names = utils.fast_dns_lookup(
+ self.log, candidates, self.args.nameserver, self.args.nameserverfile, threads=self.args.threads)
# Send the valid names to the batch HTTP processor
- utils.get_url_batch(candidates, use_ssl=True,
- callback=print_container_response,
- threads=threads)
+ utils.get_url_batch(self.log, valid_names, use_ssl=False,
+ callback=self.print_account_response, threads=self.args.threads)
- # Stop the timer
- utils.stop_timer(start_time)
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for Azure Queue Accounts took {utils.stop_timer(start_time)}")
+ # de-dupe the results and return
+ return list(set(valid_names))
-def print_website_response(hostname):
- """
- This function is passed into the DNS brute force as a callback,
- so we can get real-time results.
- """
- data = {'platform': 'azure', 'msg': '', 'target': '', 'access': ''}
+ def check_table_accounts(self):
+ """
+ Checks Table account names
+ """
+ self.log.new().trace("Checking for Azure Table Accounts")
- data['msg'] = 'Registered Azure Website DNS Name'
- data['target'] = hostname
- data['access'] = 'public'
- utils.fmt_output(data)
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+ # Initialize the list of domain names to look up
+ candidates = []
-def check_azure_websites(names, nameserver, threads, nameserverfile=False):
- """
- Checks for Azure Websites (PaaS)
- """
- print("[+] Checking for Azure Websites")
+ # Initialize the list of valid hostnames
+ valid_names = []
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
+        # Take each mutated keyword and craft a domain name to look up.
+ # As Azure Storage Accounts can contain only letters and numbers,
+ # discard those not matching to save time on the DNS lookups.
+ regex = re.compile('[^a-zA-Z0-9]')
+ for name in self.names:
+ if not re.search(regex, name):
+ candidates.append(f'{name}.{TABLE_URL}')
- # Initialize the list of domain names to look up
- candidates = [name + '.' + WEBAPP_URL for name in names]
+ # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
+ valid_names = utils.fast_dns_lookup(
+ self.log, candidates, self.args.nameserver, self.args.nameserverfile, threads=self.args.threads)
- # Azure Websites use DNS sub-domains. If it resolves, it is registered.
- utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile,
- callback=print_website_response,
- threads=threads)
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, valid_names, use_ssl=False,
+ callback=self.print_account_response, threads=self.args.threads)
- # Stop the timer
- utils.stop_timer(start_time)
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for Azure Table Accounts took {utils.stop_timer(start_time)}")
+ # de-dupe the results and return
+ return list(set(valid_names))
-def print_database_response(hostname):
- """
- This function is passed into the DNS brute force as a callback,
- so we can get real-time results.
- """
- data = {'platform': 'azure', 'msg': '', 'target': '', 'access': ''}
+ def check_mgmt_accounts(self):
+ """
+ Checks App Management account names
+ """
+ self.log.new().trace("Checking for Azure App Management Accounts")
- data['msg'] = 'Registered Azure Database DNS Name'
- data['target'] = hostname
- data['access'] = 'public'
- utils.fmt_output(data)
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+ # Initialize the list of domain names to look up
+ candidates = []
-def check_azure_databases(names, nameserver, threads, nameserverfile=False):
- """
- Checks for Azure Databases
- """
- print("[+] Checking for Azure Databases")
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
+ # Initialize the list of valid hostnames
+ valid_names = []
- # Initialize the list of domain names to look up
- candidates = [name + '.' + DATABASE_URL for name in names]
+        # Take each mutated keyword and craft a domain name to look up.
+ # As Azure Storage Accounts can contain only letters and numbers,
+ # discard those not matching to save time on the DNS lookups.
+ regex = re.compile('[^a-zA-Z0-9]')
+ for name in self.names:
+ if not re.search(regex, name):
+ candidates.append(f'{name}.{MGMT_URL}')
- # Azure databases use DNS sub-domains. If it resolves, it is registered.
- utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile,
- callback=print_database_response,
- threads=threads)
+ # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
+ valid_names = utils.fast_dns_lookup(
+ self.log, candidates, self.args.nameserver, self.args.nameserverfile, threads=self.args.threads)
- # Stop the timer
- utils.stop_timer(start_time)
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, valid_names, use_ssl=False,
+ callback=self.print_account_response, threads=self.args.threads)
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for Azure App Management Accounts took {utils.stop_timer(start_time)}")
-def print_vm_response(hostname):
- """
- This function is passed into the DNS brute force as a callback,
- so we can get real-time results.
- """
- data = {'platform': 'azure', 'msg': '', 'target': '', 'access': ''}
+ # de-dupe the results and return
+ return list(set(valid_names))
- data['msg'] = 'Registered Azure Virtual Machine DNS Name'
- data['target'] = hostname
- data['access'] = 'public'
- utils.fmt_output(data)
+ def check_vault_accounts(self):
+ """
+ Checks Key Vault account names
+ """
+ self.log.new().trace("Checking for Azure Key Vault Accounts")
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
-def check_azure_vms(names, nameserver, threads, nameserverfile=False):
- """
- Checks for Azure Virtual Machines
- """
- print("[+] Checking for Azure Virtual Machines")
+ # Initialize the list of domain names to look up
+ candidates = []
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
+ # Initialize the list of valid hostnames
+ valid_names = []
- # Pull the regions from a config file
- regions = azure_regions.REGIONS
+        # Take each mutated keyword and craft a domain name to look up.
+ # As Azure Storage Accounts can contain only letters and numbers,
+ # discard those not matching to save time on the DNS lookups.
+ regex = re.compile('[^a-zA-Z0-9]')
+ for name in self.names:
+ if not re.search(regex, name):
+ candidates.append(f'{name}.{VAULT_URL}')
- print(f"[*] Testing across {len(regions)} regions defined in the config file")
+ # Azure Storage Accounts use DNS sub-domains. First, see which are valid.
+ valid_names = utils.fast_dns_lookup(
+ self.log, candidates, self.args.nameserver, self.args.nameserverfile, threads=self.args.threads)
+
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, valid_names, use_ssl=False,
+ callback=self.print_account_response, threads=self.args.threads)
+
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for Azure Key Vault Accounts took {utils.stop_timer(start_time)}")
+
+ # de-dupe the results and return
+ return list(set(valid_names))
+
+ def print_container_response(self, reply):
+ """
+ Parses the HTTP reply of a brute-force attempt
+
+ This function is passed into the class object so we can view results
+ in real-time.
+ """
+ data = {'platform': 'azure', 'target': '', 'access': '', 'key': ''}
+
+ # Stop brute forcing disabled accounts
+ if 'The specified account is disabled' in reply.reason:
+ self.log.new().trace("Azure account disabled, breaking out early")
+ return 'breakout'
+
+ # Stop brute forcing accounts without permission
+ if ('not authorized to perform this operation' in reply.reason or
+ 'not have sufficient permissions' in reply.reason or
+ 'Public access is not permitted' in reply.reason or
+ 'Server failed to authenticate the request' in reply.reason):
+ self.log.new().trace("Azure account requires auth, breaking out early")
+ return 'breakout'
+
+ # Stop brute forcing unsupported accounts
+ if 'Blob API is not yet supported' in reply.reason:
+ self.log.new().trace("Azure account is Hierarchical namespace, breaking out early")
+ return 'breakout'
+
+ # Handle other responses
+ if reply.status_code == 404:
+ pass
+ elif reply.status_code == 200:
+ data['key'] = 'container_open'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info('Open Azure Container')
+ utils.list_bucket_contents(self.log, reply.url)
+ elif 'One of the request inputs is out of range' in reply.reason:
+ pass
+ elif 'The request URI is invalid' in reply.reason:
+ pass
+ else:
+ self.log.new().extra("status_code", reply.status_code).extra(
+ "reason", reply.reason).warning(f"Unknown status code from: {reply.url}")
+
+ return None
+
+ def brute_force_containers(self, storage_accounts: list):
+ """
+ Attempts to find public Blob Containers in valid Storage Accounts
+
+        Here is the URL format to list Azure Blob Container contents:
+        <account>.blob.core.windows.net/<container>/?restype=container&comp=list
+ """
+
+ # We have a list of valid DNS names that might not be worth scraping,
+ # such as disabled accounts or authentication required. Let's quickly
+ # weed those out.
+ self.log.new().trace(
+ f"Checking {len(storage_accounts)} accounts for status before brute-forcing")
+ valid_accounts = []
+ for account in storage_accounts:
+ try:
+ reply = requests.get(f'https://{account}/')
+                # Don't mutate storage_accounts while iterating over it
+                # (list.remove() during iteration skips elements); just
+                # skip accounts that are disabled or require auth
+                if ('Server failed to authenticate the request' in reply.reason
+                        or 'The specified account is disabled' in reply.reason):
+                    continue
+                valid_accounts.append(account)
+ except requests.exceptions.ConnectionError as error_msg:
+ self.log.new().warning(
+ f"Connection error on https://{account}: {error_msg}")
+
+ # Read the brute force file into memory
+ clean_names = utils.get_brute(self.args.brute, mini=3)
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ self.log.new().trace(
+ f"Brute-forcing container names in {len(valid_accounts)} storage accounts")
+ for account in valid_accounts:
+ self.log.new().trace(
+ f"Brute-forcing {len(clean_names)} container names in {account}")
+
+ # Initialize the list of correctly formatted urls
+ candidates = []
+
+ # Take each mutated keyword and craft a url with correct format
+ for name in clean_names:
+ candidates.append(
+ f'{account}/{name}/?restype=container&comp=list')
+
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log,
+ candidates, use_ssl=True, callback=self.print_container_response, threads=self.args.threads)
+
+ # Stop the timer
+ self.log.new().trace(
+ f"Brute-forcing Azure Containers took {utils.stop_timer(start_time)}")
+
+ def print_website_response(self, hostname):
+ """
+ This function is passed into the DNS brute force as a callback,
+ so we can get real-time results.
+ """
+ data = {'platform': 'azure', 'target': '', 'access': '', 'key': ''}
+
+ data['key'] = 'registered_website_dns'
+ data['target'] = hostname
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info('Registered Azure Website DNS Name')
- for region in regions:
+ def check_azure_websites(self):
+ """
+ Checks for Azure Websites (PaaS)
+ """
+ self.log.new().trace("Checking for Azure Websites")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of domain names to look up
+ candidates = [name + '.' + WEBAPP_URL for name in self.names]
+
+ # Azure Websites use DNS sub-domains. If it resolves, it is registered.
+ utils.fast_dns_lookup(self.log, candidates, self.args.nameserver, self.args.nameserverfile,
+ callback=self.print_website_response, threads=self.args.threads)
+
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for Azure Websites took {utils.stop_timer(start_time)}")
+
+ def print_database_response(self, hostname):
+ """
+ This function is passed into the DNS brute force as a callback,
+ so we can get real-time results.
+ """
+ data = {'platform': 'azure', 'target': '', 'access': '', 'key': ''}
+
+ data['key'] = 'registered_database_dns'
+ data['target'] = hostname
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info('Registered Azure Database DNS Name')
+
+ def check_azure_databases(self):
+ """
+ Checks for Azure Databases
+ """
+ self.log.new().trace("Checking for Azure Databases")
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
# Initialize the list of domain names to look up
- candidates = [name + '.' + region + '.' + VM_URL for name in names]
-
- # Azure VMs use DNS sub-domains. If it resolves, it is registered.
- utils.fast_dns_lookup(candidates, nameserver,
- nameserverfile,
- callback=print_vm_response,
- threads=threads)
-
- # Stop the timer
- utils.stop_timer(start_time)
-
-
-def run_all(names, args):
- """
- Function is called by main program
- """
- print(BANNER)
-
- valid_accounts = check_storage_accounts(names, args.threads,
- args.nameserver, args.nameserverfile)
- if valid_accounts and not args.quickscan:
- brute_force_containers(valid_accounts, args.brute, args.threads)
-
- check_file_accounts(names, args.threads, args.nameserver, args.nameserverfile)
- check_queue_accounts(names, args.threads, args.nameserver, args.nameserverfile)
- check_table_accounts(names, args.threads, args.nameserver, args.nameserverfile)
- check_mgmt_accounts(names, args.threads, args.nameserver, args.nameserverfile)
- check_vault_accounts(names, args.threads, args.nameserver, args.nameserverfile)
-
- check_azure_websites(names, args.nameserver, args.threads, args.nameserverfile)
- check_azure_databases(names, args.nameserver, args.threads, args.nameserverfile)
- check_azure_vms(names, args.nameserver, args.threads, args.nameserverfile)
+ candidates = [name + '.' + DATABASE_URL for name in self.names]
+
+ # Azure databases use DNS sub-domains. If it resolves, it is registered.
+ utils.fast_dns_lookup(self.log, candidates, self.args.nameserver, self.args.nameserverfile,
+ callback=self.print_database_response, threads=self.args.threads)
+
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for Azure Databases took {utils.stop_timer(start_time)}")
+
+ def print_vm_response(self, hostname):
+ """
+ This function is passed into the DNS brute force as a callback,
+ so we can get real-time results.
+ """
+ data = {'platform': 'azure', 'target': '', 'access': '', 'key': ''}
+
+ data['key'] = 'registered_vm_dns'
+ data['target'] = hostname
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info('Registered Azure Virtual Machine DNS Name')
+
+ def check_azure_vms(self):
+ """
+ Checks for Azure Virtual Machines
+ """
+ self.log.new().trace("Checking for Azure Virtual Machines")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Pull the regions from a config file
+ regions = azure_regions.REGIONS
+
+ # If a region is specified, use that instead
+ if self.args.region:
+ regions = [self.args.region]
+
+ self.log.new().trace(
+ f"Testing across {len(regions)} regions defined in the config file or command line")
+
+ for region in regions:
+ # Initialize the list of domain names to look up
+ candidates = [name + '.' + region +
+ '.' + VM_URL for name in self.names]
+
+ # Azure VMs use DNS sub-domains. If it resolves, it is registered.
+ utils.fast_dns_lookup(self.log, candidates, self.args.nameserver, self.args.nameserverfile,
+ callback=self.print_vm_response, threads=self.args.threads)
+
+ # Stop the timer
+ self.log.new().trace(
+ f"Checking for Azure Virtual Machines took {utils.stop_timer(start_time)}")
+
+ def run_all(self):
+ """
+ Function is called by main program
+ """
+
+ valid_accounts = self.check_storage_accounts()
+ if valid_accounts and not self.args.quickscan:
+            self.brute_force_containers(valid_accounts)
+
+ self.check_file_accounts()
+ self.check_queue_accounts()
+ self.check_table_accounts()
+ self.check_mgmt_accounts()
+ self.check_vault_accounts()
+
+ self.check_azure_websites()
+ self.check_azure_databases()
+ self.check_azure_vms()
diff --git a/enum_tools/gcp_checks.py b/enum_tools/gcp_checks.py
index b31c2ca..0ab48e5 100644
--- a/enum_tools/gcp_checks.py
+++ b/enum_tools/gcp_checks.py
@@ -5,12 +5,7 @@
from enum_tools import utils
from enum_tools import gcp_regions
-
-BANNER = '''
-++++++++++++++++++++++++++
- google checks
-++++++++++++++++++++++++++
-'''
+from logger import logger
# Known GCP domain names
GCP_URL = 'storage.googleapis.com'
@@ -24,367 +19,367 @@
HAS_FUNCS = []
-def print_bucket_response(reply):
- """
- Parses the HTTP reply of a brute-force attempt
-
- This function is passed into the class object so we can view results
- in real-time.
- """
- data = {'platform': 'gcp', 'msg': '', 'target': '', 'access': ''}
-
- if reply.status_code == 404:
- pass
- elif reply.status_code == 200:
- data['msg'] = 'OPEN GOOGLE BUCKET'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- utils.list_bucket_contents(reply.url + '/')
- elif reply.status_code == 403:
- data['msg'] = 'Protected Google Bucket'
- data['target'] = reply.url
- data['access'] = 'protected'
- utils.fmt_output(data)
- else:
- print(f" Unknown status codes being received from {reply.url}:\n"
- " {reply.status_code}: {reply.reason}")
-
-
-def check_gcp_buckets(names, threads):
- """
- Checks for open and restricted Google Cloud buckets
- """
- print("[+] Checking for Google buckets")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of correctly formatted urls
- candidates = []
-
- # Take each mutated keyword craft a url with the correct format
- for name in names:
- candidates.append(f'{GCP_URL}/{name}')
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(candidates, use_ssl=False,
- callback=print_bucket_response,
- threads=threads)
-
- # Stop the time
- utils.stop_timer(start_time)
-
-
-def print_fbrtdb_response(reply):
- """
- Parses the HTTP reply of a brute-force attempt
-
- This function is passed into the class object so we can view results
- in real-time.
- """
- data = {'platform': 'gcp', 'msg': '', 'target': '', 'access': ''}
-
- if reply.status_code == 404:
- pass
- elif reply.status_code == 200:
- data['msg'] = 'OPEN GOOGLE FIREBASE RTDB'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- elif reply.status_code == 401:
- data['msg'] = 'Protected Google Firebase RTDB'
- data['target'] = reply.url
- data['access'] = 'protected'
- utils.fmt_output(data)
- elif reply.status_code == 402:
- data['msg'] = 'Payment required on Google Firebase RTDB'
- data['target'] = reply.url
- data['access'] = 'disabled'
- utils.fmt_output(data)
- elif reply.status_code == 423:
- data['msg'] = 'The Firebase database has been deactivated.'
- data['target'] = reply.url
- data['access'] = 'disabled'
- utils.fmt_output(data)
- else:
- print(f" Unknown status codes being received from {reply.url}:\n"
- " {reply.status_code}: {reply.reason}")
-
-
-def check_fbrtdb(names, threads):
- """
- Checks for Google Firebase RTDB
- """
- print("[+] Checking for Google Firebase Realtime Databases")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of correctly formatted urls
- candidates = []
-
- # Take each mutated keyword craft a url with the correct format
- for name in names:
- # Firebase RTDB names cannot include a period. We'll exlcude
- # those from the global candidates list
- if '.' not in name:
- candidates.append(f'{name}.{FBRTDB_URL}/.json')
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(candidates, use_ssl=True,
- callback=print_fbrtdb_response,
- threads=threads,
- redir=False)
-
- # Stop the time
- utils.stop_timer(start_time)
-
-
-def print_fbapp_response(reply):
- """
- Parses the HTTP reply of a brute-force attempt
-
- This function is passed into the class object so we can view results
- in real-time.
- """
- data = {'platform': 'gcp', 'msg': '', 'target': '', 'access': ''}
-
- if reply.status_code == 404:
- pass
- elif reply.status_code == 200:
- data['msg'] = 'OPEN GOOGLE FIREBASE APP'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- else:
- print(f" Unknown status codes being received from {reply.url}:\n"
- " {reply.status_code}: {reply.reason}")
-
-def check_fbapp(names, threads):
- """
- Checks for Google Firebase Applications
- """
- print("[+] Checking for Google Firebase Applications")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of correctly formatted urls
- candidates = []
-
- # Take each mutated keyword craft a url with the correct format
- for name in names:
- # Firebase App names cannot include a period. We'll exlcude
- # those from the global candidates list
- if '.' not in name:
- candidates.append(f'{name}.{FBAPP_URL}')
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(candidates, use_ssl=True,
- callback=print_fbapp_response,
- threads=threads,
- redir=False)
-
- # Stop the time
- utils.stop_timer(start_time)
-
-def print_appspot_response(reply):
- """
- Parses the HTTP reply of a brute-force attempt
-
- This function is passed into the class object so we can view results
- in real-time.
- """
- data = {'platform': 'gcp', 'msg': '', 'target': '', 'access': ''}
-
- if reply.status_code == 404:
- pass
- elif str(reply.status_code)[0] == 5:
- data['msg'] = 'Google App Engine app with a 50x error'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- elif reply.status_code in (200, 302, 404):
- if 'accounts.google.com' in reply.url:
- data['msg'] = 'Protected Google App Engine app'
- data['target'] = reply.history[0].url
+class GCPChecks:
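+    """Google Cloud Platform enumeration checks (buckets, Firebase, and App Engine)"""
+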
+ def __init__(self, log: logger.Logger, args, names):
+ self.log = log
+ self.args = args
+ self.names = names
+
+ def print_bucket_response(self, reply):
+ """
+ Parses the HTTP reply of a brute-force attempt
+
+ This function is passed into the class object so we can view results
+ in real-time.
+ """
+ data = {'platform': 'gcp', 'target': '', 'access': '', 'key': ''}
+
+ if reply.status_code == 404:
+ pass
+ elif reply.status_code == 200:
+ data['key'] = 'bucket_open'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info("Open Google Bucket")
+ utils.list_bucket_contents(self.log, reply.url + '/')
+ elif reply.status_code == 403:
+ data['key'] = 'bucket_protected'
+ data['target'] = reply.url
data['access'] = 'protected'
- utils.fmt_output(data)
+ self.log.new().extra(map=data).info("Protected Google Bucket")
+ else:
+ self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
+ f"Unknown status code from: {reply.url}")
+
+ def check_gcp_buckets(self):
+ """
+ Checks for open and restricted Google Cloud buckets
+ """
+ self.log.new().trace("Checking for Google buckets")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of correctly formatted urls
+ candidates = []
+
+        # Take each mutated keyword and craft a url with the correct format
+ for name in self.names:
+ candidates.append(f'{GCP_URL}/{name}')
+
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, candidates, use_ssl=False,
+ callback=self.print_bucket_response, threads=self.args.threads)
+
+        # Stop the timer
+ self.log.new().trace(
+ f"Checking for Google buckets took {utils.stop_timer(start_time)}")
+
+ def print_fbrtdb_response(self, reply):
+ """
+ Parses the HTTP reply of a brute-force attempt
+
+ This function is passed into the class object so we can view results
+ in real-time.
+ """
+ data = {'platform': 'gcp', 'target': '', 'access': '', 'key': ''}
+
+ if reply.status_code == 404:
+ pass
+ elif reply.status_code == 200:
+ data['key'] = 'firebase_open'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info("Open Google Firebase RTDB")
+ elif reply.status_code == 401:
+ data['key'] = 'firebase_protected'
+ data['target'] = reply.url
+ data['access'] = 'protected'
+ self.log.new().extra(map=data).info("Protected Google Firebase RTDB")
+ elif reply.status_code == 402:
+ data['key'] = 'firebase_payment_required'
+ data['target'] = reply.url
+ data['access'] = 'disabled'
+ self.log.new().extra(map=data).info("Payment required on Google Firebase RTDB")
+ elif reply.status_code == 423:
+ data['key'] = 'firebase_disabled'
+ data['target'] = reply.url
+ data['access'] = 'disabled'
+ self.log.new().extra(map=data).info("Deactivated Google Firebase RTDB")
+ else:
+ self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
+ f"Unknown status code from: {reply.url}")
+
+ def check_fbrtdb(self):
+ """
+ Checks for Google Firebase RTDB
+ """
+ self.log.new().trace("Checking for Google Firebase Realtime Databases")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of correctly formatted urls
+ candidates = []
+
+        # Take each mutated keyword and craft a URL with the correct format
+ for name in self.names:
+ # Firebase RTDB names cannot include a period. We'll exclude
+ # those from the global candidates list
+ if '.' not in name:
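+                # Appending /.json hits the RTDB REST API: an open database
+                # returns its JSON contents, a locked one returns a 401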
+ candidates.append(f'{name}.{FBRTDB_URL}/.json')
+
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, candidates, use_ssl=True, callback=self.print_fbrtdb_response,
+ threads=self.args.threads, redir=False)
+
+        # Stop the timer
+ self.log.new().trace(
+ f"Checking for Google Firebase RTDB took {utils.stop_timer(start_time)}")
+
+ def print_fbapp_response(self, reply):
+ """
+ Parses the HTTP reply of a brute-force attempt
+
+ This function is passed into the class object so we can view results
+ in real-time.
+ """
+ data = {'platform': 'gcp', 'target': '', 'access': '', 'key': ''}
+
+ if reply.status_code == 404:
+ pass
+ elif reply.status_code == 200:
+ data['key'] = 'firebase_open'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info("Open Google Firebase App")
else:
- data['msg'] = 'Open Google App Engine app'
+ self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
+ f"Unknown status code from: {reply.url}")
+
+ def check_fbapp(self):
+ """
+ Checks for Google Firebase Applications
+ """
+ self.log.new().trace("Checking for Google Firebase Applications")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of correctly formatted urls
+ candidates = []
+
+        # Take each mutated keyword and craft a URL with the correct format
+ for name in self.names:
+ # Firebase App names cannot include a period. We'll exclude
+ # those from the global candidates list
+ if '.' not in name:
+ candidates.append(f'{name}.{FBAPP_URL}')
+
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, candidates, use_ssl=True, callback=self.print_fbapp_response,
+ threads=self.args.threads, redir=False)
+
+        # Stop the timer
+ self.log.new().trace(
+ f"Checking for Google Firebase Applications took {utils.stop_timer(start_time)}")
+
+ def print_appspot_response(self, reply):
+ """
+ Parses the HTTP reply of a brute-force attempt
+
+ This function is passed into the class object so we can view results
+ in real-time.
+ """
+ data = {'platform': 'gcp', 'target': '', 'access': '', 'key': ''}
+
+ if reply.status_code == 404:
+ pass
+        elif str(reply.status_code)[0] == '5':
+ data['key'] = 'app_engine_error'
data['target'] = reply.url
data['access'] = 'public'
- utils.fmt_output(data)
- else:
- print(f" Unknown status codes being received from {reply.url}:\n"
- " {reply.status_code}: {reply.reason}")
-
-
-def check_appspot(names, threads):
- """
- Checks for Google App Engine sites running on appspot.com
- """
- print("[+] Checking for Google App Engine apps")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of correctly formatted urls
- candidates = []
-
- # Take each mutated keyword craft a url with the correct format
- for name in names:
- # App Engine project names cannot include a period. We'll exlcude
- # those from the global candidates list
- if '.' not in name:
- candidates.append(f'{name}.{APPSPOT_URL}')
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(candidates, use_ssl=False,
- callback=print_appspot_response,
- threads=threads)
-
- # Stop the time
- utils.stop_timer(start_time)
-
-
-def print_functions_response1(reply):
- """
- Parses the HTTP reply the initial Cloud Functions check
-
- This function is passed into the class object so we can view results
- in real-time.
- """
- data = {'platform': 'gcp', 'msg': '', 'target': '', 'access': ''}
-
- if reply.status_code == 404:
- pass
- elif reply.status_code == 302:
- data['msg'] = 'Contains at least 1 Cloud Function'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- HAS_FUNCS.append(reply.url)
- else:
- print(f" Unknown status codes being received from {reply.url}:\n"
- " {reply.status_code}: {reply.reason}")
-
-
-def print_functions_response2(reply):
- """
- Parses the HTTP reply from the secondary, brute-force Cloud Functions check
-
- This function is passed into the class object so we can view results
- in real-time.
- """
- data = {'platform': 'gcp', 'msg': '', 'target': '', 'access': ''}
-
- if 'accounts.google.com/ServiceLogin' in reply.url:
- pass
- elif reply.status_code in (403, 401):
- data['msg'] = 'Auth required Cloud Function'
- data['target'] = reply.url
- data['access'] = 'protected'
- utils.fmt_output(data)
- elif reply.status_code == 405:
- data['msg'] = 'UNAUTHENTICATED Cloud Function (POST-Only)'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- elif reply.status_code in (200, 404):
- data['msg'] = 'UNAUTHENTICATED Cloud Function (GET-OK)'
- data['target'] = reply.url
- data['access'] = 'public'
- utils.fmt_output(data)
- else:
- print(f" Unknown status codes being received from {reply.url}:\n"
- " {reply.status_code}: {reply.reason}")
-
-
-def check_functions(names, brute_list, quickscan, threads):
- """
- Checks for Google Cloud Functions running on cloudfunctions.net
-
- This is a two-part process. First, we want to find region/project combos
- that have existing Cloud Functions. The URL for a function looks like this:
- https://[ZONE]-[PROJECT-ID].cloudfunctions.net/[FUNCTION-NAME]
-
- We look for a 302 in [ZONE]-[PROJECT-ID].cloudfunctions.net. That means
- there are some functions defined in that region. Then, we brute force a list
- of possible function names there.
-
- See gcp_regions.py to define which regions to check. The tool currently
- defaults to only 1 region, so you should really modify it for best results.
- """
- print("[+] Checking for project/zones with Google Cloud Functions.")
-
- # Start a counter to report on elapsed time
- start_time = utils.start_timer()
-
- # Initialize the list of correctly formatted urls
- candidates = []
-
- # Pull the regions from a config file
- regions = gcp_regions.REGIONS
-
- print(f"[*] Testing across {len(regions)} regions defined in the config file")
-
- # Take each mutated keyword craft a url with the correct format
- for region in regions:
- candidates += [region + '-' + name + '.' + FUNC_URL for name in names]
-
- # Send the valid names to the batch HTTP processor
- utils.get_url_batch(candidates, use_ssl=False,
- callback=print_functions_response1,
- threads=threads,
- redir=False)
-
- # Retun from function if we have not found any valid combos
- if not HAS_FUNCS:
- utils.stop_timer(start_time)
- return
-
- # Also bail out if doing a quick scan
- if quickscan:
- return
-
- # If we did find something, we'll use the brute list. This will allow people
- # to provide a separate fuzzing list if they choose.
- print(f"[*] Brute-forcing function names in {len(HAS_FUNCS)} project/region combos")
-
- # Load brute list in memory, based on allowed chars/etc
- brute_strings = utils.get_brute(brute_list)
-
- # The global was built in a previous function. We only want to brute force
- # project/region combos that we know have existing functions defined
- for func in HAS_FUNCS:
- print(f"[*] Brute-forcing {len(brute_strings)} function names in {func}")
- # Initialize the list of initial URLs to check. Strip out the HTTP
- # protocol first, as that is handled in the utility
- func = func.replace("http://", "")
-
- # Noticed weird behaviour with functions when a slash is not appended.
- # Works for some, but not others. However, appending a slash seems to
- # get consistent results. Might need further validation.
- candidates = [func + brute + '/' for brute in brute_strings]
+ self.log.new().extra(map=data).info("Google App Engine app with a 50x error")
+ elif reply.status_code in (200, 302, 404):
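+            # Landing on accounts.google.com means we were redirected to
+            # Google sign-in: the app exists but requires auth, and
+            # reply.history[0].url recovers the original target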
+ if 'accounts.google.com' in reply.url:
+ data['key'] = 'app_engine_protected'
+ data['target'] = reply.history[0].url
+ data['access'] = 'protected'
+ self.log.new().extra(map=data).info("Protected Google App Engine app")
+ else:
+ data['key'] = 'app_engine_open'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info("Open Google App Engine app")
+ else:
+ self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
+ f"Unknown status code from: {reply.url}")
+
+ def check_appspot(self):
+ """
+ Checks for Google App Engine sites running on appspot.com
+ """
+ self.log.new().trace("Checking for Google App Engine apps")
+
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of correctly formatted urls
+ candidates = []
+
+        # Take each mutated keyword and craft a URL with the correct format
+        for name in self.names:
+            # App Engine project names cannot include a period. We'll exclude
+ # those from the global candidates list
+ if '.' not in name:
+ candidates.append(f'{name}.{APPSPOT_URL}')
# Send the valid names to the batch HTTP processor
- utils.get_url_batch(candidates, use_ssl=False,
- callback=print_functions_response2,
- threads=threads)
+ utils.get_url_batch(self.log, candidates, use_ssl=False,
+ callback=self.print_appspot_response, threads=self.args.threads)
+
+        # Stop the timer
+ self.log.new().trace(
+ f"Checking for Google App Engine apps took {utils.stop_timer(start_time)}")
+
+ def print_functions_response1(self, reply):
+ """
+        Parses the HTTP reply of the initial Cloud Functions check
+
+ This function is passed into the class object so we can view results
+ in real-time.
+ """
+ data = {'platform': 'gcp', 'target': '', 'access': '', 'key': ''}
+
+ if reply.status_code == 404:
+ pass
+ elif reply.status_code == 302:
+ data['key'] = 'has_cloud_functions'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info("Contains at least 1 Cloud Function")
+ HAS_FUNCS.append(reply.url)
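+            # URLs collected here drive the second, brute-force pass in
+            # check_functions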
+ else:
+ self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
+ f"Unknown status code from: {reply.url}")
+
+ def print_functions_response2(self, reply):
+ """
+ Parses the HTTP reply from the secondary, brute-force Cloud Functions check
+
+ This function is passed into the class object so we can view results
+ in real-time.
+ """
+ data = {'platform': 'gcp', 'target': '', 'access': '', 'key': ''}
+
+ if 'accounts.google.com/ServiceLogin' in reply.url:
+ pass
+ elif reply.status_code in (403, 401):
+ data['key'] = 'cloud_function_auth_required'
+ data['target'] = reply.url
+ data['access'] = 'protected'
+ self.log.new().extra(map=data).info("Auth required Cloud Function")
+ elif reply.status_code == 405:
+ data['key'] = 'cloud_function_post_only'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info("UNAUTHENTICATED Cloud Function (POST-Only)")
+ elif reply.status_code in (200, 404):
+ data['key'] = 'cloud_function_get_ok'
+ data['target'] = reply.url
+ data['access'] = 'public'
+ self.log.new().extra(map=data).info("UNAUTHENTICATED Cloud Function (GET-OK)")
+ else:
+ self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
+ f"Unknown status code from: {reply.url}")
+
+ def check_functions(self):
+ """
+ Checks for Google Cloud Functions running on cloudfunctions.net
- # Stop the time
- utils.stop_timer(start_time)
+ This is a two-part process. First, we want to find region/project combos
+ that have existing Cloud Functions. The URL for a function looks like this:
+ https://[ZONE]-[PROJECT-ID].cloudfunctions.net/[FUNCTION-NAME]
+ We look for a 302 in [ZONE]-[PROJECT-ID].cloudfunctions.net. That means
+ there are some functions defined in that region. Then, we brute force a list
+ of possible function names there.
-def run_all(names, args):
- """
- Function is called by main program
- """
- print(BANNER)
+ See gcp_regions.py to define which regions to check. The tool currently
+ defaults to only 1 region, so you should really modify it for best results.
+ """
+ self.log.new().trace("Checking for project/zones with Google Cloud Functions.")
- check_gcp_buckets(names, args.threads)
- check_fbrtdb(names, args.threads)
- check_appspot(names, args.threads)
- check_functions(names, args.brute, args.quickscan, args.threads)
+ # Start a counter to report on elapsed time
+ start_time = utils.start_timer()
+
+ # Initialize the list of correctly formatted urls
+ candidates = []
+
+ # Pull the regions from a config file
+ regions = gcp_regions.REGIONS
+
+ # If a region is specified, use that instead
+ if self.args.region:
+ regions = [self.args.region]
+
+ self.log.new().trace(
+ f"Testing across {len(regions)} regions defined in the config file or command line")
+
+        # Take each mutated keyword and craft a URL with the correct format
+ for region in regions:
+ candidates += [region + '-' + name +
+ '.' + FUNC_URL for name in self.names]
+
+ # Send the valid names to the batch HTTP processor
+ utils.get_url_batch(self.log, candidates, use_ssl=False,
+ callback=self.print_functions_response1, threads=self.args.threads, redir=False)
+
+        # Return from the function if we have not found any valid combos
+        if not HAS_FUNCS:
+            self.log.new().trace(
+                f"Checking for project/zones with Google Cloud Functions took {utils.stop_timer(start_time)}")
+            return
+
+ # Also bail out if doing a quick scan
+ if self.args.quickscan:
+ return
+
+ # If we did find something, we'll use the brute list. This will allow people
+ # to provide a separate fuzzing list if they choose.
+ self.log.new().trace(
+ f"Brute-forcing function names in {len(HAS_FUNCS)} project/region combos")
+
+ # Load brute list in memory, based on allowed chars/etc
+ brute_strings = utils.get_brute(self.args.brute)
+
+ # The global was built in a previous function. We only want to brute force
+ # project/region combos that we know have existing functions defined
+ for func in HAS_FUNCS:
+ self.log.new().trace(
+ f"Brute-forcing {len(brute_strings)} function names in {func}")
+ # Initialize the list of initial URLs to check. Strip out the HTTP
+ # protocol first, as that is handled in the utility
+ func = func.replace("http://", "")
+
+ # Noticed weird behaviour with functions when a slash is not appended.
+ # Works for some, but not others. However, appending a slash seems to
+ # get consistent results. Might need further validation.
+ candidates = [func + brute + '/' for brute in brute_strings]
+
+ # Send the valid names to the batch HTTP processor
+            utils.get_url_batch(self.log, candidates, use_ssl=False,
+                                callback=self.print_functions_response2,
+                                threads=self.args.threads)
+
+        # Stop the timer
+ self.log.new().trace(
+ f"Checking for project/zones with Google Cloud Functions took {utils.stop_timer(start_time)}")
+
+ def run_all(self):
+ """
+        Called by the main program to run all GCP checks
+ """
+
+ self.check_gcp_buckets()
+ self.check_fbrtdb()
+ self.check_appspot()
+ self.check_functions()
diff --git a/enum_tools/utils.py b/enum_tools/utils.py
index 66e5f6e..e18e030 100644
--- a/enum_tools/utils.py
+++ b/enum_tools/utils.py
@@ -4,14 +4,11 @@
import time
import sys
-import datetime
import re
-import csv
-import json
import ipaddress
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial
-from urllib.parse import urlparse
+from logger import logger
try:
import requests
import dns
@@ -23,25 +20,6 @@
print("[!] Please pip install requirements.txt.")
sys.exit()
-LOGFILE = False
-LOGFILE_FMT = ''
-
-
-def init_logfile(logfile, fmt):
- """
- Initialize the global logfile if specified as a user-supplied argument
- """
- if logfile:
- global LOGFILE
- LOGFILE = logfile
-
- global LOGFILE_FMT
- LOGFILE_FMT = fmt
-
- now = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
- with open(logfile, 'a', encoding='utf-8') as log_writer:
- log_writer.write(f"\n\n#### CLOUD_ENUM {now} ####\n")
-
def is_valid_domain(domain):
"""
@@ -56,21 +34,16 @@ def is_valid_domain(domain):
# Each label should be between 1 and 63 characters long
if not (1 <= len(label) <= 63):
return False
-
+
return True
-def get_url_batch(url_list, use_ssl=False, callback='', threads=5, redir=True):
+def get_url_batch(log: logger.Logger, url_list, use_ssl=False, callback='', threads=5, redir=True):
"""
Processes a list of URLs, sending the results back to the calling
function in real-time via the `callback` parameter
"""
- # Start a counter for a status message
- tick = {}
- tick['total'] = len(url_list)
- tick['current'] = 0
-
# Filter out invalid URLs
url_list = [url for url in url_list if is_valid_domain(url)]
@@ -92,13 +65,15 @@ def get_url_batch(url_list, use_ssl=False, callback='', threads=5, redir=True):
# happening. Putting it in here fixes the issue.
# There is an unresolved discussion here:
# https://github.com/ross/requests-futures/issues/20
- session = FuturesSession(executor=ThreadPoolExecutor(max_workers=threads+5))
+ session = FuturesSession(
+ executor=ThreadPoolExecutor(max_workers=threads+5))
batch_pending = {}
batch_results = {}
# First, grab the pending async request and store it in a dict
for url in batch:
- batch_pending[url] = session.get(proto + url, allow_redirects=redir)
+ batch_pending[url] = session.get(
+ proto + url, allow_redirects=redir)
# Then, grab all the results from the queue.
# This is where we need to catch exceptions that occur with large
@@ -109,11 +84,10 @@ def get_url_batch(url_list, use_ssl=False, callback='', threads=5, redir=True):
# hanging forever with no exception raised.
batch_results[url] = batch_pending[url].result(timeout=30)
except requests.exceptions.ConnectionError as error_msg:
- print(f" [!] Connection error on {url}:")
- print(error_msg)
+ log.new().warning(f"Connection error on {url}: {error_msg}")
except TimeoutError:
- print(f" [!] Timeout on {url}. Investigate if there are"
- " many of these")
+ log.new().warning(
+ f"Timeout on {url}. Investigate if there are many of these")
# Now, send all the results to the callback function for analysis
# We need a way to stop processing unnecessary brute-forces, so the
@@ -123,16 +97,8 @@ def get_url_batch(url_list, use_ssl=False, callback='', threads=5, redir=True):
if check == 'breakout':
return
- # Refresh a status message
- tick['current'] += threads
- sys.stdout.flush()
- sys.stdout.write(f" {tick['current']}/{tick['total']} complete...")
- sys.stdout.write('\r')
-
- # Clear the status message
- sys.stdout.write(' \r')
-def read_nameservers(file_path):
+def read_nameservers(log: logger.Logger, file_path):
"""
Reads nameservers from a given file.
Each line in the file should contain one nameserver IP address.
@@ -140,17 +106,20 @@ def read_nameservers(file_path):
"""
try:
with open(file_path, 'r') as file:
- nameservers = [line.strip() for line in file if line.strip() and not line.startswith('#')]
+ nameservers = [line.strip() for line in file if line.strip()
+ and not line.startswith('#')]
if not nameservers:
- raise ValueError("Nameserver file is empty or only contains comments")
+ raise ValueError(
+ "Nameserver file is empty or only contains comments")
return nameservers
except FileNotFoundError:
- print(f"Error: File '{file_path}' not found.")
+ log.new().error(f"Error: File '{file_path}' not found.")
exit(1)
except ValueError as e:
- print(e)
+ log.new().error(e)
exit(1)
+
def is_valid_ip(address):
try:
ipaddress.ip_address(address)
@@ -158,7 +127,8 @@ def is_valid_ip(address):
except ValueError:
return False
-def dns_lookup(nameserver, name):
+
+def dns_lookup(log: logger.Logger, nameserver, name):
"""
This function performs the actual DNS lookup when called in a threadpool
by the fast_dns_lookup function.
@@ -170,7 +140,7 @@ def dns_lookup(nameserver, name):
res = dns.resolver.Resolver()
res.timeout = 10
if nameserverfile:
- nameservers = read_nameservers(nameserverfile)
+ nameservers = read_nameservers(log, nameserverfile)
res.nameservers = nameservers
else:
res.nameservers = [nameserver]
@@ -182,27 +152,22 @@ def dns_lookup(nameserver, name):
except dns.resolver.NXDOMAIN:
return ''
except dns.resolver.NoNameservers as exc_text:
- print(" [!] Error querying nameservers! This could be a problem.")
- print(" [!] If you're using a VPN, try setting --ns to your VPN's nameserver.")
- print(" [!] Bailing because you need to fix this")
- print(" [!] More Info:")
- print(exc_text)
+ log.new().error(f"Error querying nameservers: {exc_text}")
return '-#BREAKOUT_DNS_ERROR#-'
except dns.exception.Timeout:
- print(f" [!] DNS Timeout on {name}. Investigate if there are many"
- " of these.")
+ log.new().warning(
+ f"DNS Timeout on {name}. Investigate if there are many of these")
return ''
-def fast_dns_lookup(names, nameserver, nameserverfile, callback='', threads=5):
+def fast_dns_lookup(log: logger.Logger, names, nameserver, nameserverfile, callback='', threads=5):
"""
Helper function to resolve DNS names. Uses multithreading.
"""
- total = len(names)
- current = 0
valid_names = []
- print(f"[*] Brute-forcing a list of {total} possible DNS names")
+ log.new().trace(
+ f"Brute-forcing a list of {len(names)} possible DNS names")
# Filter out invalid domains
names = [name for name in names if is_valid_domain(name)]
@@ -216,9 +181,9 @@ def fast_dns_lookup(names, nameserver, nameserverfile, callback='', threads=5):
# Because pool.map takes only a single function arg, we need to
# define this partial so that each iteration uses the same ns
if nameserverfile:
- dns_lookup_params = partial(dns_lookup, nameserverfile)
+ dns_lookup_params = partial(dns_lookup, log, nameserverfile)
else:
- dns_lookup_params = partial(dns_lookup, nameserver)
+ dns_lookup_params = partial(dns_lookup, log, nameserver)
results = pool.map(dns_lookup_params, batch)
@@ -230,22 +195,12 @@ def fast_dns_lookup(names, nameserver, nameserverfile, callback='', threads=5):
if callback:
callback(name)
valid_names.append(name)
-
- current += threads
-
- # Update the status message
- sys.stdout.flush()
- sys.stdout.write(f" {current}/{total} complete...")
- sys.stdout.write('\r')
pool.close()
- # Clear the status message
- sys.stdout.write(' \r')
-
return valid_names
-def list_bucket_contents(bucket):
+def list_bucket_contents(log: logger.Logger, bucket):
"""
Provides a list of full URLs to each open bucket
"""
@@ -262,40 +217,11 @@ def list_bucket_contents(bucket):
# Format them to full URLs and print to console
if keys:
- print(" FILES:")
for key in keys:
url = bucket + key
- print(f" ->{url}")
+ log.new().extra("bucket", bucket).debug(f"File: {url}")
else:
- print(" ...empty bucket, so sad. :(")
-
-
-def fmt_output(data):
- """
- Handles the output - printing and logging based on a specified format
- """
- # ANSI escape sequences are set based on accessibility of target
- # (basically, how public it is))
- bold = '\033[1m'
- end = '\033[0m'
- if data['access'] == 'public':
- ansi = bold + '\033[92m' # green
- if data['access'] == 'protected':
- ansi = bold + '\033[33m' # orange
- if data['access'] == 'disabled':
- ansi = bold + '\033[31m' # red
-
- sys.stdout.write(' ' + ansi + data['msg'] + ': ' + data['target'] + end + '\n')
-
- if LOGFILE:
- with open(LOGFILE, 'a', encoding='utf-8') as log_writer:
- if LOGFILE_FMT == 'text':
- log_writer.write(f'{data["msg"]}: {data["target"]}\n')
- if LOGFILE_FMT == 'csv':
- writer = csv.DictWriter(log_writer, data.keys())
- writer.writerow(data)
- if LOGFILE_FMT == 'json':
- log_writer.write(json.dumps(data) + '\n')
+ log.new().extra("bucket", bucket).debug("Empty bucket")
def get_brute(brute_file, mini=1, maxi=63, banned='[^a-z0-9_-]'):
@@ -330,13 +256,10 @@ def start_timer():
def stop_timer(start_time):
"""
- Stops timer and prints a status
+ Stops timer and returns difference
"""
# Stop the timer
elapsed_time = time.time() - start_time
formatted_time = time.strftime("%H:%M:%S", time.gmtime(elapsed_time))
- # Print some statistics
- print("")
- print(f" Elapsed time: {formatted_time}")
- print("")
+ return formatted_time
diff --git a/logger/logger.py b/logger/logger.py
new file mode 100644
index 0000000..1da3242
--- /dev/null
+++ b/logger/logger.py
@@ -0,0 +1,81 @@
+"""
+JSON logger
+"""
+
+import datetime
+import json
+
+TRACE = 0
+DEBUG = 1
+INFO = 2
+WARNING = 3
+ERROR = 4
+
+
+class Logger:
+ def __init__(self, level):
+ self._level = self.__level_int(level)
+ self._extra_data = {}
+
+ def new(self):
+ return Logger(self.__level_str(self._level))
+
+ def extra(self, key=None, value=None, map=None):
+ if map:
+ self._extra_data.update(map)
+ elif key and value:
+ self._extra_data[key] = value
+ return self
+
+ def trace(self, msg):
+ self.__log(TRACE, msg)
+
+ def debug(self, msg):
+ self.__log(DEBUG, msg)
+
+ def info(self, msg):
+ self.__log(INFO, msg)
+
+ def warning(self, msg):
+ self.__log(WARNING, msg)
+
+ def error(self, msg):
+ self.__log(ERROR, msg)
+
+ def __log(self, level, msg):
+ if self._level > level:
+ return
+ entry = {
+ 'time': datetime.datetime.now().isoformat(),
+ 'level': self.__level_str(level),
+ 'message': msg
+ }
+ entry.update(self._extra_data)
+ print(json.dumps(entry))
+
+ def __level_str(self, level):
+ if level == TRACE:
+ return "TRACE"
+ if level == DEBUG:
+ return "DEBUG"
+ if level == INFO:
+ return "INFO"
+ if level == WARNING:
+ return "WARNING"
+ if level == ERROR:
+ return "ERROR"
+ return "INFO"
+
+ def __level_int(self, level):
+ if level == "TRACE":
+ return TRACE
+ if level == "DEBUG":
+ return DEBUG
+ if level == "INFO":
+ return INFO
+ if level == "WARNING":
+ return WARNING
+ if level == "ERROR":
+ return ERROR
+ return INFO
diff --git a/manpage/cloud_enum.txt b/manpage/cloud_enum.txt
index 4cfe598..8729a15 100644
--- a/manpage/cloud_enum.txt
+++ b/manpage/cloud_enum.txt
@@ -33,8 +33,8 @@ OPTIONS
-b BRUTE, --brute BRUTE List to brute-force Azure container names. Default: /usr/lib/cloud-enum/enum_tools/fuzz.txt.
-t THREADS, --threads THREADS Threads for HTTP brute-force. Default = 5.
-ns NAMESERVER, --nameserver NAMESERVER DNS server to use in brute-force.
- -l LOGFILE, --logfile LOGFILE Will APPEND found items to specified file.
- -f FORMAT, --format Format Format for log file (text,json,csv - defaults to text)
+ -l LOGFILE, --logfile LOGFILE REMOVED: Will APPEND found items to specified file.
+  -f FORMAT, --format FORMAT REMOVED: Format for log file (text,json,csv - defaults to text)
--disable-aws Disable Amazon checks.
--disable-azure Disable Azure checks.
--disable-gcp Disable Google checks.