diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index a770e08..fda4623 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -4,10 +4,16 @@ Changelog ********* -v0.5.0 +v0.4.3 ====== -Released: not yet +Released: August 15, 2017 + +Bugs Fixed +---------- + + * Private key checksum calculations were not getting stored for ECDSA keys; this has been fixed. + * The test suite gpg wrappers have been replaced with use of the `gpg `_ package. (#171) v0.4.2 ====== diff --git a/pgpy/_author.py b/pgpy/_author.py index d872f38..708adf2 100644 --- a/pgpy/_author.py +++ b/pgpy/_author.py @@ -15,4 +15,4 @@ __author__ = "Michael Greene" __copyright__ = "Copyright (c) 2014 Michael Greene" __license__ = "BSD" -__version__ = str(LooseVersion("0.5.0")) +__version__ = str(LooseVersion("0.4.3")) diff --git a/pgpy/packet/fields.py b/pgpy/packet/fields.py index e70fcf3..f0f7816 100644 --- a/pgpy/packet/fields.py +++ b/pgpy/packet/fields.py @@ -1303,6 +1303,7 @@ def _generate(self, oid): self.x = MPI(pubn.x) self.y = MPI(pubn.y) self.s = MPI(pk.private_numbers().private_value) + self._compute_chksum() def parse(self, packet): super(ECDSAPriv, self).parse(packet) diff --git a/requirements-test.txt b/requirements-test.txt index 1eca9e4..9aa678a 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -3,4 +3,5 @@ pytest pytest-cov pytest-ordering flake8 -pep8-naming \ No newline at end of file +pep8-naming +gpg==1.8.0 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index e108b23..668cc29 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,14 +1,10 @@ """PGPy conftest""" import pytest -import contextlib import glob +import gpg import os -import re -import select -import subprocess import sys -import time from distutils.version import LooseVersion @@ -16,7 +12,8 @@ openssl_ver = LooseVersion(openssl.backend.openssl_version_text().split(' ')[1]) gpg_ver = LooseVersion('0') -pgpdump_ver = LooseVersion('0') +python_gpg_ver = LooseVersion(gpg.version.versionstr) +gnupghome = os.path.join(os.path.dirname(__file__), 'gnupghome') # ensure external commands we need to run exist @@ -33,214 +30,6 @@ sys.path.insert(1, os.path.join(os.getcwd(), 'tests')) -def _which(cmd): - for d in iter(p for p in os.getenv('PATH').split(':') if os.path.isdir(p)): - if cmd in os.listdir(d) and os.access(os.path.realpath(os.path.join(d, cmd)), os.X_OK): - return os.path.join(d, cmd) - - -# run a subprocess command, wait for it to complete, and then return decoded output -def _run(bin, *binargs, **pkw): - _default_pkw = {'stdout': subprocess.PIPE, - 'stderr': subprocess.PIPE} - - popen_kwargs = _default_pkw.copy() - popen_kwargs.update(pkw) - - cmd = subprocess.Popen([bin] + list(binargs), **popen_kwargs) - cmd.wait() - cmdo, cmde = cmd.communicate() - - cmdo = cmdo.decode('latin-1') if cmdo is not None else "" - cmde = cmde.decode('latin-1') if cmde is not None else "" - - return cmdo, cmde - - -_gpg_bin = _which('gpg2') -_gpg_args = ('--options', './pgpy.gpg.conf', '--expert', '--status-fd') -_gpg_env = {} -_gpg_env['GNUPGHOME'] = os.path.abspath(os.path.abspath('tests/testdata')) -_gpg_kwargs = dict() -_gpg_kwargs['cwd'] = 'tests/testdata' -_gpg_kwargs['env'] = _gpg_env -_gpg_kwargs['stdout'] = subprocess.PIPE -_gpg_kwargs['stderr'] = subprocess.STDOUT -_gpg_kwargs['close_fds'] = False - - -# GPG boilerplate function -def _gpg(*gpg_args, **popen_kwargs): - # gpgfd is our "read" end of the pipe - # _gpgfd is gpg's "write" end - gpgfd, _gpgfd = os.pipe() - - # on python >= 3.4, we need to set _gpgfd as inheritable - # older versions do not have this function - if sys.version_info >= (3, 4): - os.set_inheritable(_gpgfd, True) - - args = (_gpg_bin,) + _gpg_args + (str(_gpgfd),) + gpg_args - kwargs = _gpg_kwargs.copy() - kwargs.update(popen_kwargs) - - try: - # use this as the buffer for collecting status-fd output - c = bytearray() - - cmd = subprocess.Popen(args, **kwargs) - while cmd.poll() is None: - while gpgfd in select.select([gpgfd,], [], [], 0)[0]: - c += os.read(gpgfd, 1) - - else: - # sleep for a bit - time.sleep(0.010) - - # finish reading if needed - while gpgfd in select.select([gpgfd,], [], [], 0)[0]: - c += os.read(gpgfd, 1) - - # collect stdout and stderr - o, e = cmd.communicate() - - finally: - # close the pipes we used for this - os.close(gpgfd) - os.close(_gpgfd) - - return c.decode('latin-1'), (o or b'').decode('latin-1'), (e or b'').decode('latin-1') - - -# fixtures -@pytest.fixture() -def gpg_import(): - @contextlib.contextmanager - def _gpg_import(*keypaths): - # if GPG version is 2.1 or newer, we need to add a setup/teardown step in creating the keybox folder - if gpg_ver >= '2.1': - if not os.path.exists('tests/testdata/private-keys-v1.d'): - os.mkdir('tests/testdata/private-keys-v1.d') - time.sleep(0.5) - - gpgc, gpgo, gpge = _gpg('--import', *list(keypaths)) - - try: - yield gpgo - - finally: - [os.remove(f) for f in glob.glob('tests/testdata/testkeys.*')] - if gpg_ver >= '2.1': - [os.remove(f) for f in glob.glob('tests/testdata/private-keys-v1.d/*')] - - time.sleep(0.5) - - return _gpg_import - - -@pytest.fixture() -def gpg_check_sigs(): - def _gpg_check_sigs(*keyids): - gpgc, gpgo, gpge = _gpg('--check-sigs', *keyids) - return 'sig-' not in gpgo - - return _gpg_check_sigs - - -@pytest.fixture() -def gpg_verify(): - sfd_verify = re.compile(r'^\[GNUPG:\] (?:GOOD|EXP)SIG (?P[0-9A-F]+) .*' - r'^\[GNUPG:\] VALIDSIG (?:[0-9A-F]{,24})\1', flags=re.MULTILINE | re.DOTALL) - - def _gpg_verify(gpg_subjpath, gpg_sigpath=None, keyid=None): - rargs = [gpg_sigpath, gpg_subjpath] if gpg_sigpath is not None else [gpg_subjpath,] - - gpgc, gpgo, gpge = _gpg('--verify', *rargs) - - sigs = [ sv.group('keyid') for sv in sfd_verify.finditer(gpgc) ] - - if keyid is not None: - return keyid in sigs - - return sigs - - return _gpg_verify - - -@pytest.fixture -def gpg_decrypt(): - sfd_decrypt = re.compile(r'^\[GNUPG:\] BEGIN_DECRYPTION\n' - r'^\[GNUPG:\] DECRYPTION_INFO \d+ \d+\n' - r'^\[GNUPG:\] PLAINTEXT (?:62|74|75) (?P\d+) (?P.*)\n' - r'^\[GNUPG:\] PLAINTEXT_LENGTH \d+\n' - r'\[GNUPG:\] DECRYPTION_OKAY\n' - r'(?:^\[GNUPG:\] GOODMDC\n)?' - r'^\[GNUPG:\] END_DECRYPTION', flags=re.MULTILINE) - - def _gpg_decrypt(encmsgpath, passphrase=None, keyid=None): - a = [] - - if passphrase is not None: - # create a pipe to send the passphrase to GnuPG through - pfdr, pfdw = os.pipe() - - # write the passphrase to the pipe buffer right away - os.write(pfdw, passphrase.encode()) - os.write(pfdw, b'\n') - - # on python >= 3.4, we need to set pfdr as inheritable - # older versions do not have this function - if sys.version_info >= (3, 4): - os.set_inheritable(pfdr, True) - - a.extend(['--batch', '--passphrase-fd', str(pfdr)]) - - elif keyid is not None: - a.extend(['--recipient', keyid]) - - a.extend(['--decrypt', encmsgpath]) - - gpgc, gpgo, gpge = _gpg(*a, stderr=subprocess.PIPE) - - status = sfd_decrypt.match(gpgc) - return gpgo - - return _gpg_decrypt - - -@pytest.fixture -def gpg_print(): - sfd_text = re.compile(r'^\[GNUPG:\] PLAINTEXT (?:62|74|75) (?P\d+) (?P.*)\n' - r'^\[GNUPG:\] PLAINTEXT_LENGTH (?P\d+)\n', re.MULTILINE) - - gpg_text = re.compile(r'(?:- gpg control packet\n)?(?P.*)', re.MULTILINE | re.DOTALL) - - def _gpg_print(infile): - gpgc, gpgo, gpge = _gpg('-o-', infile, stderr=subprocess.PIPE) - status = sfd_text.match(gpgc) - tlen = len(gpgo) if status is None else int(status.group('len')) - - return gpg_text.match(gpgo).group('text')[:tlen] - - return _gpg_print - - -@pytest.fixture -def gpg_keyid_file(): - def _gpg_keyid_file(infile): - c, o, e = _gpg('--list-packets', infile) - return re.findall(r'^\s+keyid: ([0-9A-F]+)', o, flags=re.MULTILINE) - return _gpg_keyid_file - - -@pytest.fixture() -def pgpdump(): - def _pgpdump(infile): - return _run(_which('pgpdump'), '-agimplu', infile)[0] - - return _pgpdump - - # pytest hooks # pytest_configure @@ -248,22 +37,26 @@ def _pgpdump(infile): def pytest_configure(config): print("== PGPy Test Suite ==") - # ensure commands we need exist - for cmd in ['gpg2', 'pgpdump']: - if _which(cmd) is None: - print("Error: Missing Command: " + cmd) - exit(-1) + # clear out gnupghome + clear_globs = [os.path.join(gnupghome, 'private-keys-v1.d', '*.key'), + os.path.join(gnupghome, '*.kbx*'), + os.path.join(gnupghome, '*.gpg*'), + os.path.join(gnupghome, '.*'), + os.path.join(gnupghome, 'random_seed')] + for fpath in iter(f for cg in clear_globs for f in glob.glob(cg)): + os.unlink(fpath) # get the GnuPG version - gpg_ver.parse(_run(_which('gpg2'), '--version')[0].splitlines()[0].split(' ')[-1]) + gpg_ver.parse(gpg.core.check_version()) - # get the pgpdump version - v, _ = _run(_which('pgpdump'), '-v', stderr=subprocess.STDOUT) - pgpdump_ver.parse(v.split(' ')[2].strip(',')) + # check that there are no keys loaded, now + with gpg.Context(offline=True) as c: + c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome) + assert len(list(c.keylist())) == 0 + assert len(list(c.keylist(secret=True))) == 0 # display the working directory and the OpenSSL/GPG/pgpdump versions print("Working Directory: " + os.getcwd()) print("Using OpenSSL " + str(openssl_ver)) print("Using GnuPG " + str(gpg_ver)) - print("Using pgpdump " + str(pgpdump_ver)) print("") diff --git a/tests/gnupghome/gpg-agent.conf b/tests/gnupghome/gpg-agent.conf new file mode 100644 index 0000000..d1b6ae3 --- /dev/null +++ b/tests/gnupghome/gpg-agent.conf @@ -0,0 +1 @@ +allow-loopback-pinentry diff --git a/tests/testdata/pgpy.gpg.conf b/tests/gnupghome/gpg.conf similarity index 67% rename from tests/testdata/pgpy.gpg.conf rename to tests/gnupghome/gpg.conf index 46702a3..30040dc 100644 --- a/tests/testdata/pgpy.gpg.conf +++ b/tests/gnupghome/gpg.conf @@ -2,11 +2,6 @@ # always expert expert trust-model always -# keyring stuff -no-default-keyring -keyring testkeys.pub.gpg -secret-keyring testkeys.sec.gpg -trustdb-name testkeys.trust.gpg # don't try to auto-locate keys except in the local keyring(s) no-auto-key-locate diff --git a/tests/test_04_PGP_objects.py b/tests/test_04_PGP_objects.py index 3dfa7e7..e0f3e8e 100644 --- a/tests/test_04_PGP_objects.py +++ b/tests/test_04_PGP_objects.py @@ -90,60 +90,48 @@ def test_format(self, un, unc, une, unce): _keyfiles = sorted(glob.glob('tests/testdata/blocks/*key*.asc')) +_fingerprints = {'dsapubkey.asc': '2B5BBB143BA0B290DCEE6668B798AE8990877201', + 'dsaseckey.asc': '2B5BBB143BA0B290DCEE6668B798AE8990877201', + 'eccpubkey.asc': '502D1A5365D1C0CAA69945390BA52DF0BAA59D9C', + 'eccseckey.asc': '502D1A5365D1C0CAA69945390BA52DF0BAA59D9C', + 'openpgp.js.pubkey.asc': 'C7C38ECEE94A4AD32DDB064E14AB44C74D1BDAB8', + 'openpgp.js.seckey.asc': 'C7C38ECEE94A4AD32DDB064E14AB44C74D1BDAB8', + 'rsapubkey.asc': 'F4294BC8094A7E0585C85E8637473B3758C44F36', + 'rsaseckey.asc': 'F4294BC8094A7E0585C85E8637473B3758C44F36',} class TestPGPKey(object): @pytest.mark.parametrize('kf', _keyfiles, ids=[os.path.basename(f) for f in _keyfiles]) - def test_load_from_file(self, kf, gpg_keyid_file): + def test_load_from_file(self, kf): key, _ = PGPKey.from_file(kf) - # TODO: maybe store the fingerprint instead of relying on a particular version of GnuPG...? - if 'ecc' in kf and gpg_ver < '2.1': - assert key.fingerprint - - else: - assert key.fingerprint.keyid in gpg_keyid_file(kf.replace('tests/testdata/', '')) + assert key.fingerprint == _fingerprints[os.path.basename(kf)] @pytest.mark.parametrize('kf', _keyfiles, ids=[os.path.basename(f) for f in _keyfiles]) - def test_load_from_str(self, kf, gpg_keyid_file): + def test_load_from_str(self, kf): with open(kf, 'r') as tkf: key, _ = PGPKey.from_blob(tkf.read()) - # TODO: maybe store the fingerprint instead of relying on a particular version of GnuPG...? - if 'ecc' in kf and gpg_ver < '2.1': - assert key.fingerprint - - else: - assert key.fingerprint.keyid in gpg_keyid_file(kf.replace('tests/testdata/', '')) + assert key.fingerprint == _fingerprints[os.path.basename(kf)] @pytest.mark.regression(issue=140) @pytest.mark.parametrize('kf', _keyfiles, ids=[os.path.basename(f) for f in _keyfiles]) - def test_load_from_bytes(self, kf, gpg_keyid_file): + def test_load_from_bytes(self, kf): with open(kf, 'rb') as tkf: key, _ = PGPKey.from_blob(tkf.read()) - # TODO: maybe store the fingerprint instead of relying on a particular version of GnuPG...? - if 'ecc' in kf and gpg_ver < '2.1': - assert key.fingerprint - - else: - assert key.fingerprint.keyid in gpg_keyid_file(kf.replace('tests/testdata/', '')) + assert key.fingerprint == _fingerprints[os.path.basename(kf)] @pytest.mark.regression(issue=140) @pytest.mark.parametrize('kf', _keyfiles, ids=[os.path.basename(f) for f in _keyfiles]) - def test_load_from_bytearray(self, kf, gpg_keyid_file): + def test_load_from_bytearray(self, kf): tkb = bytearray(os.stat(kf).st_size) with open(kf, 'rb') as tkf: tkf.readinto(tkb) key, _ = PGPKey.from_blob(tkb) - # TODO: maybe store the fingerprint instead of relying on a particular version of GnuPG...? - if 'ecc' in kf and gpg_ver < '2.1': - assert key.fingerprint - - else: - assert key.fingerprint.keyid in gpg_keyid_file(kf.replace('tests/testdata/', '')) + assert key.fingerprint == _fingerprints[os.path.basename(kf)] @pytest.mark.parametrize('kf', sorted(filter(lambda f: not f.endswith('enc.asc'), glob.glob('tests/testdata/keys/*.asc')))) def test_save(self, kf): diff --git a/tests/test_05_actions.py b/tests/test_05_actions.py index cbee46b..08c1c38 100644 --- a/tests/test_05_actions.py +++ b/tests/test_05_actions.py @@ -2,10 +2,11 @@ """ test doing things with keys/signatures/etc """ import pytest -from conftest import gpg_ver +from conftest import gpg_ver, gnupghome import copy import glob +import gpg import itertools import os import six @@ -45,9 +46,25 @@ def EncodedNamedTemporaryFile(mode, **kw): class TestPGPMessage(object): + @staticmethod + def gpg_message(msg): + with gpg.Context(offline=True) as c: + c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome) + msg, _ = c.verify(gpg.Data(string=str(msg))) + return msg + + @staticmethod + def gpg_decrypt(msg, passphrase): + with gpg.Context(offline=True) as c: + c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome) + msg, decres, _ = c.decrypt(gpg.Data(string=str(msg)), passphrase=passphrase) + + assert decres + return msg + @pytest.mark.parametrize('comp_alg,sensitive', itertools.product(CompressionAlgorithm, [False, True])) - def test_new(self, comp_alg, sensitive, gpg_print): + def test_new(self, comp_alg, sensitive): mtxt = u"This is a new message!" msg = PGPMessage.new(mtxt, compression=comp_alg, sensitive=sensitive) @@ -58,14 +75,12 @@ def test_new(self, comp_alg, sensitive, gpg_print): assert msg.message == mtxt assert msg._compression == comp_alg - with tempfile.NamedTemporaryFile('w+') as mf: - mf.write(str(msg)) - mf.flush() - assert gpg_print(mf.name) == mtxt + # see if GPG can parse our message + assert self.gpg_message(msg).decode('utf-8') == mtxt @pytest.mark.parametrize('comp_alg,sensitive,path', itertools.product(CompressionAlgorithm, [False, True], sorted(glob.glob('tests/testdata/files/literal*')))) - def test_new_from_file(self, comp_alg, sensitive, path, gpg_print): + def test_new_from_file(self, comp_alg, sensitive, path): msg = PGPMessage.new(path, file=True, compression=comp_alg, sensitive=sensitive) assert isinstance(msg, PGPMessage) @@ -74,16 +89,14 @@ def test_new_from_file(self, comp_alg, sensitive, path, gpg_print): assert msg.is_sensitive is sensitive with open(path, 'rb') as tf: - mtxt = tf.read().decode('latin-1') + mtxt = tf.read() - with tempfile.NamedTemporaryFile('w+') as mf: - mf.write(str(msg)) - mf.flush() - assert gpg_print(mf.name) == mtxt + # see if GPG can parse our message + assert self.gpg_message(msg) == mtxt @pytest.mark.regression(issue=154) # @pytest.mark.parametrize('cleartext', [False, True]) - def test_new_non_unicode(self, gpg_print): + def test_new_non_unicode(self): # this message text comes from http://www.columbia.edu/~fdc/utf8/ text = u'色は匂へど 散りぬるを\n' \ u'我が世誰ぞ 常ならむ\n' \ @@ -94,13 +107,11 @@ def test_new_non_unicode(self, gpg_print): assert msg.type == 'literal' assert msg.message == text.encode('jisx0213') - with tempfile.NamedTemporaryFile('w+') as mf: - mf.write(str(msg)) - mf.flush() - assert gpg_print(mf.name).encode('latin-1').decode('jisx0213').strip() == text + # see if GPG can parse our message + assert self.gpg_message(msg).decode('jisx0213') == text @pytest.mark.regression(issue=154) - def test_new_non_unicode_cleartext(self, gpg_print): + def test_new_non_unicode_cleartext(self): # this message text comes from http://www.columbia.edu/~fdc/utf8/ text = u'色は匂へど 散りぬるを\n' \ u'我が世誰ぞ 常ならむ\n' \ @@ -111,11 +122,6 @@ def test_new_non_unicode_cleartext(self, gpg_print): assert msg.type == 'cleartext' assert msg.message == text - with EncodedNamedTemporaryFile('w+', encoding='utf-8') as mf: - mf.write(six.text_type(msg).encode('utf-8') if six.PY2 else six.text_type(msg)) - mf.flush() - assert gpg_print(mf.name).encode('latin-1').decode('utf-8').strip() == text - def test_add_marker(self): msg = PGPMessage.new(u"This is a new message") marker = Packet(bytearray(b'\xa8\x03\x50\x47\x50')) @@ -133,7 +139,7 @@ def test_decrypt_passphrase_message(self, enc_msg): assert decmsg.message == b"This is stored, literally\\!\n\n" @pytest.mark.parametrize('comp_alg', CompressionAlgorithm) - def test_encrypt_passphrase(self, comp_alg, gpg_decrypt): + def test_encrypt_passphrase(self, comp_alg): mtxt = "This message is to be encrypted" msg = PGPMessage.new(mtxt, compression=comp_alg) assert not msg.is_encrypted @@ -153,11 +159,8 @@ def test_encrypt_passphrase(self, comp_alg, gpg_decrypt): assert decmsg.message == mtxt assert decmsg._compression == msg._compression - # decrypt with GPG - with tempfile.NamedTemporaryFile('w+') as mf: - mf.write(str(encmsg)) - mf.flush() - assert gpg_decrypt(mf.name, "QwertyUiop") == mtxt + # decrypt with GPG via python-gnupg + assert self.gpg_decrypt(encmsg, 'QwertyUiop').decode('utf-8') == decmsg.message def test_encrypt_passphrase_2(self): mtxt = "This message is to be encrypted" @@ -211,18 +214,11 @@ class TestPGPKey_Management(object): keys = {} def gpg_verify_key(self, key): - from conftest import gpg_import as gpgi - gpg_import = gpgi() - - if gpg_ver < '2.1' and key.key_algorithm in {PubKeyAlgorithm.ECDSA, PubKeyAlgorithm.ECDH}: - # GPG prior to 2.1.x does not support EC* keys - return + with gpg.Context(offline=True) as c: + c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome) + data, vres = c.verify(gpg.Data(string=str(key))) - with tempfile.NamedTemporaryFile('w+') as kf: - kf.write(str(key)) - kf.flush() - with gpg_import(kf.name) as kio: - assert 'invalid self-signature' not in kio + assert vres @pytest.mark.run('first') @pytest.mark.parametrize('alg,size', pkeyspecs) @@ -581,25 +577,44 @@ class TestPGPKey_Actions(object): sigs = {} msgs = {} - def gpg_verify(self, subject, sig, pubkey): + def gpg_verify(self, subject, sig=None, pubkey=None): # verify with GnuPG - from conftest import gpg_import as gpgi - from conftest import gpg_verify as gpgv - gpg_import = gpgi() - gpg_verify = gpgv() - - with tempfile.NamedTemporaryFile('w+') as sigf, \ - tempfile.NamedTemporaryFile('w+') as subjf, \ - tempfile.NamedTemporaryFile('w+') as keyf: - sigf.write(str(sig)) - subjf.write(str(subject)) - keyf.write(str(pubkey)) - sigf.flush() - subjf.flush() - keyf.flush() - - with gpg_import(keyf.name): - assert gpg_verify(subjf.name, sigf.name, keyid=sig.signer) + with gpg.Context(armor=True, offline=True) as c: + c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome) + + # do we need to import the key? + if pubkey: + try: + c.get_key(pubkey.fingerprint) + + except gpg.errors.KeyNotFound: + key_data = gpg.Data(string=str(pubkey)) + gpg.core.gpgme.gpgme_op_import(c.wrapped, key_data) + + vargs = [gpg.Data(string=str(subject))] + if sig is not None: + vargs += [gpg.Data(string=str(sig))] + + _, vres = c.verify(*vargs) + + assert vres + + def gpg_decrypt(self, message, privkey): + # decrypt with GnuPG + with gpg.Context(armor=True, offline=True) as c: + c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome) + + # do we need to import the key? + try: + c.get_key(privkey.fingerprint, True) + + except gpg.errors.KeyNotFound: + key_data = gpg.Data(string=str(privkey)) + gpg.core.gpgme.gpgme_op_import(c.wrapped, key_data) + + pt, _, _ = c.decrypt(gpg.Data(string=str(message)), verify=False) + + return pt # test non-management PGPKey actions using existing keys, i.e.: # - signing/verifying @@ -641,7 +656,7 @@ def test_verify_string(self, targette_pub, string): assert sv assert sig in sv - def test_sign_message(self, targette_sec, targette_pub, message, gpg_import, gpg_verify): + def test_sign_message(self, targette_sec, targette_pub, message): # test signing a message sig = targette_sec.sign(message) @@ -652,13 +667,7 @@ def test_sign_message(self, targette_sec, targette_pub, message, gpg_import, gpg message |= sig # verify with GnuPG - with tempfile.NamedTemporaryFile('w+') as mf, tempfile.NamedTemporaryFile('w+') as pubf: - mf.write(str(message)) - pubf.write(str(targette_pub)) - mf.flush() - pubf.flush() - with gpg_import(pubf.name): - assert gpg_verify(mf.name, keyid=sig.signer) + self.gpg_verify(message, pubkey=targette_pub) @pytest.mark.run(after='test_sign_message') def test_verify_message(self, targette_pub, message): @@ -667,7 +676,7 @@ def test_verify_message(self, targette_pub, message): assert sv assert len(sv) > 0 - def test_sign_ctmessage(self, targette_sec, targette_pub, ctmessage, gpg_import, gpg_verify): + def test_sign_ctmessage(self, targette_sec, targette_pub, ctmessage): # test signing a cleartext message expire_at = datetime.utcnow() + timedelta(days=1) @@ -680,13 +689,7 @@ def test_sign_ctmessage(self, targette_sec, targette_pub, ctmessage, gpg_import, ctmessage |= sig # verify with GnuPG - with tempfile.NamedTemporaryFile('w+') as ctmf, tempfile.NamedTemporaryFile('w+') as pubf: - ctmf.write(str(ctmessage)) - pubf.write(str(targette_pub)) - ctmf.flush() - pubf.flush() - with gpg_import(pubf.name): - assert gpg_verify(ctmf.name, keyid=sig.signer) + self.gpg_verify(ctmessage, pubkey=targette_pub) @pytest.mark.run(after='test_sign_ctmessage') def test_verify_ctmessage(self, targette_pub, ctmessage): @@ -822,18 +825,9 @@ def test_verify_key(self, pub, abe): assert sv assert len(list(sv.good_signatures)) > 0 - def test_gpg_import_abe(self, abe, gpg_import, gpg_check_sigs): + def test_gpg_import_abe(self, abe): # verify all of the things we did to Abe's key with GnuPG in one fell swoop - with tempfile.NamedTemporaryFile('w+') as abef: - abef.write(str(abe)) - abef.flush() - - # import all of the public keys first - with gpg_import(*(os.path.realpath(f) for f in sorted(glob.glob('tests/testdata/keys/*.pub.asc')))): - # import Abe's key - with gpg_import(abef.name) as kio: - assert 'invalid self-signature' not in kio - assert gpg_check_sigs(abe.fingerprint.keyid) + self.gpg_verify(abe) @pytest.mark.parametrize('pub,cipher', itertools.product(pubkeys, sorted(SymmetricKeyAlgorithm)), @@ -854,7 +848,7 @@ def test_encrypt_message(self, pub, cipher): @pytest.mark.run(after='test_encrypt_message') @pytest.mark.parametrize('sf,cipher', itertools.product(sorted(glob.glob('tests/testdata/keys/*.sec.asc')), sorted(SymmetricKeyAlgorithm))) - def test_decrypt_message(self, sf, cipher, gpg_import, gpg_print): + def test_decrypt_message(self, sf, cipher): # test decrypting a message sec, _ = PGPKey.from_file(sf) if (sec.fingerprint, cipher) not in self.msgs: @@ -870,12 +864,7 @@ def test_decrypt_message(self, sf, cipher, gpg_import, gpg_print): # GnuPG prior to 2.1.x does not support EC* keys, so skip this step return - with tempfile.NamedTemporaryFile('w+') as emsgf: - emsgf.write(str(emsg)) - emsgf.flush() - - with gpg_import(os.path.realpath(sf)) as kf: - assert gpg_print(emsgf.name) == dmsg.message + assert self.gpg_decrypt(emsg, sec).decode('utf-8') == dmsg.message @pytest.mark.run(after='test_encrypt_message') @pytest.mark.parametrize('sf,cipher', diff --git a/tests/test_99_regressions.py b/tests/test_99_regressions.py index 06cc9af..6890405 100644 --- a/tests/test_99_regressions.py +++ b/tests/test_99_regressions.py @@ -1,16 +1,18 @@ """ I've got 99 problems but regression testing ain't one """ +from conftest import gpg_ver, gnupghome + +import gpg import os import pytest import glob -import tempfile import warnings from pgpy import PGPKey from pgpy.types import Armorable @pytest.mark.regression(issue=56) -def test_reg_bug_56(gpg_import, gpg_verify): +def test_reg_bug_56(): # some imports only used by this regression test import hashlib from datetime import datetime @@ -147,19 +149,15 @@ def test_reg_bug_56(gpg_import, gpg_verify): assert pk.verify(sigsubject, sig) # with GnuPG - with tempfile.NamedTemporaryFile('w+') as subjf, \ - tempfile.NamedTemporaryFile('w+') as sigf, \ - tempfile.NamedTemporaryFile('w+') as pubf: - subjf.write(sigsubject.decode('latin-1')) - sigf.write(str(sig)) - pubf.write(str(pk)) - - subjf.flush() - sigf.flush() - pubf.flush() - - with gpg_import(pubf.name): - assert gpg_verify(subjf.name, sigf.name) + with gpg.Context(armor=True, offline=True) as c: + c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=gnupghome) + + # import the key + key_data = gpg.Data(string=pub) + gpg.core.gpgme.gpgme_op_import(c.wrapped, key_data) + + _, vres = c.verify(gpg.Data(string=sigsubject.decode('latin-1')), gpg.Data(string=str(sig))) + assert vres @pytest.mark.regression(issue=157) diff --git a/tox.ini b/tox.ini index c4ed7d2..cd0b4bf 100644 --- a/tox.ini +++ b/tox.ini @@ -17,6 +17,7 @@ passenv = HOME ARCHFLAGS LDFLAGS CFLAGS INCLUDE LIB LD_LIBRARY_PATH PATH deps = cryptography>=1.1 enum34 + gpg==1.8.0 pyasn1 six>=1.9.0 singledispatch diff --git a/tox.sh b/tox.sh index 7718d3b..acb2478 100755 --- a/tox.sh +++ b/tox.sh @@ -7,4 +7,4 @@ if [[ $(uname) == "Darwin" ]] && command -v brew &>/dev/null && brew list openss export CFLAGS="-I/usr/local/opt/openssl/include" fi -tox $* +exec tox $*