forked from huashengdun/webssh
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_policy.py
123 lines (104 loc) · 4.24 KB
/
test_policy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import unittest
import paramiko
from shutil import copyfile
from paramiko.client import RejectPolicy, WarningPolicy
from tests.utils import make_tests_data_path
from webssh.policy import (
AutoAddPolicy, get_policy_dictionary, load_host_keys,
get_policy_class, check_policy_setting
)
class TestPolicy(unittest.TestCase):
def test_get_policy_dictionary(self):
classes = [AutoAddPolicy, RejectPolicy, WarningPolicy]
dic = get_policy_dictionary()
for cls in classes:
val = dic[cls.__name__.lower()]
self.assertIs(cls, val)
def test_load_host_keys(self):
path = '/path-not-exists'
host_keys = load_host_keys(path)
self.assertFalse(host_keys)
path = '/tmp'
host_keys = load_host_keys(path)
self.assertFalse(host_keys)
path = make_tests_data_path('known_hosts_example')
host_keys = load_host_keys(path)
self.assertEqual(host_keys, paramiko.hostkeys.HostKeys(path))
def test_get_policy_class(self):
keys = ['autoadd', 'reject', 'warning']
vals = [AutoAddPolicy, RejectPolicy, WarningPolicy]
for key, val in zip(keys, vals):
cls = get_policy_class(key)
self.assertIs(cls, val)
key = 'non-exists'
with self.assertRaises(ValueError):
get_policy_class(key)
def test_check_policy_setting(self):
host_keys_filename = make_tests_data_path('host_keys_test.db')
host_keys_settings = dict(
host_keys=paramiko.hostkeys.HostKeys(),
system_host_keys=paramiko.hostkeys.HostKeys(),
host_keys_filename=host_keys_filename
)
with self.assertRaises(ValueError):
check_policy_setting(RejectPolicy, host_keys_settings)
try:
os.unlink(host_keys_filename)
except OSError:
pass
check_policy_setting(AutoAddPolicy, host_keys_settings)
self.assertEqual(os.path.exists(host_keys_filename), True)
def test_is_missing_host_key(self):
client = paramiko.SSHClient()
file1 = make_tests_data_path('known_hosts_example')
file2 = make_tests_data_path('known_hosts_example2')
client.load_host_keys(file1)
client.load_system_host_keys(file2)
autoadd = AutoAddPolicy()
for f in [file1, file2]:
entry = paramiko.hostkeys.HostKeys(f)._entries[0]
hostname = entry.hostnames[0]
key = entry.key
self.assertIsNone(
autoadd.is_missing_host_key(client, hostname, key)
)
for f in [file1, file2]:
entry = paramiko.hostkeys.HostKeys(f)._entries[0]
hostname = entry.hostnames[0]
key = entry.key
key.get_name = lambda: 'unknown'
self.assertTrue(
autoadd.is_missing_host_key(client, hostname, key)
)
del key.get_name
for f in [file1, file2]:
entry = paramiko.hostkeys.HostKeys(f)._entries[0]
hostname = entry.hostnames[0][1:]
key = entry.key
self.assertTrue(
autoadd.is_missing_host_key(client, hostname, key)
)
file3 = make_tests_data_path('known_hosts_example3')
entry = paramiko.hostkeys.HostKeys(file3)._entries[0]
hostname = entry.hostnames[0]
key = entry.key
with self.assertRaises(paramiko.BadHostKeyException):
autoadd.is_missing_host_key(client, hostname, key)
def test_missing_host_key(self):
client = paramiko.SSHClient()
file1 = make_tests_data_path('known_hosts_example')
file2 = make_tests_data_path('known_hosts_example2')
filename = make_tests_data_path('known_hosts')
copyfile(file1, filename)
client.load_host_keys(filename)
n1 = len(client._host_keys)
autoadd = AutoAddPolicy()
entry = paramiko.hostkeys.HostKeys(file2)._entries[0]
hostname = entry.hostnames[0]
key = entry.key
autoadd.missing_host_key(client, hostname, key)
self.assertEqual(len(client._host_keys), n1 + 1)
self.assertEqual(paramiko.hostkeys.HostKeys(filename),
client._host_keys)
os.unlink(filename)