forked from dillonfranke/protoburp
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathextender.py
150 lines (113 loc) · 5.84 KB
/
extender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import json
import inspect
import base64
import os
import sys
from burp import IBurpExtender
from burp import IMessageEditorTabFactory
from burp import IHttpListener
from burp import ITab
from javax.swing import JPanel
from javax.swing import JButton
from javax.swing import JFileChooser
from java.awt import FlowLayout
from tab import Tab
import Bridge # bridge between python2 and python3
import ProtobufTab # custom tab
import utils
# Name registered with Burp for this extension.
EXTENSION_NAME = "ProtoBurp++"

# Absolute path of the directory that contains this file.
_BASE_DIR = os.path.abspath(os.path.dirname(inspect.getfile(inspect.currentframe())))

# Legacy: put the deps/protobuf/python directory first on sys.path.
# Slated for removal now that the system Python is used.
sys.path.insert(0, _BASE_DIR + "/deps/protobuf/python/")
class BurpExtender(IBurpExtender, IHttpListener, IMessageEditorTabFactory):
    """Burp entry point that converts JSON <-> Protobuf on flagged traffic.

    A message is only touched when the extension is enabled in the suite tab
    and the originating request carries a ``protoburp`` header. Requests have
    their JSON body replaced by Protobuf; responses whose headers advertise a
    Protobuf content type have their body replaced by JSON.
    """

    def __init__(self):
        # Bridge between Python 2 (this extension) and Python 3; it exposes
        # the ``*_in_python3`` conversion calls used below.
        self.Bridge = Bridge.Bridge(_BASE_DIR)

    # Implement IBurpExtender methods
    def registerExtenderCallbacks(self, callbacks):
        """Register the extension with Burp.

        Stores the callbacks and helpers, then registers this object as an
        HTTP listener and message-editor-tab factory and adds the suite tab.
        """
        # keep a reference to our callbacks object (Burp Extensibility Feature)
        self._callbacks = callbacks
        # set our extension name that will display in the Extender tool when loaded
        self._callbacks.setExtensionName(EXTENSION_NAME)
        # register ourselves as an HTTP listener
        self._callbacks.registerHttpListener(self)
        # register ourselves as a custom IMessageEditorTabFactory
        self._callbacks.registerMessageEditorTabFactory(self)
        # get burp helper functions
        self._helpers = self._callbacks.getHelpers()
        self.suite_tab = Tab(self, callbacks)
        # Add the custom tab
        callbacks.addSuiteTab(self.suite_tab)

    def getTabCaption(self):
        """Return the caption used for the extension tab."""
        return EXTENSION_NAME

    def getUiComponent(self):
        """Return the panel stored in ``self._jPanel``."""
        # NOTE(review): ``_jPanel`` is never assigned in this class (the suite
        # tab is built by ``Tab``), so this looks like leftover code -- confirm
        # whether it can be removed.
        return self._jPanel

    def file_chooser(self, event):
        """Show a file-open dialog; the chosen file is currently not used."""
        # NOTE(review): relies on ``self._jPanel`` like getUiComponent -- confirm.
        chooser = JFileChooser()
        action = chooser.showOpenDialog(self._jPanel)
        if action == JFileChooser.APPROVE_OPTION:
            chooser.getSelectedFile()

    # TODO: move the body-extraction helpers into utils next to getRequestBody
    def getResponseBody(self, messageInfo):
        """Return the raw body bytes of ``messageInfo``'s response as a str."""
        response = messageInfo.getResponse()
        body_offset = self._helpers.analyzeResponse(response).getBodyOffset()
        # Slicing the Java byte[] yields a Jython array; tostring() turns it
        # into a byte string that base64 and the bridge can consume.
        return response[body_offset:].tostring()

    @staticmethod
    def _is_grpc_web_text(headers):
        """Return True if any header line mentions ``grpc-web-text``."""
        return any("grpc-web-text" in header.lower() for header in headers)

    @staticmethod
    def _header_value(header):
        """Return the lower-cased value of a header line.

        Lines without a colon (e.g. the status line) are returned whole,
        lower-cased.
        """
        _name, colon, value = header.partition(":")
        if not colon:
            return header.lower()
        # Split on the first colon only so values containing ":" stay intact.
        return value.strip().lower()

    # Implement IHttpListener methods
    def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo):
        """Convert the request or response held by ``messageInfo`` in place.

        Does nothing when the extension is disabled or when the request has
        no ``protoburp`` header.
        """
        # Only continue if the extension is enabled
        if not self.suite_tab.protoburp_enabled:
            return

        requestInfo = self._helpers.analyzeRequest(
            messageInfo.getHttpService(), messageInfo.getRequest()
        )
        # Header names are lower-cased for a case-insensitive comparison.
        header_names = [
            header.split(":")[0].lower() for header in requestInfo.getHeaders()
        ]
        # Only process if the ProtoBurp header exists in the request; this
        # trick is needed for the scanner/repeater.
        if "protoburp" not in header_names:
            return

        if messageIsRequest:
            self._convert_request(messageInfo, requestInfo)
        else:
            self._convert_response(messageInfo, requestInfo)

    def _convert_request(self, messageInfo, requestInfo):
        """Replace the JSON request body with its Protobuf encoding."""
        headers = requestInfo.getHeaders()
        json_body = json.loads(utils.getRequestBody(messageInfo))
        protobuf = self.Bridge.json_to_protobuf_in_python3(
            json_body,
            requestInfo.getUrl().getPath(),
            str(self.suite_tab.selectedFilePath)
        )
        # gRPC-Web-Text carries the payload base64-encoded on the wire, so
        # encode it for that content type only.
        if self._is_grpc_web_text(headers):
            protobuf = base64.b64encode(protobuf)
        messageInfo.setRequest(self._helpers.buildHttpMessage(headers, protobuf))

    def _convert_response(self, messageInfo, requestInfo):
        """Replace a Protobuf response body with its JSON rendering."""
        responseInfo = self._helpers.analyzeResponse(messageInfo.getResponse())
        headers = responseInfo.getHeaders()
        # Only touch responses whose headers advertise a Protobuf content type.
        is_protobuf = any(
            self._header_value(header) in utils.CONTENT_PROTOBUF
            for header in headers
        )
        if not is_protobuf:
            return

        body = self.getResponseBody(messageInfo)
        # The bridge is handed base64 text: gRPC-Web-Text bodies already are,
        # anything else is encoded here.
        if not self._is_grpc_web_text(headers):
            body = base64.b64encode(body)
        json_body = self.Bridge.protobuf_to_json_in_python3(
            body,
            requestInfo.getUrl().getPath(),
            str(self.suite_tab.selectedFilePath),
            False
        )
        messageInfo.setResponse(self._helpers.buildHttpMessage(headers, json_body))

    # Implement IMessageEditorTabFactory methods
    def createNewInstance(self, controller, editable):
        """Create the custom Protobuf message-editor tab."""
        return ProtobufTab.ProtobufTab(
            controller, editable, self._callbacks, self.Bridge, self.suite_tab
        )