-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscanner.py
115 lines (99 loc) · 3.45 KB
/
scanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
MOBSF REST API Python Requests
"""
import json
# from msilib.schema import Directory
from turtle import pos
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
import argparse
import logging
import os
import urllib.error
import urllib.parse
import urllib.request
from dotenv import load_dotenv
import time
# load_dotenv() (python-dotenv) runs first so that values it places in the
# process environment are visible to the lookups below.
load_dotenv()

logger = logging.getLogger(__name__)

# Connection settings are read from the environment; each is None when the
# corresponding variable is not set.
SERVER = os.getenv("SERVER")
APIKEY = os.getenv("API_KEY")
def upload(FILE, SERVER, APIKEY):
    """Upload a file to the server's ``/api/v1/upload`` endpoint.

    Args:
        FILE: Path of the file to upload. The same string is sent as the
            multipart filename.
        SERVER: Base URL of the server; ``/api/v1/upload`` is appended.
        APIKEY: Value sent in the ``Authorization`` header.

    Returns:
        The decoded JSON body of the server's response.
    """
    print("Uploading file")
    # Open the file in a context manager so the handle is released once the
    # request has completed, even if the POST raises.
    with open(FILE, 'rb') as handle:
        multipart_data = MultipartEncoder(
            fields={'file': (FILE, handle, 'application/octet-stream')})
        headers = {'Content-Type': multipart_data.content_type,
                   'Authorization': APIKEY}
        response = requests.post(SERVER + '/api/v1/upload',
                                 data=multipart_data, headers=headers)
    # The status check comes first so a non-200 reply is logged and printed
    # before any attempt to decode its body.
    if response.status_code == 200 and 'hash' in response.json():
        logger.info('[OK] Upload OK: %s', FILE)
    else:
        logger.error('Performing Upload: %s', FILE)
    print(response.text)
    return response.json()
def scan(data, APIKEY, SERVER):
    """Submit a scan request to the server's ``/api/v1/scan`` endpoint.

    Args:
        data: Form fields to post; printed and forwarded unchanged.
        APIKEY: Value sent in the ``Authorization`` header.
        SERVER: Base URL of the server; ``/api/v1/scan`` is appended.

    Returns:
        The decoded JSON body of the server's response.
    """
    print("Scanning file")
    print(data)
    endpoint = SERVER + '/api/v1/scan'
    reply = requests.post(endpoint, data=data,
                          headers={'Authorization': APIKEY})
    print(reply.text)
    return reply.json()
def pdf(data, APIKEY, SERVER):
    """Download the PDF report for a scan and save it as ``report.pdf``.

    Args:
        data: JSON text containing a ``"hash"`` key. An already-decoded
            mapping with that key is accepted as well.
        APIKEY: Value sent in the ``Authorization`` header.
        SERVER: Base URL of the server; ``/api/v1/download_pdf`` is appended.

    Returns:
        If the server replies with a JSON content type instead of a PDF
        (presumably an error payload), the decoded JSON body is returned and
        no file is written. Otherwise the body is streamed to ``report.pdf``
        in the current working directory and
        ``{"hash": <hash>, "report": "report.pdf"}`` is returned.
    """
    print("Generate PDF report")
    headers = {'Authorization': APIKEY}
    if isinstance(data, (str, bytes, bytearray)):
        parsed = json.loads(data)
    else:
        parsed = data
    payload = {"hash": parsed["hash"]}
    response = requests.post(
        SERVER + '/api/v1/download_pdf', data=payload, headers=headers,
        stream=True)
    content_type = response.headers.get('Content-Type', '')
    if 'json' in content_type.lower():
        # Not a PDF: decode the body now, while it has not been consumed.
        return response.json()
    with open("report.pdf", 'wb') as flip:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                flip.write(chunk)
    print("Report saved as report.pdf")
    # The streamed body is consumed at this point, so response.json() would
    # raise; describe what was saved instead.
    return {"hash": payload["hash"], "report": "report.pdf"}
def json_resp(data, APIKEY, SERVER, mins=0, max_retries=10):
    """Fetch the JSON report for a scan, polling until it is available.

    Args:
        data: Mapping whose ``data['appsec']['hash']`` identifies the scan.
        APIKEY: Value sent in the ``Authorization`` header.
        SERVER: Base URL of the server; ``/api/v1/report_json`` is appended.
        mins: Minutes to sleep before every request, including the first.
        max_retries: How many additional requests to make while the server
            keeps answering "Report not Found". Bounding the retries avoids
            polling forever, which matters most when ``mins`` is 0.

    Returns:
        The decoded JSON body of the last response received. This is still
        the "Report not Found" body if the retries were exhausted.
    """
    print("Generate JSON report")
    headers = {'Authorization': APIKEY}
    # Extract the hash once; the retries reuse this payload rather than
    # re-reading the caller's mapping.
    payload = {"hash": data['appsec']["hash"]}
    url = SERVER + '/api/v1/report_json'
    retries = 0
    while True:
        time.sleep(mins * 60)
        resp = requests.post(url, data=payload, headers=headers).json()
        not_ready = "report" in resp and resp['report'] == "Report not Found"
        if not not_ready or retries >= max_retries:
            return resp
        retries += 1
        print("waiting for "+str(mins)+" mins")
def delete(data, APIKEY, SERVER):
    """Request deletion of a scan result via ``/api/v1/delete_scan``.

    Args:
        data: Mapping with a ``"hash"`` key identifying the scan.
        APIKEY: Value sent in the ``Authorization`` header.
        SERVER: Base URL of the server; ``/api/v1/delete_scan`` is appended.

    Returns:
        The decoded JSON body of the server's response.
    """
    print("Deleting Scan")
    auth = {'Authorization': APIKEY}
    form = {"hash": data["hash"]}
    endpoint = SERVER + '/api/v1/delete_scan'
    reply = requests.post(endpoint, data=form, headers=auth)
    print(reply.text)
    return reply.json()
def start_function(DIRECTORY, APIKEY, SERVER, DELAY,
                   results_dir="/home/jaden/projects/NullCon23/results/"):
    """Upload and scan every supported file found directly in a directory.

    Each entry of ``DIRECTORY`` whose extension is ``.apk``, ``.ipa``,
    ``.appx`` or ``.zip`` (exact, case-sensitive match) is passed to
    ``upload`` and then ``scan``. The scan response is written as indented
    JSON to ``<results_dir>/<filename>.json``.

    Args:
        DIRECTORY: Directory to list; sub-directories are not descended into.
        APIKEY: API key forwarded to ``upload`` and ``scan``.
        SERVER: Server base URL forwarded to ``upload`` and ``scan``.
        DELAY: Accepted for interface compatibility; not used by this
            function.
        results_dir: Directory that receives the result files. Defaults to
            the location that was previously hard-coded. It is created on
            demand just before the first result is written.
    """
    directory = DIRECTORY
    print(directory)
    scannable_exts = {'.apk', '.ipa', '.appx', '.zip'}
    for filename in os.listdir(directory):
        fpath = os.path.join(directory, filename)
        _, ext = os.path.splitext(fpath)
        if ext not in scannable_exts:
            continue
        RESP = upload(fpath, SERVER, APIKEY)
        RESP = scan(RESP, APIKEY, SERVER)
        os.makedirs(results_dir, exist_ok=True)
        # Context manager guarantees the result file is flushed and closed
        # even if serialisation fails part-way through.
        with open(os.path.join(results_dir, filename + ".json"), "w") as fp:
            json.dump(RESP, fp, indent=4)