forked from paramiko/paramiko
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
75 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
|
||
import weakref | ||
import time | ||
import re | ||
|
||
from paramiko.common import ( | ||
cMSG_SERVICE_REQUEST, | ||
|
@@ -298,6 +299,23 @@ def _finalize_pubkey_algorithm(self, key_type): | |
key_type | ||
), | ||
) | ||
# NOTE re #2017: When the key is an RSA cert and the remote server is | ||
# OpenSSH 7.7 or earlier, always use [email protected]. | ||
# Those versions of the server won't support rsa-sha2 family sig algos | ||
# for certs specifically, and in tandem with various server bugs | ||
# regarding server-sig-algs, it's impossible to fit this into the rest | ||
# of the logic here. | ||
if key_type.endswith("[email protected]") and re.search( | ||
r"-OpenSSH_(?:[1-6]|7\.[0-7])", self.transport.remote_version | ||
): | ||
pubkey_algo = "[email protected]" | ||
self.transport._agreed_pubkey_algorithm = pubkey_algo | ||
self._log(DEBUG, "OpenSSH<7.8 + RSA cert = forcing ssh-rsa!") | ||
self._log( | ||
DEBUG, "Agreed upon {!r} pubkey algorithm".format(pubkey_algo) | ||
) | ||
return pubkey_algo | ||
# Normal attempts to handshake follow from here. | ||
# Only consider RSA algos from our list, lest we agree on another! | ||
my_algos = [x for x in self.transport.preferred_pubkeys if "rsa" in x] | ||
self._log(DEBUG, "Our pubkey algorithm list: {}".format(my_algos)) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -152,7 +152,12 @@ def tearDown(self): | |
self.sockl.close() | ||
|
||
def _run( | ||
self, allowed_keys=None, delay=0, public_blob=None, kill_event=None | ||
self, | ||
allowed_keys=None, | ||
delay=0, | ||
public_blob=None, | ||
kill_event=None, | ||
server_name=None, | ||
): | ||
if allowed_keys is None: | ||
allowed_keys = FINGERPRINTS.keys() | ||
|
@@ -164,6 +169,8 @@ def _run( | |
self.socks.close() | ||
return | ||
self.ts = paramiko.Transport(self.socks) | ||
if server_name is not None: | ||
self.ts.local_version = server_name | ||
keypath = _support("test_rsa.key") | ||
host_key = paramiko.RSAKey.from_private_key_file(keypath) | ||
self.ts.add_server_key(host_key) | ||
|
@@ -179,11 +186,11 @@ def _test_connection(self, **kwargs): | |
""" | ||
(Most) kwargs get passed directly into SSHClient.connect(). | ||
The exception is ``allowed_keys`` which is stripped and handed to the | ||
``NullServer`` used for testing. | ||
The exceptions are ``allowed_keys``/``public_blob``/``server_name`` | ||
which are stripped and handed to the ``NullServer`` used for testing. | ||
""" | ||
run_kwargs = {"kill_event": self.kill_event} | ||
for key in ("allowed_keys", "public_blob"): | ||
for key in ("allowed_keys", "public_blob", "server_name"): | ||
run_kwargs[key] = kwargs.pop(key, None) | ||
# Server setup | ||
threading.Thread(target=self._run, kwargs=run_kwargs).start() | ||
|
@@ -205,7 +212,9 @@ def _test_connection(self, **kwargs): | |
self.event.wait(1.0) | ||
self.assertTrue(self.event.is_set()) | ||
self.assertTrue(self.ts.is_active()) | ||
self.assertEqual("slowdive", self.ts.get_username()) | ||
self.assertEqual( | ||
self.connect_kwargs["username"], self.ts.get_username() | ||
) | ||
self.assertEqual(True, self.ts.is_authenticated()) | ||
self.assertEqual(False, self.tc.get_transport().gss_kex_used) | ||
|
||
|
@@ -345,6 +354,29 @@ def test_certs_implicitly_loaded_alongside_key_filename_keys(self): | |
), | ||
) | ||
|
||
def _cert_algo_test(self, ver, alg): | ||
# Issue #2017; see auth_handler.py | ||
self.connect_kwargs["username"] = "somecertuser" # neuter pw auth | ||
self._test_connection( | ||
# NOTE: SSHClient is able to take either the key or the cert & will | ||
# set up its internals as needed | ||
key_filename=_support( | ||
os.path.join("cert_support", "test_rsa.key-cert.pub") | ||
), | ||
server_name="SSH-2.0-OpenSSH_{}".format(ver), | ||
) | ||
assert ( | ||
self.tc._transport._agreed_pubkey_algorithm | ||
== "{}[email protected]".format(alg) | ||
) | ||
|
||
def test_old_openssh_needs_ssh_rsa_for_certs_not_rsa_sha2(self): | ||
self._cert_algo_test(ver="7.7", alg="ssh-rsa") | ||
|
||
def test_newer_openssh_uses_rsa_sha2_for_certs_not_ssh_rsa(self): | ||
# NOTE: 512 happens to be first in our list and is thus chosen | ||
self._cert_algo_test(ver="7.8", alg="rsa-sha2-512") | ||
|
||
def test_default_key_locations_trigger_cert_loads_if_found(self): | ||
# TODO: what it says on the tin: ~/.ssh/id_rsa tries to load | ||
# ~/.ssh/id_rsa-cert.pub. Right now no other tests actually test that | ||
|