Skip to content

Commit

Permalink
[sc-25981] Refactor gcp checks to use logger
Browse files Browse the repository at this point in the history
  • Loading branch information
drunkenplatypus committed Jul 5, 2024
1 parent 58dc37a commit f0a5a9e
Showing 1 changed file with 56 additions and 68 deletions.
124 changes: 56 additions & 68 deletions enum_tools/gcp_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@

from enum_tools import utils
from enum_tools import gcp_regions

BANNER = '''
++++++++++++++++++++++++++
google checks
++++++++++++++++++++++++++
'''
from logger import logger

# Known GCP domain names
GCP_URL = 'storage.googleapis.com'
Expand All @@ -25,7 +20,7 @@


class GCPChecks:
def __init__(self, log, args, names):
def __init__(self, log: logger.Logger, args, names):
self.log = log
self.args = args
self.names = names
Expand All @@ -43,27 +38,25 @@ def print_bucket_response(self, reply):
if reply.status_code == 404:
pass
elif reply.status_code == 200:
data['key'] = 'BUCKET_OPEN'
data['msg'] = 'OPEN GOOGLE BUCKET'
data['key'] = 'bucket_open'
data['target'] = reply.url
data['access'] = 'public'
utils.fmt_output(data)
utils.list_bucket_contents(reply.url + '/')
self.log.new().extra(map=data).info("Open Google Bucket")
utils.list_bucket_contents(self.log, reply.url + '/')
elif reply.status_code == 403:
data['key'] = 'BUCKET_PROTECTED'
data['msg'] = 'Protected Google Bucket'
data['key'] = 'bucket_protected'
data['target'] = reply.url
data['access'] = 'protected'
utils.fmt_output(data)
self.log.new().extra(map=data).info("Protected Google Bucket")
else:
print(f" Unknown status codes being received from {reply.url}:\n"
f" {reply.status_code}: {reply.reason}")
self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
f"Unknown status code from: {reply.url}")

def check_gcp_buckets(self):
"""
Checks for open and restricted Google Cloud buckets
"""
print("Checking for Google buckets")
self.log.new().trace("Checking for Google buckets")

# Start a counter to report on elapsed time
start_time = utils.start_timer()
Expand All @@ -80,7 +73,8 @@ def check_gcp_buckets(self):
callback=self.print_bucket_response, threads=self.args.threads)

# Stop the time
utils.stop_timer(start_time)
self.log.new().trace(
f"Checking for Google buckets took {utils.stop_timer(start_time)}")

def print_fbrtdb_response(self, reply):
"""
Expand All @@ -95,38 +89,34 @@ def print_fbrtdb_response(self, reply):
if reply.status_code == 404:
pass
elif reply.status_code == 200:
data['key'] = 'FIREBASE_OPEN'
data['msg'] = 'OPEN GOOGLE FIREBASE RTDB'
data['key'] = 'firebase_open'
data['target'] = reply.url
data['access'] = 'public'
utils.fmt_output(data)
self.log.new().extra(map=data).info("Open Google Firebase RTDB")
elif reply.status_code == 401:
data['key'] = 'FIREBASE_PROTECTED'
data['msg'] = 'Protected Google Firebase RTDB'
data['key'] = 'firebase_protected'
data['target'] = reply.url
data['access'] = 'protected'
utils.fmt_output(data)
self.log.new().extra(map=data).info("Protected Google Firebase RTDB")
elif reply.status_code == 402:
data['key'] = 'FIREBASE_PAYMENT_REQUIRED'
data['msg'] = 'Payment required on Google Firebase RTDB'
data['key'] = 'firebase_payment_required'
data['target'] = reply.url
data['access'] = 'disabled'
utils.fmt_output(data)
self.log.new().extra(map=data).info("Payment required on Google Firebase RTDB")
elif reply.status_code == 423:
data['key'] = 'FIREBASE_DISABLED'
data['msg'] = 'The Firebase database has been deactivated.'
data['key'] = 'firebase_disabled'
data['target'] = reply.url
data['access'] = 'disabled'
utils.fmt_output(data)
self.log.new().extra(map=data).info("Deactivated Google Firebase RTDB")
else:
print(f" Unknown status codes being received from {reply.url}:\n"
f" {reply.status_code}: {reply.reason}")
self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
f"Unknown status code from: {reply.url}")

def check_fbrtdb(self):
"""
Checks for Google Firebase RTDB
"""
print("Checking for Google Firebase Realtime Databases")
self.log.new().trace("Checking for Google Firebase Realtime Databases")

# Start a counter to report on elapsed time
start_time = utils.start_timer()
Expand All @@ -146,7 +136,8 @@ def check_fbrtdb(self):
threads=self.args.threads, redir=False)

# Stop the time
utils.stop_timer(start_time)
self.log.new().trace(
f"Checking for Google Firebase RTDB took {utils.stop_timer(start_time)}")

def print_fbapp_response(self, reply):
"""
Expand All @@ -161,20 +152,19 @@ def print_fbapp_response(self, reply):
if reply.status_code == 404:
pass
elif reply.status_code == 200:
data['key'] = 'FIREBASE_OPEN'
data['msg'] = 'OPEN GOOGLE FIREBASE APP'
data['key'] = 'firebase_open'
data['target'] = reply.url
data['access'] = 'public'
utils.fmt_output(data)
self.log.new().extra(map=data).info("Open Google Firebase App")
else:
print(f" Unknown status codes being received from {reply.url}:\n"
f" {reply.status_code}: {reply.reason}")
self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
f"Unknown status code from: {reply.url}")

def check_fbapp(self):
"""
Checks for Google Firebase Applications
"""
print("Checking for Google Firebase Applications")
self.log.new().trace("Checking for Google Firebase Applications")

# Start a counter to report on elapsed time
start_time = utils.start_timer()
Expand All @@ -194,7 +184,8 @@ def check_fbapp(self):
threads=self.args.threads, redir=False)

# Stop the time
utils.stop_timer(start_time)
self.log.new().trace(
f"Checking for Google Firebase Applications took {utils.stop_timer(start_time)}")

def print_appspot_response(self, reply):
"""
Expand All @@ -209,20 +200,20 @@ def print_appspot_response(self, reply):
if reply.status_code == 404:
pass
elif str(reply.status_code)[0] == 5:
data['key'] = 'APP_ENGINE_ERROR'
data['key'] = 'app_engine_error'
data['msg'] = 'Google App Engine app with a 50x error'
data['target'] = reply.url
data['access'] = 'public'
utils.fmt_output(data)
elif reply.status_code in (200, 302, 404):
if 'accounts.google.com' in reply.url:
data['key'] = 'APP_ENGINE_PROTECTED'
data['key'] = 'app_engine_protected'
data['msg'] = 'Protected Google App Engine app'
data['target'] = reply.history[0].url
data['access'] = 'protected'
utils.fmt_output(data)
else:
data['key'] = 'APP_ENGINE_OPEN'
data['key'] = 'app_engine_open'
data['msg'] = 'Open Google App Engine app'
data['target'] = reply.url
data['access'] = 'public'
Expand Down Expand Up @@ -270,15 +261,14 @@ def print_functions_response1(self, reply):
if reply.status_code == 404:
pass
elif reply.status_code == 302:
data['key'] = 'HAS_CLOUD_FUNCTIONS'
data['msg'] = 'Contains at least 1 Cloud Function'
data['key'] = 'has_cloud_functions'
data['target'] = reply.url
data['access'] = 'public'
utils.fmt_output(data)
self.log.new().extra(map=data).info("Contains at least 1 Cloud Function")
HAS_FUNCS.append(reply.url)
else:
print(f" Unknown status codes being received from {reply.url}:\n"
f" {reply.status_code}: {reply.reason}")
self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
f"Unknown status code from: {reply.url}")

def print_functions_response2(self, reply):
"""
Expand All @@ -293,26 +283,24 @@ def print_functions_response2(self, reply):
if 'accounts.google.com/ServiceLogin' in reply.url:
pass
elif reply.status_code in (403, 401):
data['key'] = 'CLOUD_FUNCTION_AUTH_REQUIRED'
data['key'] = 'cloud_function_auth_required'
data['msg'] = 'Auth required Cloud Function'
data['target'] = reply.url
data['access'] = 'protected'
utils.fmt_output(data)
self.log.new().extra(map=data).info("Auth required Cloud Function")
elif reply.status_code == 405:
data['key'] = 'CLOUD_FUNCTION_POST_ONLY'
data['msg'] = 'UNAUTHENTICATED Cloud Function (POST-Only)'
data['key'] = 'cloud_function_post_only'
data['target'] = reply.url
data['access'] = 'public'
utils.fmt_output(data)
self.log.new().extra(map=data).info("UNAUTHENTICATED Cloud Function (POST-Only)")
elif reply.status_code in (200, 404):
data['key'] = 'CLOUD_FUNCTION_GET_OK'
data['msg'] = 'UNAUTHENTICATED Cloud Function (GET-OK)'
data['key'] = 'cloud_function_get_ok'
data['target'] = reply.url
data['access'] = 'public'
utils.fmt_output(data)
self.log.new().extra(map=data).info("UNAUTHENTICATED Cloud Function (GET-OK)")
else:
print(f" Unknown status codes being received from {reply.url}:\n"
f" {reply.status_code}: {reply.reason}")
self.log.new().extra("status_code", reply.status_code).extra("reason", reply.reason).warning(
f"Unknown status code from: {reply.url}")

def check_functions(self):
"""
Expand All @@ -329,7 +317,7 @@ def check_functions(self):
See gcp_regions.py to define which regions to check. The tool currently
defaults to only 1 region, so you should really modify it for best results.
"""
print("Checking for project/zones with Google Cloud Functions.")
self.log.new().trace("Checking for project/zones with Google Cloud Functions.")

# Start a counter to report on elapsed time
start_time = utils.start_timer()
Expand All @@ -344,8 +332,8 @@ def check_functions(self):
if region:
regions = [region]

print(
f"[*] Testing across {len(regions)} regions defined in the config file or command line")
self.log.new().trace(
f"Testing across {len(regions)} regions defined in the config file or command line")

# Take each mutated keyword craft a url with the correct format
for region in regions:
Expand All @@ -367,17 +355,17 @@ def check_functions(self):

# If we did find something, we'll use the brute list. This will allow people
# to provide a separate fuzzing list if they choose.
print(
f"[*] Brute-forcing function names in {len(HAS_FUNCS)} project/region combos")
self.log.new().trace(
f"Brute-forcing function names in {len(HAS_FUNCS)} project/region combos")

# Load brute list in memory, based on allowed chars/etc
brute_strings = utils.get_brute(self.args.brute)

# The global was built in a previous function. We only want to brute force
# project/region combos that we know have existing functions defined
for func in HAS_FUNCS:
print(
f"[*] Brute-forcing {len(brute_strings)} function names in {func}")
self.log.new().trace(
f"Brute-forcing {len(brute_strings)} function names in {func}")
# Initialize the list of initial URLs to check. Strip out the HTTP
# protocol first, as that is handled in the utility
func = func.replace("http://", "")
Expand All @@ -392,13 +380,13 @@ def check_functions(self):
candidates, use_ssl=False, callback=self.print_functions_response2, threads=self.args.threads)

# Stop the time
utils.stop_timer(start_time)
self.log.new().trace(
f"Checking for project/zones with Google Cloud Functions took {utils.stop_timer(start_time)}")

def run_all(self):
"""
Function is called by main program
"""
print(BANNER)

self.check_gcp_buckets()
self.check_fbrtdb()
Expand Down

0 comments on commit f0a5a9e

Please sign in to comment.