forked from m4tth2w/my-pentest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwvs-thread.py
134 lines (110 loc) · 4.58 KB
/
wvs-thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python
#coding=utf-8
#author:Double8
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
import threading
import datetime,time
import Queue
import requests,subprocess
import os
from conf import wvs_location, wvs_save_dir, wvs_scan_sentence,_WORKER_THREAD_NUM
import random
global log_dir
from mail import send_mail
from xml.dom import minidom
from conf import callback_mail
from conf import filter
import re
import argparse
class ThreadClass(threading.Thread):
    """Worker thread that scans every target it can pull from a shared queue.

    For each target the worker formats ``wvs_scan_sentence`` with the target
    and a per-task output directory, runs it through the shell, parses the
    ``export.xml`` expected in that directory and hands the result to
    ``do_callback``.
    """

    def __init__(self, queue):
        """Remember the shared work queue.

        :param queue: a ``Queue.Queue`` holding the targets still to scan.
        """
        threading.Thread.__init__(self)
        self.queue = queue
        # Snapshot of the backlog at construction time (informational only).
        self.num = self.queue.qsize()

    def run(self):
        """Drain the queue, scanning one target at a time.

        A failure on one target is reported on stdout and never stops the
        worker from moving on to the next one.
        """
        while True:
            # get_nowait() instead of "empty() then get()": with several
            # workers the queue can be drained between the two calls, which
            # would leave this thread blocked forever inside get().
            try:
                raw_target = self.queue.get_nowait()
            except Queue.Empty:
                break

            # Drop line terminators so they reach neither the command line
            # nor the mail subject.
            self.yuming = raw_target.replace('\n', '').replace('\r', '')
            # NOTE(review): random ids can collide between tasks, in which
            # case two scans would share one output directory.
            self.task_id = random.randint(10000, 100000)
            self.save_dir = wvs_save_dir + str(self.task_id) + '/'
            print('Testing==============')
            # NOTE(review): the target is interpolated into a shell command
            # (shell=True); only feed this script trusted target lists.
            shell = wvs_scan_sentence % (self.yuming, self.save_dir)
            try:
                res = subprocess.Popen(shell, shell=True,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
                (output, error) = res.communicate()
                if error:
                    # As before, any stderr output means the result is not
                    # processed, but now the reason is visible.
                    print('[!] scan of %s wrote to stderr: %s'
                          % (self.yuming, error))
                    continue
                print(output)
                self.result = {"target": self.yuming, "scan_result": {}}
                self.result['scan_result'] = self.parse_xml(
                    self.save_dir + 'export.xml')
                if not self.result['scan_result']:
                    # parse_xml already reported why; nothing to send.
                    continue
                self.do_callback(self.yuming)
                print('ok')
            except Exception as exc:
                # Thread boundary: report and keep draining the queue.
                print('[!] scan of %s failed: %s' % (self.yuming, exc))

    @staticmethod
    def _first_text(parent, tag):
        """Return the UTF-8 encoded text of the first ``tag`` element under
        ``parent``, or ``''`` when the element is missing or has no text."""
        matches = parent.getElementsByTagName(tag)
        child = matches[0].firstChild if matches else None
        data = getattr(child, 'data', None)
        if data is None:
            return ''
        return data.encode('utf-8')

    def parse_xml(self, file_name):
        """Extract the scan time and the interesting findings from a report.

        :param file_name: path of the XML report to read.
        :returns: ``{'time': ..., 'bug': [...]}`` where every bug is a dict
            with ``name``, ``color``, ``details`` and ``affect``; ``{}`` when
            the file cannot be read or is not well-formed XML.

        Only ``ReportItem`` nodes whose ``color`` attribute is in
        ``filter['color_white_list']`` and whose name is not in
        ``filter['bug_black_list']`` are kept.
        """
        from xml.parsers.expat import ExpatError

        try:
            root = minidom.parse(file_name).documentElement
        except (IOError, OSError, ExpatError) as exc:
            print('[!] cannot parse %s: %s' % (file_name, exc))
            return {}

        bug_list = {'time': self._first_text(root, 'ScanTime'), 'bug': []}
        for node in root.getElementsByTagName('ReportItem'):
            color = node.getAttribute("color")
            name = self._first_text(node, "Name")
            if color in filter['color_white_list'] and name not in filter['bug_black_list']:
                # Missing or empty child elements become '' instead of
                # aborting the whole report and losing later findings.
                bug_list['bug'].append({
                    'name': name,
                    'color': color.encode('utf-8'),
                    'details': self._first_text(node, "Details"),
                    'affect': self._first_text(node, "Affects"),
                })
        return bug_list

    def do_callback(self, target):
        """Mail the current ``self.result`` as an HTML table.

        Does nothing when ``callback_mail`` is falsy.

        :param target: scanned target, used in the mail subject.
        """
        if not callback_mail:
            return
        result = self.result
        scan = result['scan_result']
        header = ('对 %s 的WVS扫描结果(扫描时间 %s)如下:<br/><br/><table border=1 cellpadding=0 cellspacing=0>'
                  '<tr><td>漏洞名称</td><td>等级</td><td>细节</td><td>URL</td></tr>') \
            % (result['target'], scan.get('time', ''))
        rows = ['<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>' %
                (bug["name"], bug["color"], bug["details"], bug["affect"])
                for bug in scan.get('bug', [])]
        cont = header + ''.join(rows) + '</table>'
        send_mail(callback_mail, '[WVS_Result] %s' % target, cont)
def main():
    """Queue every entry of the module-level ``urls`` list, then start
    ``_WORKER_THREAD_NUM`` ``ThreadClass`` workers and wait for all of them
    to finish."""
    pending = Queue.Queue()
    for target in urls:
        pending.put(target)

    # The queue is filled before any worker is constructed or started.
    workers = [ThreadClass(pending) for _ in range(_WORKER_THREAD_NUM)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
if __name__ == '__main__':
    # Command-line entry point: collect the targets from --u or --file into
    # the module-level ``urls`` list (read by main()), make sure the output
    # directory exists, run the scan and print the elapsed seconds.
    parser = argparse.ArgumentParser(description='wvs-scan to do more,author:Double8 ')
    parser.add_argument('--u', action="store", required=False, dest="url", type=str,
                        help='url like http://127.0.0.1')
    parser.add_argument("--file", action="store", required=False, dest="file", type=str,
                        help='get ips or domains for this file')
    args = parser.parse_args()

    urls = []
    if args.url:
        urls.append(args.url)
    elif args.file:
        with open(args.file, 'r') as f:
            for line in f:
                target = line.strip()
                # Blank lines would otherwise be scanned as empty targets.
                if target:
                    urls.append(target)
    else:
        # Prints the usage text to stderr and exits with a non-zero status
        # instead of reporting success on bad arguments.
        parser.error('error args')

    if not os.path.exists(wvs_save_dir):
        # makedirs also creates missing parent directories.
        os.makedirs(wvs_save_dir)

    st = time.time()
    main()
    print('%f' % (time.time() - st))