forked from oxsecurity/megalinter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver_worker.py
238 lines (221 loc) · 8.73 KB
/
server_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import glob
import logging
import os
import shutil
import tempfile
import zipfile
from typing import List
import git
from megalinter import MegaLinter
from pygments import lexers
from server.errors import MegalinterServerException
from server.types import AnalysisRequestInput, AnalysisStatus
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
def processAnalysisRequest(
    request_input: dict,
    request_id: str,
    server_id: str,
):
    """Run one complete analysis for a raw request payload.

    The payload is parsed into an AnalysisRequestInput, handed to a fresh
    MegaLinterAnalysis together with the request and server ids, then the
    analysis files are prepared and the analysis is processed.

    Returns the attribute dictionary (``__dict__``) of the analysis instance.
    """
    job = MegaLinterAnalysis()
    parsed_input = AnalysisRequestInput.parse_obj(request_input)
    job.initialize(parsed_input, request_id, server_id)
    job.initialize_files()
    job.process()
    return vars(job)
# Analysis processor class
class MegaLinterAnalysis:
id: str | None = None
server_id: str | None = None
status: AnalysisStatus | None = None
repository: str | None = None
snippet_language: str | None = None
workspace: str | None = None
web_hook_url: str | None = None
results: List = []
# Initialize analysis request and assign an unique Id
def initialize(self, request_input: AnalysisRequestInput, request_id, server_id):
self.id = request_id
self.server_id = server_id
self.status = AnalysisStatus.NEW
self.request_input = request_input
if request_input.webHookUrl:
self.web_hook_url = request_input.webHookUrl
logger.info(f"Analysis request {self.id} has been initialized")
# Initialize files for analysis
def initialize_files(self):
# Clone repo from provided url
if self.request_input.repositoryUrl:
self.init_from_repository()
return
# Use uploaded files
elif self.request_input.fileUploadId:
self.init_from_file_upload(self.request_input.fileUploadId)
return
# Detect language and create temporary workspace with file
elif self.request_input.snippet:
self.init_from_snippet()
return
# Nothing to create a request !
err = MegalinterServerException(
"Unable to initialize files for analysis",
"missingAnalysisType",
self.id,
{"request_input": self.request_input},
)
err.send_redis_message()
raise err
# Init by cloning a remote repository
def init_from_repository(self):
temp_dir = self.create_temp_dir()
try:
git.Repo.clone_from(self.request_input.repositoryUrl, temp_dir)
except Exception as e:
err = MegalinterServerException(
f"Unable to clone repository {self.request_input.repositoryUrl}",
"gitCloneError",
self.id,
{"error": str(e)},
)
err.send_redis_message()
raise err
logger.info(f"Cloned {self.request_input.repositoryUrl} in temp dir {temp_dir}")
self.workspace = temp_dir
self.repository = self.request_input.repositoryUrl
# Init by getting uploaded file(s)
def init_from_file_upload(self, file_upload_id):
temp_dir = self.create_temp_dir()
upload_dir = os.path.join("/tmp/server-files", file_upload_id)
if os.path.exists(upload_dir):
zip_files = glob.glob(upload_dir + "/*.zip")
if len(zip_files) == 1:
# Unique zip file
with zipfile.ZipFile(zip_files[0], "r") as zip_ref:
zip_ref.extractall(temp_dir)
else:
# No zip file
shutil.copytree(upload_dir, temp_dir, dirs_exist_ok=True)
logger.info(f"Copied uploaded files from {self.id} in temp dir {temp_dir}")
self.workspace = temp_dir
self.repository = self.request_input.repositoryUrl
else:
err = MegalinterServerException(
"Unable to load uploaded files for analysis",
"uploadedFileNotFound",
self.id,
{"file_upload_id": file_upload_id},
)
err.send_redis_message()
raise err
# Init from user snippet
def init_from_snippet(self):
logger.info(f"Input snippet:\n {self.request_input.snippet}")
if self.request_input.snippetExtension:
snippet_file_name = "snippet" + self.request_input.snippetExtension
elif self.request_input.snippetLanguage:
snippet_file_name = self.get_language_extension()
else:
snippet_file_name = self.guess_snippet_filename()
logger.info(f"Snippet file name: {snippet_file_name}")
temp_dir = self.create_temp_dir()
snippet_file = os.path.join(temp_dir, snippet_file_name)
with open(snippet_file, "w", encoding="utf-8") as file:
file.write(self.request_input.snippet)
self.workspace = temp_dir
# Get extension from language
def get_language_extension(self):
languageId = self.request_input.snippetLanguage
all_code_lexers = lexers.get_all_lexers()
code_lexer_name = None
code_lexer_filenames = None
for name, aliases, filenames, _mimetypes in all_code_lexers:
if name == languageId or name in aliases:
code_lexer_name = name
code_lexer_filenames = filenames
break
if code_lexer_name is None:
err = MegalinterServerException(
"Unable to find pygments language for " + languageId,
"snippetUnknownLanguage",
self.id,
{"snippet": self.request_input.snippet, "snippetLanguage": languageId},
)
err.send_redis_message()
raise err
logger.info(f"Identified snippet language: {code_lexer_name}")
# Build file name
if len(code_lexer_filenames) > 0:
if "*." in code_lexer_filenames[0]:
snippet_file_name = "snippet" + code_lexer_filenames[0].replace("*", "")
else:
snippet_file_name = code_lexer_filenames[0]
else:
err = MegalinterServerException(
f"Unable build file from {code_lexer_name} snippet",
"snippetBuildError",
self.id,
{"snippet": self.request_input.snippet},
)
err.send_redis_message()
raise err
return snippet_file_name
# Guess language using pygments
def guess_snippet_filename(self):
code_lexer = lexers.guess_lexer(self.request_input.snippet)
if not code_lexer:
err = MegalinterServerException(
"Unable to detect language from snippet",
"snippetGuessError",
self.id,
{"snippet": self.request_input.snippet},
)
err.send_redis_message()
raise err
self.snippet_language = code_lexer.name
logger.info(f"Guessed snipped language: {self.snippet_language}")
# Build file name
if len(code_lexer.filenames) > 0:
if "*." in code_lexer.filenames[0]:
snippet_file_name = "snippet" + code_lexer.filenames[0].replace("*", "")
else:
snippet_file_name = code_lexer.filenames[0]
else:
err = MegalinterServerException(
f"Unable build file from {code_lexer.name} snippet",
"snippetBuildError",
self.id,
{"snippet": self.request_input.snippet},
)
err.send_redis_message()
raise err
return snippet_file_name
# Run MegaLinter
def process(self):
megalinter_params = {
"cli": False,
"request_id": self.id,
"workspace": self.workspace,
"SARIF_REPORTER": "true",
}
if self.web_hook_url:
megalinter_params["WEBHOOK_REPORTER"] = "true"
megalinter_params["WEBHOOK_REPORTER_URL"] = self.web_hook_url
mega_linter = MegaLinter.Megalinter(megalinter_params)
self.change_status(AnalysisStatus.IN_PROGRESS)
mega_linter.run()
for linters in mega_linter.linters:
for reporter in linters.reporters:
if reporter.name == "WEBHOOK_REPORTER" and reporter.web_hook_data:
self.results.append(reporter.web_hook_data)
self.change_status(AnalysisStatus.COMPLETE)
del mega_linter
# Create uniform temp directories
def create_temp_dir(self):
return tempfile.mkdtemp(prefix="ct-megalinter-x")
# Change status of analysis request
def change_status(self, status: AnalysisStatus):
self.status = status
logger.info(f"Analysis request {self.id} status change: {status}")
self.save()
def save(self):
logger.debug("Saved state " + str(self.__dict__))