-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpenetration_test.py
111 lines (76 loc) · 2.94 KB
/
penetration_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import requests
import grequests
import time
import os
# Directory containing this script; the generated test file lives next to it.
current_path = os.path.dirname(os.path.abspath(__file__))

# ===========================================
# configuration variables
# ===========================================
# Account and repository that make up the API endpoint path.
userName = "olegsfinest"
repoName = "test7"
# NOTE(review): an API token is committed in source. Prefer supplying it via
# the TAILR_TOKEN environment variable; the literal is kept only as a
# backward-compatible fallback and should be rotated and removed.
tailrToken = os.environ.get("TAILR_TOKEN", "6c8369343aa0754741225b6f161bb7f0bcd1d388")
# Value sent as the Content-Type header of every upload.
contentType = "application/n-triples"
# Text file listing the target URLs, one per line.
urlListFilePath = "urls2.txt"
# Maximum number of requests in flight at once.
concurrency_limit = 30

# ===========================================
# configuration variables for penetration test
# ===========================================
# number of lines that will be added every iteration
lineSteps = 10000
# If True, the file will be emptied first. If False, the script will start
# with the current number of lines.
startNew = True
# File that is grown step by step and uploaded on every iteration.
filePath = os.path.join(current_path, "penet.nt")
def readURIs():
    """Load the target URLs from ``urlListFilePath`` into the global ``urls``.

    Each non-blank line contributes one entry. Surrounding whitespace,
    including the trailing newline, is removed so the value can be used
    directly as the ``key`` request parameter. Blank lines are skipped, and
    an empty file simply leaves ``urls`` unchanged.

    Raises:
        IOError/OSError: if the URL list file cannot be opened.
    """
    # ``with`` guarantees the handle is closed even if reading fails.
    with open(urlListFilePath, 'r') as url_file:
        for line in url_file:
            url = line.strip()
            if url:
                urls.append(url)
def exception_handler(request, exception):
    """Report a request that raised instead of producing a response.

    The ``(request, exception)`` signature follows the callback form that
    ``grequests.map`` accepts as its ``exception_handler`` argument. Both
    arguments are currently unused; only a fixed message is printed.

    Returns:
        None.
    """
    # Function-call form of print works on both Python 2 and Python 3,
    # matching every other print in this file.
    print("Request failed")
def printResponse(response, *args, **kwargs):
    """Response hook: log the outcome of one upload and note failures.

    Prints the response URL with its status code, then the time elapsed
    since the global ``startTime``. Any status other than 200 is stored in
    the global ``failedRequestsUrls`` mapping, keyed by the response URL.

    Args:
        response: the response object handed to the hook.
        *args, **kwargs: extra hook arguments; ignored.
    """
    status = response.status_code
    print(response.url + " returned status code: " + str(status))

    elapsed = time.time() - startTime
    print("This took " + str(elapsed) + " seconds")

    if status == 200:
        return
    failedRequestsUrls[response.url] = status
def push():
    """PUT the current test file to the API once per entry in ``urls``.

    One request per URL is built against ``apiURI`` (the URL is passed as
    the ``key`` query parameter) and all of them are sent concurrently, at
    most ``concurrency_limit`` at a time. ``printResponse`` runs as the
    response hook of every request. Requests that end without any response
    (for example a connection error) are reported through
    ``exception_handler`` and recorded in ``failedRequestsUrls`` under the
    plain URL with a value of ``None``, so they count as failures too.

    Returns:
        None.
    """
    # Read the payload once. Opening the file separately for every URL left
    # one never-closed handle per request.
    with open(filePath, 'rb') as payload_file:
        payload = payload_file.read()

    # Snapshot the targets so each result can be matched back to its URL.
    targets = list(urls)
    pending = [
        grequests.put(
            apiURI,
            params={'key': url},
            headers=header,
            data=payload,
            hooks={'response': printResponse},
        )
        for url in targets
    ]

    responses = grequests.map(
        pending,
        stream=False,
        size=concurrency_limit,
        exception_handler=exception_handler,
    )

    # grequests.map yields one result per request, in request order, with
    # None standing in for a request that produced no response.
    for url, response in zip(targets, responses):
        if response is None:
            failedRequestsUrls[url] = None
# ============================
# helper functions
# ============================
def file_len(filename):
    """Return the number of lines in ``filename``.

    An empty file yields 0. A final line without a trailing newline still
    counts as one line.

    Args:
        filename: path of the text file to count.

    Returns:
        int: the line count.
    """
    with open(filename) as f:
        # Count while iterating so the file is never held in memory at once.
        return sum(1 for _ in f)
# Headers shared by every upload: token authorization plus the payload type.
header = {'Authorization': "token " + tailrToken, 'Content-Type': contentType}
apiURI = "http://tailr.s16a.org/api/" + userName + "/" + repoName

urls = []
readURIs()
# Without any target no request can ever fail, so the loop below would keep
# growing the file forever.
if not urls:
    raise SystemExit("No URLs found in " + urlListFilePath)

# Filled by the response hook; a non-empty mapping ends the test.
failedRequestsUrls = {}

# clear file (the same file that is appended to and uploaded below)
if startNew:
    open(filePath, 'w').close()

lineCount = file_len(filePath)

# penetration testing: grow the file and re-upload until a request fails
while not failedRequestsUrls:
    # add lineSteps lines to the file
    print("\nAdding "+str(lineSteps)+" lines")
    # Open once per round instead of once per appended line.
    with open(filePath, 'a') as triple_file:
        for _ in range(lineSteps):
            # The running line count keeps every triple unique across rounds;
            # a per-round index would repeat the same triples each time.
            tmpStr = "<http://dbpedia.org/resource/Queens_of_the_Stone_Age> <http://www.w3.org/2002/07/owl#sameAs> <http://commons.dbpedia.org/resource/Queens_of_the_Stone_Age"+str(lineCount)+"> .\n"
            triple_file.write(tmpStr)
            lineCount += 1

    print ("\n \nTrying with "+ str(lineCount) + " lines now")

    # push; startTime is read by the response hook to report elapsed time
    startTime = time.time()
    push()

# Summary once the loop has stopped on the first failing round.
print("\n" + str(len(failedRequestsUrls)) + " requests failed")