forked from wvdumper/dumper
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Scanner.py
187 lines (171 loc) · 7.19 KB
/
Scanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import os
import json
from Crypto.PublicKey import RSA
from google.protobuf import message
import logging
from Helpers.Keybox import Keybox
from Helpers.wv_proto2_pb2 import SignedLicenseRequest
class Scan:
def __init__(self, device_name):
self.logger = logging.getLogger(__name__)
self.KEY_DUMP_LOC = 'keydump/'
self.device_name = device_name
self.saved_keys = {}
self.frida_script = open('Helpers/script.js', 'r').read()
self.device = {
'device_id': None,
'device_token': None,
'device_key': os.urandom(16).hex(),
'security_level': ''
}
self.widevine_libraries = [
'libwvhidl.so',
'libwvdrmengine.so',
'liboemcrypto.so',
'libmediadrm.so',
'libwvdrm_L1.so',
'libWVStreamControlAPI_L1.so',
'libdrmwvmplugin.so',
'libwvm.so'
]
def export_key(self, k):
root = SignedLicenseRequest()
root.ParseFromString(k['id'])
cid = root.Msg.ClientId
system_id = cid.Token._DeviceCertificate.SystemId
save_dir = os.path.join('key_dumps', f'{self.device_name}/private_keys/{system_id}/{str(k["key"].n)[:10]}')
if not os.path.exists(save_dir):
os.makedirs(save_dir)
with open(os.path.join(save_dir, 'client_id.bin'), 'wb+') as writer:
writer.write(cid.SerializeToString())
with open(os.path.join(save_dir, 'private_key.pem'), 'wb+') as writer:
writer.write(k['key'].exportKey('PEM'))
self.logger.info('Key pairs saved at ' + save_dir)
def on_message(self, msg, data):
try:
if msg['payload'] == 'priv':
self.logger.debug('processing private key')
self.private_key_message(msg, data)
elif msg['payload'] == 'id':
self.logger.debug('processing id')
self.license_request_message(data)
elif msg['payload'] == 'device_id':
self.logger.debug('processing device id')
self.device_id_message(data)
elif msg['payload'] == 'device_token':
self.logger.debug('processing device token')
self.device_token_message(data)
elif msg['payload'] == 'security_level':
tag = data.decode()
if tag == 'L1':
self.device['security_level'] = 'LVL1'
else:
self.device['security_level'] = 'LVL3'
elif msg['payload'] == 'aes_key':
self.aes_key_message(data)
elif msg['payload'] == 'message':
payload = json.loads(data.decode())
self.logger.debug(
json.dumps(
payload,
indent=4
)
)
elif msg['payload'] == 'message_info':
self.logger.info(data.decode())
except:
self.logger.error('unable to process the message')
self.logger.error(msg)
self.logger.error(data)
def private_key_message(self, private_key_message, data):
try:
try:
key = RSA.importKey(data)
cur = self.saved_keys.get(key.n, {})
if 'id' in cur:
if 'key' not in cur:
cur['key'] = key
self.saved_keys[key.n] = cur
self.export_key(cur)
else:
self.saved_keys[key.n] = {'key': key}
except:
self.logger.error('unable to load private key')
self.logger.error(data)
pass
except:
self.logger.error('payload of type priv failed')
self.logger.error(private_key_message)
def license_request_message(self, data):
with open('license_request.bin', 'wb+') as f:
f.write(data)
root = SignedLicenseRequest()
try:
root.ParseFromString(data)
except message.DecodeError:
return
try:
key = RSA.importKey(root.Msg.ClientId.Token._DeviceCertificate.PublicKey)
cur = self.saved_keys.get(key.n, {})
if 'key' in cur:
if 'id' not in cur:
cur['id'] = data
self.saved_keys[key.n] = cur
self.export_key(cur)
else:
self.saved_keys[key.n] = {'id': data}
except Exception as error:
self.logger.error(error)
def device_id_message(self, data_buffer):
if not self.device['device_id']:
self.device['device_id'] = data_buffer.hex()
if self.device['device_id'] and self.device['device_token'] and self.device['device_key']:
self.save_key_box()
def device_token_message(self, data_buffer):
if not self.device['device_token']:
self.device['device_token'] = data_buffer.hex()
if self.device['device_id'] and self.device['device_token']:
self.save_key_box()
def aes_key_message(self, data_buffer):
if not self.device['device_key']:
self.device['device_key'] = data_buffer.hex()
if self.device['device_id'] and self.device['device_token']:
self.save_key_box()
def find_widevine_process(self, dev, process_name):
process = dev.attach(process_name)
script = process.create_script(self.frida_script)
script.load()
loaded = []
try:
for lib in self.widevine_libraries:
try:
loaded.append(script.exports.widevinelibrary(lib))
except:
pass
finally:
process.detach()
return loaded
def hook_to_process(self, device, process, library):
session = device.attach(process)
script = session.create_script(self.frida_script)
script.on('message', self.on_message)
script.load()
script.exports.inject(library, process)
return session
def save_key_box(self):
try:
if self.device['device_id'] is not None and self.device['device_token'] is not None:
self.logger.info('saving key box')
keybox = Keybox(self.device)
box = os.path.join('key_dumps', f'{self.device_name}/key_boxes/{keybox.system_id}')
self.logger.debug(f'saving to {box}')
if not os.path.exists(box):
os.makedirs(box)
with open(os.path.join(box, f'{keybox.system_id}.bin'), 'wb') as writer:
writer.write(keybox.get_keybox())
with open(os.path.join(box, f'{keybox.system_id}.json'), 'w') as writer:
writer.write(keybox.__repr__())
self.logger.info(f'saved keybox to {box}')
except Exception as error:
self.logger.error('unable to save keybox')
self.logger.error(error)