Skip to content

Commit

Permalink
[tls] use PyOpenSSL to validate certificate
Browse files Browse the repository at this point in the history
We cannot depend on reaching into cryptography's internals to get a
handle on OpenSSL, so instead use PyOpenSSL.

This allows us to unpin our cryptography version.
  • Loading branch information
jlaine committed Feb 3, 2022
1 parent a993c10 commit 363f0f9
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 97 deletions.
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
packages=["aioquic", "aioquic.asyncio", "aioquic.h0", "aioquic.h3", "aioquic.quic"],
install_requires=[
"certifi",
"cryptography >= 3.1, < 4",
"cryptography >= 3.1",
"pylsqpack >= 0.3.3, < 0.4.0",
"pyopenssl >= 20",
],
)
95 changes: 15 additions & 80 deletions src/aioquic/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.bindings.openssl.binding import Binding
from cryptography.hazmat.primitives import hashes, hmac, serialization
from cryptography.hazmat.primitives.asymmetric import (
dsa,
Expand All @@ -38,14 +37,10 @@
)
from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from OpenSSL import crypto

from .buffer import Buffer

binding = Binding()
binding.init_static_locks()
ffi = binding.ffi
lib = binding.lib

TLS_VERSION_1_2 = 0x0303
TLS_VERSION_1_3 = 0x0304
TLS_VERSION_1_3_DRAFT_28 = 0x7F1C
Expand Down Expand Up @@ -204,29 +199,6 @@ def load_pem_x509_certificates(data: bytes) -> List[x509.Certificate]:
return certificates


def openssl_assert(ok: bool, func: str) -> None:
if not ok:
lib.ERR_clear_error()
raise AlertInternalError("OpenSSL call to %s failed" % func)


def openssl_decode_string(charp) -> str:
return ffi.string(charp).decode("utf-8") if charp else ""


def openssl_encode_path(s: Optional[str]) -> Any:
if s is not None:
return os.fsencode(s)
return ffi.NULL


def cert_x509_ptr(certificate: x509.Certificate) -> Any:
"""
Accessor for private attribute.
"""
return getattr(certificate, "_x509")


def verify_certificate(
certificate: x509.Certificate,
chain: List[x509.Certificate] = [],
Expand Down Expand Up @@ -263,63 +235,26 @@ def verify_certificate(
except ssl.CertificateError as exc:
raise AlertBadCertificate("\n".join(exc.args)) from exc

# verify certificate chain
store = lib.X509_STORE_new()
openssl_assert(store != ffi.NULL, "X509_store_new")
store = ffi.gc(store, lib.X509_STORE_free)

# load default CAs
openssl_assert(
lib.X509_STORE_set_default_paths(store), "X509_STORE_set_default_paths"
)
openssl_assert(
lib.X509_STORE_load_locations(
store,
openssl_encode_path(certifi.where()),
openssl_encode_path(None),
),
"X509_STORE_load_locations",
)

# load extra CAs
# load CAs
store = crypto.X509Store()
store.load_locations(certifi.where())
if cadata is not None:
for cert in load_pem_x509_certificates(cadata):
openssl_assert(
lib.X509_STORE_add_cert(store, cert_x509_ptr(cert)),
"X509_STORE_add_cert",
)
store.add_cert(crypto.X509.from_cryptography(cert))

if cafile is not None or capath is not None:
openssl_assert(
lib.X509_STORE_load_locations(
store, openssl_encode_path(cafile), openssl_encode_path(capath)
),
"X509_STORE_load_locations",
)

chain_stack = lib.sk_X509_new_null()
openssl_assert(chain_stack != ffi.NULL, "sk_X509_new_null")
chain_stack = ffi.gc(chain_stack, lib.sk_X509_free)
for cert in chain:
openssl_assert(
lib.sk_X509_push(chain_stack, cert_x509_ptr(cert)), "sk_X509_push"
)
store.load_locations(cafile, capath)

store_ctx = lib.X509_STORE_CTX_new()
openssl_assert(store_ctx != ffi.NULL, "X509_STORE_CTX_new")
store_ctx = ffi.gc(store_ctx, lib.X509_STORE_CTX_free)
openssl_assert(
lib.X509_STORE_CTX_init(
store_ctx, store, cert_x509_ptr(certificate), chain_stack
),
"X509_STORE_CTX_init",
# verify certificate chain
store_ctx = crypto.X509StoreContext(
store,
crypto.X509.from_cryptography(certificate),
[crypto.X509.from_cryptography(cert) for cert in chain],
)

res = lib.X509_verify_cert(store_ctx)
if not res:
err = lib.X509_STORE_CTX_get_error(store_ctx)
err_str = openssl_decode_string(lib.X509_verify_cert_error_string(err))
raise AlertBadCertificate(err_str)
try:
store_ctx.verify_certificate()
except crypto.X509StoreContextError as exc:
raise AlertBadCertificate(exc.args[0][2])


class CipherSuite(IntEnum):
Expand Down
16 changes: 0 additions & 16 deletions tests/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,22 +1296,6 @@ def test_verify_certificate_chain_self_signed(self):
server_name="localhost",
)

@patch("aioquic.tls.lib.X509_STORE_new")
def test_verify_certificate_chain_internal_error(self, mock_store_new):
mock_store_new.return_value = tls.ffi.NULL

certificate, _ = generate_ec_certificate(
common_name="localhost", curve=ec.SECP256R1
)

with self.assertRaises(tls.AlertInternalError) as cm:
verify_certificate(
cadata=certificate.public_bytes(serialization.Encoding.PEM),
certificate=certificate,
server_name="localhost",
)
self.assertEqual(str(cm.exception), "OpenSSL call to X509_store_new failed")

def test_verify_dates(self):
certificate, _ = generate_ec_certificate(
common_name="example.com", curve=ec.SECP256R1
Expand Down

0 comments on commit 363f0f9

Please sign in to comment.